1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.sandesha.storage.queue;
19
20 import org.apache.axis.components.logger.LogFactory;
21 import org.apache.commons.logging.Log;
22 import org.apache.sandesha.RMMessageContext;
23 import org.apache.sandesha.util.PolicyLoader;
24
25 import java.util.*;
26
27
28
29
30
31 /***
32 * @author Chamikara Jayalath
33 * @author Jaliya Ekanayaka
34 */
35
36 /***
37 * This class works as a hash map for storing response messages until they are
38 * sent.
39 */
40 public class OutgoingSequence extends AbstractSequence {
41
42 private String outSequenceId;
43 private boolean outSeqApproved;
44 private HashMap hash;
45 private ArrayList markedAsDelete;
46 private ArrayList sendMsgNoList;
47 private long lastMsgNo = -1;
48 private long nextAutoNumber;
49 private static final Log log = LogFactory.getLog(OutgoingSequence.class.getName());
50 public boolean terminateSent = false;
51 private boolean hasResponse = false;
52
53 public boolean hasResponse() {
54 return hasResponse;
55 }
56
57 public void setHasResponse(boolean hasResponse) {
58 this.hasResponse = hasResponse;
59 }
60
61 public boolean isTerminateSent() {
62 return terminateSent;
63 }
64
65 public void setTerminateSent(boolean terminateSent) {
66 this.terminateSent = terminateSent;
67 }
68
69 public OutgoingSequence(String sequenceId) {
70 this.sequenceId = sequenceId;
71 hash = new HashMap();
72 markedAsDelete = new ArrayList();
73 nextAutoNumber = 1;
74 outSeqApproved = false;
75 sendMsgNoList = new ArrayList();
76 }
77
78
79
80
81
82
83 public boolean isOutSeqApproved() {
84 return outSeqApproved;
85 }
86
87 public void setOutSeqApproved(boolean b) {
88 outSeqApproved = b;
89 }
90
91 public String getOutSequenceId() {
92 return outSequenceId;
93 }
94
95 public void setOutSequenceId(String string) {
96 outSequenceId = string;
97 }
98
99 /***
100 * adds the message to map.
101 */
102 public Object putNewMessage(RMMessageContext msg) {
103 Long key = new Long(nextAutoNumber);
104 Object obj = hash.put(key, msg);
105 increaseAutoNo();
106 return obj;
107 }
108
109 /***
110 * Increases auto number by 1.
111 */
112 private void increaseAutoNo() {
113 nextAutoNumber++;
114 }
115
116 /***
117 * Returns the next deliverable message if has any. Otherwise returns null.
118 */
119 public RMMessageContext getNextMessageToSend() {
120 RMMessageContext minMsg = null;
121 Iterator keys = hash.keySet().iterator();
122
123 whileLoop: while (keys.hasNext()) {
124 RMMessageContext tempMsg;
125 tempMsg = (RMMessageContext) hash.get(keys.next());
126 Long msgNo = new Long(tempMsg.getMsgNumber());
127 if (markedAsDelete.contains(msgNo)) {
128 continue;
129 }
130 long lastSentTime = tempMsg.getLastSentTime();
131 Date d = new Date();
132 long currentTime = d.getTime();
133
134 long retransmissionInterval = PolicyLoader.getInstance().getBaseRetransmissionInterval();
135 if (currentTime >= lastSentTime + retransmissionInterval) {
136 if (minMsg == null)
137 minMsg = tempMsg;
138 else {
139 long msgNo1, msgNo2;
140 msgNo1 = tempMsg.getMsgNumber();
141 msgNo2 = minMsg.getMsgNumber();
142 if (msgNo1 < msgNo2)
143 minMsg = tempMsg;
144 }
145 }
146 }
147
148 Date d = new Date();
149 long time = d.getTime();
150 if (minMsg != null) {
151 minMsg.setLastSentTime(time);
152 }
153
154 return minMsg;
155 }
156
157 public boolean hasMessage(Long key) {
158 Object obj = hash.get(key);
159
160 return (!(obj == null));
161 }
162
163 public void clearSequence(boolean yes) {
164 if (!yes)
165 return;
166 hash.clear();
167 nextAutoNumber = 1;
168 outSeqApproved = false;
169 outSequenceId = null;
170 sequenceId = null;
171 }
172
173 public Set getAllKeys() {
174 return hash.keySet();
175 }
176
177 public String getMessageId(Long key) {
178 RMMessageContext msg = (RMMessageContext) hash.get(key);
179 if (msg == null)
180 return null;
181
182 return msg.getMessageID();
183
184 }
185
186
187 public RMMessageContext deleteMessage(Long msgId) {
188 RMMessageContext msg = (RMMessageContext) hash.get(msgId);
189 if (msg == null)
190 return null;
191 hash.remove(msgId);
192 return msg;
193 }
194
195 public boolean markMessageDeleted(Long messageNo) {
196 if (hash.containsKey(messageNo)) {
197 markedAsDelete.add(messageNo);
198 return true;
199 }
200 return false;
201 }
202
203 public long nextMessageNumber() {
204 return nextAutoNumber;
205 }
206
207 public boolean isMessagePresent(String msgId) {
208 boolean b = false;
209 b = hash.containsKey(msgId);
210 return b;
211 }
212
213 public boolean hasMessageWithId(String msgId) {
214 Iterator it = hash.keySet().iterator();
215 boolean result = false;
216 while (it.hasNext()) {
217 RMMessageContext msg = (RMMessageContext) hash.get(it.next());
218 if (msg.getMessageID().equals(msgId)) {
219 result = true;
220 break;
221 }
222 }
223 return result;
224 }
225
226 public List getReceivedMsgNumbers() {
227 List result = new ArrayList();
228 Iterator it = hash.keySet().iterator();
229
230 while (it.hasNext()) {
231 Object key = it.next();
232 RMMessageContext msg = (RMMessageContext) hash.get(key);
233 long l = msg.getMsgNumber();
234 result.add(new Long(l));
235 }
236 return result;
237 }
238
239 public void setAckReceived(long msgNo) {
240 RMMessageContext msg = (RMMessageContext) hash.get(new Long(msgNo));
241 if (msg != null) {
242 msg.setAckReceived(true);
243 } else {
244 log.error("ERROR: MESSAGE IS NULL IN ResponseSeqHash");
245 }
246
247 }
248
249 public boolean isAckComplete() {
250 long lastMsgNo = getLastMsgNumber();
251 if (lastMsgNo <= 0) {
252 return false;
253 }
254 Iterator it = hash.keySet().iterator();
255 for (long i = 1; i < lastMsgNo; i++) {
256 if (!hasMessage(new Long(i))) {
257 return false;
258 }
259 }
260
261 it = hash.keySet().iterator();
262 while (it.hasNext()) {
263 RMMessageContext msg = (RMMessageContext) hash.get(it.next());
264 if (!msg.isAckReceived()) {
265 return false;
266 }
267 }
268 return true;
269
270 }
271
272 public void addMsgToSendList(long msgNo) {
273 sendMsgNoList.add(new Long(msgNo));
274 }
275
276 public boolean isMsgInSentList(long msgNo) {
277 return sendMsgNoList.contains(new Long(msgNo));
278 }
279
280 public boolean hasLastMsgReceived() {
281 if (lastMsgNo > 0)
282 return true;
283
284 return false;
285 }
286
287 public long getLastMsgNumber() {
288 if (lastMsgNo > 0)
289 return lastMsgNo;
290
291 return -1;
292 }
293
294 public void setLastMsg(long lastMsg) {
295 lastMsgNo = lastMsg;
296 }
297
298 }