1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.sandesha.storage.queue;
19
20 import org.apache.axis.components.logger.LogFactory;
21 import org.apache.axis.components.uuid.UUIDGen;
22 import org.apache.axis.components.uuid.UUIDGenFactory;
23 import org.apache.commons.logging.Log;
24 import org.apache.sandesha.Constants;
25 import org.apache.sandesha.RMMessageContext;
26 import org.apache.sandesha.util.PolicyLoader;
27
28 import java.util.*;
29
30
31
32
33
34
35 /***
36 * @author Chamikara Jayalath
37 * @author Jaliya Ekanayaka
38 */
39
40 public class SandeshaQueue {
41
// Singleton handed out by getInstance(byte) for Constants.CLIENT; created on first request.
private static SandeshaQueue clientQueue = null;
// Singleton handed out by getInstance(byte) for every other endpoint value; created on first request.
private static SandeshaQueue serverQueue = null;
// IncomingSequence objects keyed by the id given to createNewIncomingSequence.
HashMap incomingMap;
// OutgoingSequence objects keyed by the id given to createNewOutgoingSequence.
HashMap outgoingMap;
// RMMessageContext entries added through addPriorityMessage.
ArrayList highPriorityQueue;
// Messages taken off highPriorityQueue, keyed by message id.
HashMap queueBin;
// RMMessageContext entries added through addLowPriorityMessage.
ArrayList lowPriorityQueue;
// Sequence ids recorded through addRequestedSequence.
private List requestedSequences;
// Sequence id -> acksTo value, written by setAcksTo.
HashMap acksToMap;
// Message id -> offer id, written by addOffer.
HashMap offerMap;
private static final Log log = LogFactory.getLog(SandeshaQueue.class.getName());

// Used to mint a fresh message id when a create-sequence request is retransmitted.
public static final UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
55
/**
 * Builds a queue with every container empty. The constructor is private, so
 * instances are only obtained through {@link #getInstance(byte)}.
 */
private SandeshaQueue() {
    // Per-sequence stores.
    this.incomingMap = new HashMap();
    this.outgoingMap = new HashMap();

    // Stand-alone message queues and the bin for messages taken off them.
    this.highPriorityQueue = new ArrayList();
    this.lowPriorityQueue = new ArrayList();
    this.queueBin = new HashMap();

    // Bookkeeping lookups.
    this.requestedSequences = new ArrayList();
    this.acksToMap = new HashMap();
    this.offerMap = new HashMap();
}
66
67 public static SandeshaQueue getInstance(byte endPoint) {
68 if (endPoint == Constants.CLIENT) {
69 if (clientQueue == null) {
70 clientQueue = new SandeshaQueue();
71 }
72 return clientQueue;
73 } else {
74 if (serverQueue == null) {
75 serverQueue = new SandeshaQueue();
76 }
77 return serverQueue;
78 }
79
80 }
81
82 public boolean addMessageToIncomingSequence(String seqId, Long messageNo,
83 RMMessageContext msgCon) throws QueueException {
84 boolean successful = false;
85
86 if (seqId == null || msgCon == null)
87 throw new QueueException(Constants.Queue.ADD_ERROR);
88
89 if (isIncomingSequenceExists(seqId)) {
90 IncomingSequence seqHash = (IncomingSequence) incomingMap.get(seqId);
91
92 synchronized (seqHash) {
93 if (seqHash == null)
94 throw new QueueException(Constants.Queue.QUEUE_INCONSIS);
95
96 if (seqHash.hasMessage(messageNo))
97 throw new QueueException(Constants.Queue.MESSAGE_EXISTS);
98
99 if (msgCon.isLastMessage())
100 seqHash.setLastMsg(msgCon.getMsgNumber());
101
102 seqHash.setSequenceId(msgCon.getSequenceID());
103 seqHash.putNewMessage(messageNo, msgCon);
104 successful = true;
105 }
106 }
107
108 return successful;
109 }
110
111 public boolean addMessageToOutgoingSequence(String seqId, RMMessageContext msgCon)
112 throws QueueException {
113 boolean successful = false;
114
115 if (seqId == null || msgCon == null)
116 throw new QueueException(Constants.Queue.ADD_ERROR);
117
118 if (isOutgoingSequenceExists(seqId)) {
119 OutgoingSequence resSeqHash = (OutgoingSequence) outgoingMap.get(seqId);
120
121 synchronized (resSeqHash) {
122 if (resSeqHash == null)
123 throw new QueueException(Constants.Queue.QUEUE_INCONSIS);
124 resSeqHash.putNewMessage(msgCon);
125 successful = true;
126
127
128 if (msgCon.isLastMessage())
129 resSeqHash.setLastMsg(msgCon.getMsgNumber());
130
131 if (msgCon.isHasResponse())
132 resSeqHash.setHasResponse(true);
133 }
134 }
135 return successful;
136 }
137
138 public boolean isIncomingSequenceExists(String seqId) {
139 synchronized (incomingMap) {
140 return incomingMap.containsKey(seqId);
141 }
142 }
143
144 public synchronized boolean isOutgoingSequenceExists(String resSeqId) {
145 synchronized (outgoingMap) {
146 return outgoingMap.containsKey(resSeqId);
147 }
148 }
149
150 public RMMessageContext nextIncomingMessageToProcess(Object sequence) throws QueueException {
151 if (sequence == null)
152 return null;
153
154 AbstractSequence absSeq = (AbstractSequence) sequence;
155
156 IncomingSequence sh = (IncomingSequence) incomingMap.get(absSeq.getSequenceId());
157 synchronized (sh) {
158 if (sh == null)
159 throw new QueueException(Constants.Queue.SEQUENCE_ABSENT);
160
161 if (!sh.hasProcessableMessages())
162 return null;
163
164 RMMessageContext msgCon = sh.getNextMessageToProcess();
165 return msgCon;
166 }
167 }
168
169 public RMMessageContext nextOutgoingMessageToSend() throws QueueException {
170 RMMessageContext msg = null;
171 synchronized (outgoingMap) {
172 Iterator it = outgoingMap.keySet().iterator();
173
174 whileLoop: while (it.hasNext()) {
175 RMMessageContext tempMsg;
176 String tempKey = (String) it.next();
177 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(tempKey);
178 if (rsh.isOutSeqApproved()) {
179 tempMsg = rsh.getNextMessageToSend();
180 if (tempMsg != null) {
181 msg = tempMsg;
182 msg.setSequenceID(rsh.getOutSequenceId());
183 msg.setOldSequenceID(rsh.getSequenceId());
184 break whileLoop;
185 }
186 }
187 }
188 }
189 return msg;
190 }
191
192 public void createNewIncomingSequence(String sequenceId) throws QueueException {
193 if (sequenceId == null)
194 throw new QueueException(Constants.Queue.SEQUENCE_ID_NULL);
195
196 synchronized (incomingMap) {
197 IncomingSequence sh = new IncomingSequence(sequenceId);
198 incomingMap.put(sequenceId, sh);
199
200 }
201 }
202
203 public void createNewOutgoingSequence(String sequenceId) throws QueueException {
204 if (sequenceId == null)
205 throw new QueueException(Constants.Queue.SEQUENCE_ID_NULL);
206
207 synchronized (outgoingMap) {
208 OutgoingSequence rsh = new OutgoingSequence(sequenceId);
209 outgoingMap.put(sequenceId, rsh);
210 }
211
212 }
213
214 /***
215 * Adds a new message to the responses queue.
216 */
217 public void addPriorityMessage(RMMessageContext msg) throws QueueException {
218 synchronized (highPriorityQueue) {
219 if (msg == null)
220 throw new QueueException(Constants.Queue.MESSAGE_ID_NULL);
221
222 highPriorityQueue.add(msg);
223 }
224 }
225
226 public void addLowPriorityMessage(RMMessageContext msg) throws QueueException {
227 synchronized (lowPriorityQueue) {
228 if (msg == null)
229 throw new QueueException(Constants.Queue.MESSAGE_ID_NULL);
230 lowPriorityQueue.add(msg);
231 }
232 }
233
234
235 public RMMessageContext nextPriorityMessageToSend() throws QueueException {
236
237 synchronized (highPriorityQueue) {
238
239
240 if (highPriorityQueue.size() <= 0)
241 return null;
242
243 RMMessageContext msg = null;
244 int size = highPriorityQueue.size();
245 synchronized (highPriorityQueue) {
246 forLoop:
247 for (int i = 0; i < size; i++) {
248 RMMessageContext tempMsg = (RMMessageContext) highPriorityQueue.get(i);
249 if (tempMsg != null) {
250 switch (tempMsg.getMessageType()) {
251
252 case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST:
253 long lastSentTime = tempMsg.getLastSentTime();
254 Date d = new Date();
255 long currentTime = d.getTime();
256 if (currentTime >=
257 lastSentTime + Constants.RETRANSMISSION_INTERVAL) {
258
259 String newCreateSeqId = Constants.UUID + uuidGen.nextUUID();
260 tempMsg.setMessageID(newCreateSeqId);
261
262 tempMsg.setLastSentTime(currentTime);
263 msg = tempMsg;
264 break forLoop;
265
266
267 }
268 break;
269 case Constants.MSG_TYPE_ACKNOWLEDGEMENT:
270
271
272
273
274
275
276
277 String sequenceId = tempMsg.getSequenceID();
278 if (sequenceId == null)
279 continue;
280
281 String key = getKeyFromIncomingSequenceId(sequenceId);
282 IncomingSequence sequence = (IncomingSequence) incomingMap.get(key);
283 if (sequence == null)
284 continue;
285
286 d = new Date();
287 currentTime = d.getTime();
288
289 if (sequence.isSendAck()) {
290
291 tempMsg.setLastSentTime(currentTime);
292 msg = tempMsg;
293 sequence.setSendAck(false);
294 sequence.setFinalAckedTime(currentTime);
295 break forLoop;
296
297 } else {
298 long ackInterval = PolicyLoader.getInstance()
299 .getAcknowledgementInterval();
300 long finalAckedTime = sequence.getFinalAckedTime();
301 long finalMsgArrivedTime = sequence.getFinalMsgArrivedTime();
302
303 if ((finalMsgArrivedTime > finalAckedTime) &&
304 (currentTime > finalMsgArrivedTime + ackInterval))
305 sequence.setSendAck(true);
306 }
307
308 break;
309 default:
310 highPriorityQueue.remove(i);
311 queueBin.put(tempMsg.getMessageID(), tempMsg);
312 msg = tempMsg;
313 break forLoop;
314 }
315 }
316 }
317 }
318
319
320 return msg;
321
322 }
323 }
324
325 public List nextAllSeqsToProcess() {
326 List seqs = new ArrayList();
327
328 synchronized (incomingMap) {
329 Iterator it = incomingMap.keySet().iterator();
330
331 while (it.hasNext()) {
332 Object tempKey = it.next();
333 IncomingSequence sh = (IncomingSequence) incomingMap.get(tempKey);
334 if (sh.hasProcessableMessages() && !sh.isSequenceLocked())
335 seqs.add(sh);
336 }
337 return seqs;
338 }
339 }
340
341 public List nextAllSeqIdsToProcess() {
342 List ids = new ArrayList();
343
344 synchronized (incomingMap) {
345 Iterator it = incomingMap.keySet().iterator();
346
347 while (it.hasNext()) {
348 Object tempKey = it.next();
349 IncomingSequence sh = (IncomingSequence) incomingMap.get(tempKey);
350 if (sh.hasProcessableMessages() && !sh.isSequenceLocked())
351 ids.add(sh.getSequenceId());
352 }
353 return ids;
354 }
355 }
356
357 public void clear(boolean yes) {
358 if (!yes)
359 return;
360 incomingMap.clear();
361 highPriorityQueue.clear();
362 outgoingMap.clear();
363 queueBin.clear();
364 }
365
366
367
368
369
370
371
372
373
374 public void setSequenceLock(String sequenceId, boolean lock) {
375 IncomingSequence sh = (IncomingSequence) incomingMap.get(sequenceId);
376 sh.setProcessLock(lock);
377 }
378
379 public Set getAllReceivedMsgNumsOfIncomingSeq(String sequenceId) {
380 IncomingSequence sh = (IncomingSequence) incomingMap.get(sequenceId);
381 if (sh != null)
382 return sh.getAllKeys();
383 else
384 return null;
385 }
386
387 public boolean isIncomingMessageExists(String sequenceId, Long messageNo) {
388 IncomingSequence sh = (IncomingSequence) incomingMap.get(sequenceId);
389
390 if (sh != null)
391 return sh.hasMessage(messageNo);
392 else
393 return false;
394 }
395
396 public void setOutSequence(String seqId, String outSeqId) {
397 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId);
398 synchronized (rsh) {
399 if (rsh == null) {
400 if (log.isDebugEnabled())
401 log.debug("ERROR: RESPONSE SEQ IS NULL");
402 return;
403 }
404 rsh.setOutSequenceId(outSeqId);
405 }
406 }
407
408 public void setOutSequenceApproved(String seqId, boolean approved) {
409 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId);
410 synchronized (rsh) {
411 if (rsh == null) {
412 if (log.isDebugEnabled())
413 log.debug("ERROR: RESPONSE SEQ IS NULL");
414 return;
415 }
416 rsh.setOutSeqApproved(approved);
417 }
418 }
419
420 public String getSequenceOfOutSequence(String outSequence) {
421 synchronized (outgoingMap) {
422 if (outSequence == null) {
423 return null;
424 }
425 String tempSeqId = null;
426 Iterator it = outgoingMap.keySet().iterator();
427 while (it.hasNext()) {
428 tempSeqId = (String) it.next();
429 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(tempSeqId);
430 String tempOutSequence = rsh.getOutSequenceId();
431 if (outSequence.equals(tempOutSequence)) {
432 break;
433 }
434 }
435 return tempSeqId;
436 }
437
438 }
439
440 public void displayOutgoingMap() {
441 Iterator it = outgoingMap.keySet().iterator();
442 System.out.println("------------------------------------");
443 System.out.println(" DISPLAYING RESPONSE MAP");
444 System.out.println("------------------------------------");
445 while (it.hasNext()) {
446 String s = (String) it.next();
447 System.out.println("\n Sequence id - " + s);
448 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(s);
449
450 System.out.println("out seq id:" + rsh.getOutSequenceId());
451 Iterator it1 = rsh.getAllKeys().iterator();
452 while (it1.hasNext()) {
453 Long l = (Long) it1.next();
454 String msgId = rsh.getMessageId(l);
455 System.out.println("* key -" + l.longValue() + "- MessageID -" + msgId + "-");
456 }
457 }
458 System.out.println("\n");
459 }
460
461 public void displayIncomingMap() {
462 Iterator it = incomingMap.keySet().iterator();
463 System.out.println("------------------------------------");
464 System.out.println(" DISPLAYING SEQUENCE MAP");
465 System.out.println("------------------------------------");
466 while (it.hasNext()) {
467 String s = (String) it.next();
468 System.out.println("\n Sequence id - " + s);
469
470 IncomingSequence sh = (IncomingSequence) incomingMap.get(s);
471
472 Iterator it1 = sh.getAllKeys().iterator();
473 while (it1.hasNext()) {
474 Long l = (Long) it1.next();
475 String msgId = sh.getMessageId(l);
476 System.out.println("* key -" + l.longValue() + "- MessageID -" + msgId + "-");
477 }
478 }
479 System.out.println("\n");
480 }
481
482 public void displayPriorityQueue() {
483
484 System.out.println("------------------------------------");
485 System.out.println(" DISPLAYING PRIORITY QUEUE");
486 System.out.println("------------------------------------");
487
488 Iterator it = highPriorityQueue.iterator();
489 while (it.hasNext()) {
490 RMMessageContext msg = (RMMessageContext) it.next();
491 String id = msg.getMessageID();
492 int type = msg.getMessageType();
493
494 System.out.println("Message " + id + " Type " + type);
495 }
496 System.out.println("\n");
497 }
498
499 public void markOutgoingMessageToDelete(String sequenceId, Long messageNo) {
500 String sequence = getSequenceOfOutSequence(sequenceId);
501 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(sequence);
502
503 if (rsh == null) {
504 log.error(Constants.Queue.RESPONSE_SEQ_NULL);
505 return;
506 }
507
508 synchronized (rsh) {
509
510 rsh.markMessageDeleted(messageNo);
511
512 }
513
514 }
515
516 public void movePriorityMsgToBin(String messageId) {
517
518 synchronized (highPriorityQueue) {
519 int size = highPriorityQueue.size();
520 for (int i = 0; i < size; i++) {
521 RMMessageContext msg = (RMMessageContext) highPriorityQueue.get(i);
522
523 String tempMsgId;
524 try {
525 tempMsgId = (String) msg.getMessageIdList().get(0);
526 } catch (Exception ex) {
527 tempMsgId = msg.getMessageID();
528 }
529 if (tempMsgId.equals(messageId)) {
530 highPriorityQueue.remove(i);
531 queueBin.put(messageId, msg);
532 return;
533 }
534 }
535 }
536 }
537
538 public long getNextOutgoingMessageNumber(String seq) {
539 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seq);
540 if (rsh == null) {
541 try {
542 createNewOutgoingSequence(seq);
543 } catch (QueueException q) {
544 log.error(q.getStackTrace());
545 }
546 }
547 rsh = (OutgoingSequence) outgoingMap.get(seq);
548 synchronized (rsh) {
549 Iterator keys = rsh.getAllKeys().iterator();
550
551 long msgNo = rsh.nextMessageNumber();
552 return (msgNo);
553 }
554 }
555
556 public synchronized RMMessageContext checkForResponseMessage(String requestId, String seqId) {
557 IncomingSequence sh = (IncomingSequence) incomingMap.get(seqId);
558 if (sh == null) {
559 return null;
560 }
561 synchronized (sh) {
562 RMMessageContext msg = sh.getMessageRelatingTo(requestId);
563 return msg;
564 }
565 }
566
567 public String searchForSequenceId(String messageId) {
568 Iterator it = outgoingMap.keySet().iterator();
569
570 String key = null;
571 while (it.hasNext()) {
572 key = (String) it.next();
573 Object obj = outgoingMap.get(key);
574 if (obj != null) {
575 OutgoingSequence hash = (OutgoingSequence) obj;
576 boolean hasMsg = hash.hasMessageWithId(messageId);
577
578 if (!hasMsg)
579 key = null;
580 else
581 break;
582
583 }
584
585 }
586
587 return key;
588 }
589
590 public void setAckReceived(String seqId, long msgNo) {
591 Iterator it = outgoingMap.keySet().iterator();
592 String key = null;
593 while (it.hasNext()) {
594 key = (String) it.next();
595 Object obj = outgoingMap.get(key);
596
597 if (obj != null) {
598 OutgoingSequence hash = (OutgoingSequence) obj;
599 if (hash.getOutSequenceId().equals(seqId)) {
600 hash.setAckReceived(msgNo);
601 }
602 }
603 }
604
605 }
606
607 public RMMessageContext getLowPriorityMessageIfAcked() {
608 synchronized (lowPriorityQueue) {
609 int size = lowPriorityQueue.size();
610 RMMessageContext terminateMsg = null;
611 for (int i = 0; i < size; i++) {
612
613 RMMessageContext temp;
614 temp = (RMMessageContext) lowPriorityQueue.get(i);
615 String seqId = temp.getSequenceID();
616 OutgoingSequence hash = null;
617 hash = (OutgoingSequence) outgoingMap.get(seqId);
618 if (hash == null) {
619 log.error("ERROR: HASH NOT FOUND SEQ ID " + seqId);
620 }
621 if (hash != null) {
622 boolean complete = hash.isAckComplete();
623 if (complete)
624 terminateMsg = temp;
625 if (terminateMsg != null) {
626 terminateMsg.setSequenceID(hash.getOutSequenceId());
627 lowPriorityQueue.remove(i);
628 break;
629 }
630 }
631 }
632 return terminateMsg;
633 }
634
635 }
636
637 public void addSendMsgNo(String seqId, long msgNo) {
638 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId);
639 if (rsh != null) {
640
641 synchronized (rsh) {
642 rsh.addMsgToSendList(msgNo);
643 }
644 }
645 }
646
647 public boolean isSentMsg(String seqId, long msgNo) {
648 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId);
649
650 if (rsh == null) {
651 return false;
652 }
653 synchronized (rsh) {
654 return rsh.isMsgInSentList(msgNo);
655 }
656
657
658 }
659
660 public boolean hasLastIncomingMsgReceived(String seqId) {
661
662 IncomingSequence sh = (IncomingSequence) incomingMap.get(seqId);
663
664 if (sh == null) {
665 return false;
666 }
667 synchronized (sh) {
668 return sh.hasLastMsgReceived();
669 }
670 }
671
672 public long getLastIncomingMsgNo(String seqId) {
673 IncomingSequence sh = (IncomingSequence) incomingMap.get(seqId);
674 if (sh == null) {
675 return 0;
676 }
677 synchronized (sh) {
678 return sh.getLastMsgNumber();
679 }
680 }
681
682 public void addRequestedSequence(String seqId) {
683 requestedSequences.add(seqId);
684 }
685
686 public boolean isRequestedSeqPresent(String seqId) {
687 return requestedSequences.contains(seqId);
688 }
689
690 public String getKeyFromIncomingSequenceId(String seqId) {
691 synchronized (incomingMap) {
692 Iterator it = incomingMap.keySet().iterator();
693 while (it.hasNext()) {
694 String key = (String) it.next();
695 IncomingSequence is = (IncomingSequence) incomingMap.get(key);
696 String seq = is.getSequenceId();
697 if (seq == null)
698 continue;
699
700 if (seq.equals(seqId))
701 return key;
702 }
703 return null;
704 }
705 }
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735 public String getKeyFromOutgoingSequenceId(String seqId) {
736 synchronized (outgoingMap) {
737 Iterator it = outgoingMap.keySet().iterator();
738 while (it.hasNext()) {
739 String key = (String) it.next();
740 OutgoingSequence is = (OutgoingSequence) outgoingMap.get(key);
741 String seq = is.getOutSequenceId();
742 if (seq == null)
743 continue;
744
745 if (seq.equals(seqId))
746 return key;
747 }
748 return null;
749 }
750 }
751
752 public boolean isAllOutgoingTerminateSent() {
753 synchronized (outgoingMap) {
754 Iterator keys = outgoingMap.keySet().iterator();
755 boolean found = false;
756
757 while (keys.hasNext()) {
758 OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(keys.next());
759 if (ogs.isTerminateSent()) {
760 found = true;
761 break;
762 }
763 }
764
765 return found;
766 }
767 }
768
769 public boolean isAllIncommingTerminateReceived() {
770 synchronized (incomingMap) {
771 Iterator keys = incomingMap.keySet().iterator();
772
773 while (keys.hasNext()) {
774 Object key = keys.next();
775 IncomingSequence ics = (IncomingSequence) incomingMap.get(key);
776 OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(key);
777
778 boolean hasResponse = ogs.hasResponse();
779
780 if (hasResponse && !ics.isTerminateReceived())
781 return false;
782 }
783
784 return true;
785 }
786 }
787
788 public void setTerminateSend(String seqId) {
789 synchronized (outgoingMap) {
790 OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(seqId);
791 ogs.setTerminateSent(true);
792 }
793 }
794
795 public void setTerminateReceived(String seqId) {
796 IncomingSequence ics = (IncomingSequence) incomingMap.get(getKeyFromIncomingSequenceId(seqId));
797 ics.setTerminateReceived(true);
798 }
799
800 public void setAcksTo(String seqId, String acksTo) {
801
802 if (seqId == null) {
803 log.error("ERROR: seq is null in setAcksTo");
804 return;
805 }
806
807 acksToMap.put(seqId, acksTo);
808 }
809
810 public String getAcksTo(String seqId) {
811
812 if (seqId == null) {
813 log.error("ERROR: seq is null in getAcksTo");
814 return null;
815 }
816
817 return (String) acksToMap.get(seqId);
818 }
819
820
821 public void addOffer(String msgID, String offerID) {
822 if (msgID == null) {
823 log.error(" MessageID is null in addOffer");
824 }
825 offerMap.put(msgID, offerID);
826 }
827
828 public String getOffer(String msgID) {
829 if (msgID == null) {
830 log.error(" MessageID is null in getOffer");
831 return null;
832 }
833 return (String) offerMap.get(msgID);
834 }
835
836 public boolean isOutgoingTerminateSent(String seqId) {
837 synchronized (outgoingMap) {
838 OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(seqId);
839 if (ogs != null) {
840 if (ogs.isTerminateSent())
841 return true;
842 else
843 return false;
844 }
845 return false;
846 }
847
848 }
849
850 public boolean isIncommingTerminateReceived(String seqId) {
851 synchronized (incomingMap) {
852
853 IncomingSequence ics = (IncomingSequence) incomingMap.get(seqId);
854 OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(seqId);
855
856 boolean hasResponse = false;
857 if (ogs != null) {
858 hasResponse = ogs.hasResponse();
859 }
860
861 if (hasResponse && ics != null && !ics.isTerminateReceived())
862 return false;
863 else
864 return true;
865 }
866
867 }
868
869 public void updateFinalMessageArrivedTime(String sequenceId) {
870 synchronized (incomingMap) {
871 IncomingSequence ics = (IncomingSequence) incomingMap.get(sequenceId);
872 if (ics == null)
873 return;
874
875 Date d = new Date();
876 long time = d.getTime();
877 ics.setFinalMsgArrivedTime(time);
878 }
879 }
880
881 public void sendAck(String sequenceId) {
882 synchronized (incomingMap) {
883 IncomingSequence ics = (IncomingSequence) incomingMap.get(sequenceId);
884 if (ics == null)
885 return;
886
887 ics.setSendAck(true);
888 }
889 }
890
891 public void removeAllAcks(String sequenceID) {
892 synchronized (highPriorityQueue) {
893 int size = highPriorityQueue.size();
894
895 ArrayList remLst = new ArrayList();
896
897 for (int i = 0; i < size; i++) {
898 RMMessageContext msg = (RMMessageContext) highPriorityQueue.get(i);
899 if (msg.getSequenceID() != null)
900 if (msg.getSequenceID().equals(sequenceID) && msg.getMessageType() == Constants.MSG_TYPE_ACKNOWLEDGEMENT)
901 remLst.add(new Integer(i));
902 }
903
904 for (int i = 0; i < remLst.size(); i++) {
905 Integer in = (Integer) remLst.get(i);
906 highPriorityQueue.remove(in.intValue());
907 }
908 }
909 }
910
911
912 }
913