View Javadoc

1   /*
2    * Copyright  1999-2004 The Apache Software Foundation.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   *
16   */
17  
18  package org.apache.sandesha.storage.queue;
19  
20  import org.apache.axis.components.logger.LogFactory;
21  import org.apache.commons.logging.Log;
22  import org.apache.sandesha.RMMessageContext;
23  import org.apache.sandesha.util.PolicyLoader;
24  
25  import java.util.*;
26  
27  /*
28   * Created on Aug 4, 2004 at 5:08:29 PM
29   */
30  
31  /***
32   * @author Chamikara Jayalath
33   * @author Jaliya Ekanayaka
34   */
35  
36  /***
37   * This class works as a hash map for storing response messages until they are
38   * sent.
39   */
40  public class OutgoingSequence extends AbstractSequence {
41  
42      private String outSequenceId;
43      private boolean outSeqApproved;
44      private HashMap hash;
45      private ArrayList markedAsDelete;
46      private ArrayList sendMsgNoList;
47      private long lastMsgNo = -1;
48      private long nextAutoNumber; // key for storing messages.
49      private static final Log log = LogFactory.getLog(OutgoingSequence.class.getName());
50      public boolean terminateSent = false;
51      private boolean hasResponse = false;
52  
53      public boolean hasResponse() {
54          return hasResponse;
55      }
56  
57      public void setHasResponse(boolean hasResponse) {
58          this.hasResponse = hasResponse;
59      }
60  
61      public boolean isTerminateSent() {
62          return terminateSent;
63      }
64  
65      public void setTerminateSent(boolean terminateSent) {
66          this.terminateSent = terminateSent;
67      }
68  
69      public OutgoingSequence(String sequenceId) {
70          this.sequenceId = sequenceId;
71          hash = new HashMap();
72          markedAsDelete = new ArrayList();
73          nextAutoNumber = 1; //This is the key for storing messages.
74          outSeqApproved = false;
75          sendMsgNoList = new ArrayList();
76      }
77  
78      /*
79       * public boolean hasMessagesToSend(){ return hasMessagesToSend; }
80       */
81  
82  
83      public boolean isOutSeqApproved() {
84          return outSeqApproved;
85      }
86  
87      public void setOutSeqApproved(boolean b) {
88          outSeqApproved = b;
89      }
90  
91      public String getOutSequenceId() {
92          return outSequenceId;
93      }
94  
95      public void setOutSequenceId(String string) {
96          outSequenceId = string;
97      }
98  
99      /***
100      * adds the message to map.
101      */
102     public Object putNewMessage(RMMessageContext msg) {
103         Long key = new Long(nextAutoNumber);
104         Object obj = hash.put(key, msg);
105         increaseAutoNo();
106         return obj;
107     }
108 
109     /***
110      * Increases auto number by 1.
111      */
112     private void increaseAutoNo() {
113         nextAutoNumber++;
114     }
115 
116     /***
117      * Returns the next deliverable message if has any. Otherwise returns null.
118      */
119     public RMMessageContext getNextMessageToSend() {
120         RMMessageContext minMsg = null;
121         Iterator keys = hash.keySet().iterator();
122 
123         whileLoop: while (keys.hasNext()) {
124             RMMessageContext tempMsg;
125             tempMsg = (RMMessageContext) hash.get(keys.next());
126             Long msgNo = new Long(tempMsg.getMsgNumber());
127             if (markedAsDelete.contains(msgNo)) {
128                 continue;
129             }
130             long lastSentTime = tempMsg.getLastSentTime();
131             Date d = new Date();
132             long currentTime = d.getTime();
133 
134             long retransmissionInterval = PolicyLoader.getInstance().getBaseRetransmissionInterval();
135             if (currentTime >= lastSentTime + retransmissionInterval) {
136                 if (minMsg == null)
137                     minMsg = tempMsg;
138                 else {
139                     long msgNo1, msgNo2;
140                     msgNo1 = tempMsg.getMsgNumber();
141                     msgNo2 = minMsg.getMsgNumber();
142                     if (msgNo1 < msgNo2)
143                         minMsg = tempMsg;
144                 }
145             }
146         }
147 
148         Date d = new Date();
149         long time = d.getTime();
150         if (minMsg != null) {
151             minMsg.setLastSentTime(time);
152         }
153 
154         return minMsg;
155     }
156 
157     public boolean hasMessage(Long key) {
158         Object obj = hash.get(key);
159 
160         return (!(obj == null));
161     }
162 
163     public void clearSequence(boolean yes) {
164         if (!yes)
165             return;
166         hash.clear();
167         nextAutoNumber = 1;
168         outSeqApproved = false;
169         outSequenceId = null;
170         sequenceId = null;
171     }
172 
173     public Set getAllKeys() {
174         return hash.keySet();
175     }
176 
177     public String getMessageId(Long key) {
178         RMMessageContext msg = (RMMessageContext) hash.get(key);
179         if (msg == null)
180             return null;
181 
182         return msg.getMessageID();
183 
184     }
185 
186     //Deleting returns the deleted message.
187     public RMMessageContext deleteMessage(Long msgId) {
188         RMMessageContext msg = (RMMessageContext) hash.get(msgId);
189         if (msg == null)
190             return null;
191         hash.remove(msgId);
192         return msg;
193     }
194 
195     public boolean markMessageDeleted(Long messageNo) {
196         if (hash.containsKey(messageNo)) {
197             markedAsDelete.add(messageNo);
198             return true;
199         }
200         return false;
201     }
202 
203     public long nextMessageNumber() {
204         return nextAutoNumber;
205     }
206 
207     public boolean isMessagePresent(String msgId) {
208         boolean b = false;
209         b = hash.containsKey(msgId);
210         return b;
211     }
212 
213     public boolean hasMessageWithId(String msgId) {
214         Iterator it = hash.keySet().iterator();
215         boolean result = false;
216         while (it.hasNext()) {
217             RMMessageContext msg = (RMMessageContext) hash.get(it.next());
218             if (msg.getMessageID().equals(msgId)) {
219                 result = true;
220                 break;
221             }
222         }
223         return result;
224     }
225 
226     public List getReceivedMsgNumbers() {
227         List result = new ArrayList();
228         Iterator it = hash.keySet().iterator();
229 
230         while (it.hasNext()) {
231             Object key = it.next();
232             RMMessageContext msg = (RMMessageContext) hash.get(key);
233             long l = msg.getMsgNumber();
234             result.add(new Long(l));
235         }
236         return result;
237     }
238 
239     public void setAckReceived(long msgNo) {
240         RMMessageContext msg = (RMMessageContext) hash.get(new Long(msgNo));
241         if (msg != null) {
242             msg.setAckReceived(true);
243         } else {
244             log.error("ERROR: MESSAGE IS NULL IN ResponseSeqHash");
245         }
246 
247     }
248 
249     public boolean isAckComplete() {
250         long lastMsgNo = getLastMsgNumber();
251         if (lastMsgNo <= 0) {
252             return false;
253         }
254         Iterator it = hash.keySet().iterator();
255         for (long i = 1; i < lastMsgNo; i++) {
256             if (!hasMessage(new Long(i))) {
257                 return false;
258             }
259         }
260 
261         it = hash.keySet().iterator();
262         while (it.hasNext()) {
263             RMMessageContext msg = (RMMessageContext) hash.get(it.next());
264             if (!msg.isAckReceived()) {
265                 return false;
266             }
267         }
268         return true;
269 
270     }
271 
272     public void addMsgToSendList(long msgNo) {
273         sendMsgNoList.add(new Long(msgNo));
274     }
275 
276     public boolean isMsgInSentList(long msgNo) {
277         return sendMsgNoList.contains(new Long(msgNo));
278     }
279 
280     public boolean hasLastMsgReceived() {
281         if (lastMsgNo > 0)
282             return true;
283 
284         return false;
285     }
286 
287     public long getLastMsgNumber() {
288         if (lastMsgNo > 0)
289             return lastMsgNo;
290 
291         return -1;
292     }
293 
294     public void setLastMsg(long lastMsg) {
295         lastMsgNo = lastMsg;
296     }
297 
298 }