View Javadoc

1   /*
2    * Copyright  1999-2004 The Apache Software Foundation.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   *
16   */
17  
18  package org.apache.sandesha.storage.queue;
19  
20  import org.apache.axis.components.logger.LogFactory;
21  import org.apache.axis.components.uuid.UUIDGen;
22  import org.apache.axis.components.uuid.UUIDGenFactory;
23  import org.apache.commons.logging.Log;
24  import org.apache.sandesha.Constants;
25  import org.apache.sandesha.RMMessageContext;
26  import org.apache.sandesha.util.PolicyLoader;
27  
28  import java.util.*;
29  
30  
31  /*
32   * Created on Aug 4, 2004 at 4:49:49 PM
33   */
34  
35  /***
36   * @author Chamikara Jayalath
37   * @author Jaliya Ekanayaka
38   */
39  
40  public class SandeshaQueue {
41  
42      private static SandeshaQueue clientQueue = null;
43      private static SandeshaQueue serverQueue = null;
44      HashMap incomingMap; //In comming messages.
45      HashMap outgoingMap; //Response messages
46      ArrayList highPriorityQueue; // Acks and create seq. responses.
47      HashMap queueBin; // Messaged processed from out queue will be moved
48      ArrayList lowPriorityQueue;
49      private List requestedSequences;
50      HashMap acksToMap;
51      HashMap offerMap;
52      private static final Log log = LogFactory.getLog(SandeshaQueue.class.getName());
53  
54      public static final UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
55  
56      private SandeshaQueue() {
57          incomingMap = new HashMap();
58          outgoingMap = new HashMap();
59          highPriorityQueue = new ArrayList();
60          queueBin = new HashMap();
61          lowPriorityQueue = new ArrayList();
62          requestedSequences = new ArrayList();
63          acksToMap = new HashMap();
64          offerMap = new HashMap();
65      }
66  
67      public static SandeshaQueue getInstance(byte endPoint) {
68          if (endPoint == Constants.CLIENT) {
69              if (clientQueue == null) {
70                  clientQueue = new SandeshaQueue();
71              }
72              return clientQueue;
73          } else {
74              if (serverQueue == null) {
75                  serverQueue = new SandeshaQueue();
76              }
77              return serverQueue;
78          }
79  
80      }
81  
82      public boolean addMessageToIncomingSequence(String seqId, Long messageNo,
83                                                  RMMessageContext msgCon) throws QueueException {
84          boolean successful = false;
85  
86          if (seqId == null || msgCon == null)
87              throw new QueueException(Constants.Queue.ADD_ERROR);
88  
89          if (isIncomingSequenceExists(seqId)) {
90              IncomingSequence seqHash = (IncomingSequence) incomingMap.get(seqId);
91  
92              synchronized (seqHash) {
93                  if (seqHash == null)
94                      throw new QueueException(Constants.Queue.QUEUE_INCONSIS);
95  
96                  if (seqHash.hasMessage(messageNo))
97                      throw new QueueException(Constants.Queue.MESSAGE_EXISTS);
98  
99                  if (msgCon.isLastMessage())
100                     seqHash.setLastMsg(msgCon.getMsgNumber());
101 
102                 seqHash.setSequenceId(msgCon.getSequenceID());
103                 seqHash.putNewMessage(messageNo, msgCon);
104                 successful = true;
105             }
106         }
107 
108         return successful;
109     }
110 
111     public boolean addMessageToOutgoingSequence(String seqId, RMMessageContext msgCon)
112             throws QueueException {
113         boolean successful = false;
114 
115         if (seqId == null || msgCon == null)
116             throw new QueueException(Constants.Queue.ADD_ERROR);
117 
118         if (isOutgoingSequenceExists(seqId)) {
119             OutgoingSequence resSeqHash = (OutgoingSequence) outgoingMap.get(seqId);
120 
121             synchronized (resSeqHash) {
122                 if (resSeqHash == null)
123                     throw new QueueException(Constants.Queue.QUEUE_INCONSIS);
124                 resSeqHash.putNewMessage(msgCon);
125                 successful = true;
126 
127                 //if last message
128                 if (msgCon.isLastMessage())
129                     resSeqHash.setLastMsg(msgCon.getMsgNumber());
130 
131                 if (msgCon.isHasResponse())
132                     resSeqHash.setHasResponse(true);
133             }
134         }
135         return successful;
136     }
137 
138     public boolean isIncomingSequenceExists(String seqId) {
139         synchronized (incomingMap) {
140             return incomingMap.containsKey(seqId);
141         }
142     }
143 
144     public synchronized boolean isOutgoingSequenceExists(String resSeqId) {
145         synchronized (outgoingMap) {
146             return outgoingMap.containsKey(resSeqId);
147         }
148     }
149 
150     public RMMessageContext nextIncomingMessageToProcess(Object sequence) throws QueueException {
151         if (sequence == null)
152             return null;
153 
154         AbstractSequence absSeq = (AbstractSequence) sequence;
155 
156         IncomingSequence sh = (IncomingSequence) incomingMap.get(absSeq.getSequenceId());
157         synchronized (sh) {
158             if (sh == null)
159                 throw new QueueException(Constants.Queue.SEQUENCE_ABSENT);
160 
161             if (!sh.hasProcessableMessages())
162                 return null;
163 
164             RMMessageContext msgCon = sh.getNextMessageToProcess();
165             return msgCon;
166         }
167     }
168 
169     public RMMessageContext nextOutgoingMessageToSend() throws QueueException {
170         RMMessageContext msg = null;
171         synchronized (outgoingMap) {
172             Iterator it = outgoingMap.keySet().iterator();
173 
174             whileLoop: while (it.hasNext()) {
175                 RMMessageContext tempMsg;
176                 String tempKey = (String) it.next();
177                 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(tempKey);
178                 if (rsh.isOutSeqApproved()) {
179                     tempMsg = rsh.getNextMessageToSend();
180                     if (tempMsg != null) {
181                         msg = tempMsg;
182                         msg.setSequenceID(rsh.getOutSequenceId());
183                         msg.setOldSequenceID(rsh.getSequenceId());
184                         break whileLoop;
185                     }
186                 }
187             }
188         }
189         return msg;
190     }
191 
192     public void createNewIncomingSequence(String sequenceId) throws QueueException {
193         if (sequenceId == null)
194             throw new QueueException(Constants.Queue.SEQUENCE_ID_NULL);
195 
196         synchronized (incomingMap) {
197             IncomingSequence sh = new IncomingSequence(sequenceId);
198             incomingMap.put(sequenceId, sh);
199 
200         }
201     }
202 
203     public void createNewOutgoingSequence(String sequenceId) throws QueueException {
204         if (sequenceId == null)
205             throw new QueueException(Constants.Queue.SEQUENCE_ID_NULL);
206 
207         synchronized (outgoingMap) {
208             OutgoingSequence rsh = new OutgoingSequence(sequenceId);
209             outgoingMap.put(sequenceId, rsh);
210         }
211 
212     }
213 
214     /***
215      * Adds a new message to the responses queue.
216      */
217     public void addPriorityMessage(RMMessageContext msg) throws QueueException {
218         synchronized (highPriorityQueue) {
219             if (msg == null)
220                 throw new QueueException(Constants.Queue.MESSAGE_ID_NULL);
221 
222             highPriorityQueue.add(msg);
223         }
224     }
225 
226     public void addLowPriorityMessage(RMMessageContext msg) throws QueueException {
227         synchronized (lowPriorityQueue) {
228             if (msg == null)
229                 throw new QueueException(Constants.Queue.MESSAGE_ID_NULL);
230             lowPriorityQueue.add(msg);
231         }
232     }
233 
234 
235     public RMMessageContext nextPriorityMessageToSend() throws QueueException {
236 
237         synchronized (highPriorityQueue) {
238 
239 
240             if (highPriorityQueue.size() <= 0)
241                 return null;
242 
243             RMMessageContext msg = null;
244             int size = highPriorityQueue.size();
245             synchronized (highPriorityQueue) {
246                 forLoop: //Label
247                 for (int i = 0; i < size; i++) {
248                     RMMessageContext tempMsg = (RMMessageContext) highPriorityQueue.get(i);
249                     if (tempMsg != null) {
250                         switch (tempMsg.getMessageType()) {
251                             //Create seq messages will not be removed.
252                             case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST:
253                                 long lastSentTime = tempMsg.getLastSentTime();
254                                 Date d = new Date();
255                                 long currentTime = d.getTime();
256                                 if (currentTime >=
257                                         lastSentTime + Constants.RETRANSMISSION_INTERVAL) {
258 
259                                     String newCreateSeqId = Constants.UUID + uuidGen.nextUUID();
260                                     tempMsg.setMessageID(newCreateSeqId);
261 
262                                     tempMsg.setLastSentTime(currentTime);
263                                     msg = tempMsg;
264                                     break forLoop;
265 
266 
267                                 }
268                                 break;
269                             case Constants.MSG_TYPE_ACKNOWLEDGEMENT:
270                                 
271                                 //acks are send in the folowing manner.
272                                 //If a ack the system has asked to send a ack (sequence.sendAck==true)
273                                 //then send it immediately.
274                                 //Also send a ack when a interval (ACKNOWLEDGEMENT_INTERVAL) has passed
275                                 //since last message arrived.
276                                 
277                                 String sequenceId = tempMsg.getSequenceID();
278                                 if (sequenceId == null)
279                                     continue;
280 
281                                 String key = getKeyFromIncomingSequenceId(sequenceId);
282                                 IncomingSequence sequence = (IncomingSequence) incomingMap.get(key);
283                                 if (sequence == null)
284                                     continue;
285 
286                                 d = new Date();
287                                 currentTime = d.getTime();
288 
289                                 if (sequence.isSendAck()) {
290 
291                                     tempMsg.setLastSentTime(currentTime);
292                                     msg = tempMsg;
293                                     sequence.setSendAck(false);
294                                     sequence.setFinalAckedTime(currentTime);
295                                     break forLoop;
296 
297                                 } else {
298                                     long ackInterval = PolicyLoader.getInstance()
299                                             .getAcknowledgementInterval();
300                                     long finalAckedTime = sequence.getFinalAckedTime();
301                                     long finalMsgArrivedTime = sequence.getFinalMsgArrivedTime();
302 
303                                     if ((finalMsgArrivedTime > finalAckedTime) &&
304                                             (currentTime > finalMsgArrivedTime + ackInterval))
305                                         sequence.setSendAck(true);
306                                 }
307 
308                                 break;
309                             default:
310                                 highPriorityQueue.remove(i);
311                                 queueBin.put(tempMsg.getMessageID(), tempMsg);
312                                 msg = tempMsg;
313                                 break forLoop;
314                         }
315                     }
316                 }
317             }
318 
319 
320             return msg;
321 
322         }
323     }
324 
325     public List nextAllSeqsToProcess() {
326         List seqs = new ArrayList();
327 
328         synchronized (incomingMap) {
329             Iterator it = incomingMap.keySet().iterator();
330 
331             while (it.hasNext()) {
332                 Object tempKey = it.next();
333                 IncomingSequence sh = (IncomingSequence) incomingMap.get(tempKey);
334                 if (sh.hasProcessableMessages() && !sh.isSequenceLocked())
335                     seqs.add(sh);
336             }
337             return seqs;
338         }
339     }
340 
341     public List nextAllSeqIdsToProcess() {
342         List ids = new ArrayList();
343 
344         synchronized (incomingMap) {
345             Iterator it = incomingMap.keySet().iterator();
346 
347             while (it.hasNext()) {
348                 Object tempKey = it.next();
349                 IncomingSequence sh = (IncomingSequence) incomingMap.get(tempKey);
350                 if (sh.hasProcessableMessages() && !sh.isSequenceLocked())
351                     ids.add(sh.getSequenceId());
352             }
353             return ids;
354         }
355     }
356 
357     public void clear(boolean yes) {
358         if (!yes)
359             return;
360         incomingMap.clear();
361         highPriorityQueue.clear();
362         outgoingMap.clear();
363         queueBin.clear();
364     }
365 
366     // --Commented out by Inspection START (6/8/05 1:19 PM):
367     //    public void removeIncomingSequence(String sequenceId, boolean yes) {
368     //        if (!yes)
369     //            return;
370     //        incomingMap.remove(sequenceId);
371     //    }
372     // --Commented out by Inspection STOP (6/8/05 1:19 PM)
373 
374     public void setSequenceLock(String sequenceId, boolean lock) {
375         IncomingSequence sh = (IncomingSequence) incomingMap.get(sequenceId);
376         sh.setProcessLock(lock);
377     }
378 
379     public Set getAllReceivedMsgNumsOfIncomingSeq(String sequenceId) {
380         IncomingSequence sh = (IncomingSequence) incomingMap.get(sequenceId);
381         if (sh != null)
382             return sh.getAllKeys();
383         else
384             return null;
385     }
386 
387     public boolean isIncomingMessageExists(String sequenceId, Long messageNo) {
388         IncomingSequence sh = (IncomingSequence) incomingMap.get(sequenceId);
389         //sh can be null if there are no messages at the initial point.
390         if (sh != null)
391             return sh.hasMessage(messageNo);
392         else
393             return false;
394     }
395 
396     public void setOutSequence(String seqId, String outSeqId) {
397         OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId);
398         synchronized (rsh) {
399             if (rsh == null) {
400                 if (log.isDebugEnabled())
401                     log.debug("ERROR: RESPONSE SEQ IS NULL");
402                 return;
403             }
404             rsh.setOutSequenceId(outSeqId);
405         }
406     }
407 
408     public void setOutSequenceApproved(String seqId, boolean approved) {
409         OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId);
410         synchronized (rsh) {
411             if (rsh == null) {
412                 if (log.isDebugEnabled())
413                     log.debug("ERROR: RESPONSE SEQ IS NULL");
414                 return;
415             }
416             rsh.setOutSeqApproved(approved);
417         }
418     }
419 
420     public String getSequenceOfOutSequence(String outSequence) {
421         synchronized (outgoingMap) {
422             if (outSequence == null) {
423                 return null;
424             }
425             String tempSeqId = null;
426             Iterator it = outgoingMap.keySet().iterator();
427             while (it.hasNext()) {
428                 tempSeqId = (String) it.next();
429                 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(tempSeqId);
430                 String tempOutSequence = rsh.getOutSequenceId();
431                 if (outSequence.equals(tempOutSequence)) {
432                     break;
433                 }
434             }
435             return tempSeqId;
436         }
437 
438     }
439 
440     public void displayOutgoingMap() {
441         Iterator it = outgoingMap.keySet().iterator();
442         System.out.println("------------------------------------");
443         System.out.println("      DISPLAYING RESPONSE MAP");
444         System.out.println("------------------------------------");
445         while (it.hasNext()) {
446             String s = (String) it.next();
447             System.out.println("\n Sequence id - " + s);
448             OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(s);
449 
450             System.out.println("out seq id:" + rsh.getOutSequenceId());
451             Iterator it1 = rsh.getAllKeys().iterator();
452             while (it1.hasNext()) {
453                 Long l = (Long) it1.next();
454                 String msgId = rsh.getMessageId(l);
455                 System.out.println("* key -" + l.longValue() + "- MessageID -" + msgId + "-");
456             }
457         }
458         System.out.println("\n");
459     }
460 
461     public void displayIncomingMap() {
462         Iterator it = incomingMap.keySet().iterator();
463         System.out.println("------------------------------------");
464         System.out.println("       DISPLAYING SEQUENCE MAP");
465         System.out.println("------------------------------------");
466         while (it.hasNext()) {
467             String s = (String) it.next();
468             System.out.println("\n Sequence id - " + s);
469 
470             IncomingSequence sh = (IncomingSequence) incomingMap.get(s);
471 
472             Iterator it1 = sh.getAllKeys().iterator();
473             while (it1.hasNext()) {
474                 Long l = (Long) it1.next();
475                 String msgId = sh.getMessageId(l);
476                 System.out.println("* key -" + l.longValue() + "- MessageID -" + msgId + "-");
477             }
478         }
479         System.out.println("\n");
480     }
481 
482     public void displayPriorityQueue() {
483 
484         System.out.println("------------------------------------");
485         System.out.println("       DISPLAYING PRIORITY QUEUE");
486         System.out.println("------------------------------------");
487 
488         Iterator it = highPriorityQueue.iterator();
489         while (it.hasNext()) {
490             RMMessageContext msg = (RMMessageContext) it.next();
491             String id = msg.getMessageID();
492             int type = msg.getMessageType();
493 
494             System.out.println("Message " + id + "  Type " + type);
495         }
496         System.out.println("\n");
497     }
498 
499     public void markOutgoingMessageToDelete(String sequenceId, Long messageNo) {
500         String sequence = getSequenceOfOutSequence(sequenceId);
501         OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(sequence);
502 
503         if (rsh == null) {
504             log.error(Constants.Queue.RESPONSE_SEQ_NULL);
505             return;
506         }
507 
508         synchronized (rsh) {
509             //Deleting retuns the deleted message.
510             rsh.markMessageDeleted(messageNo);
511             //If we jave already deleted then no message to return.
512         }
513 
514     }
515 
516     public void movePriorityMsgToBin(String messageId) {
517 
518         synchronized (highPriorityQueue) {
519             int size = highPriorityQueue.size();
520             for (int i = 0; i < size; i++) {
521                 RMMessageContext msg = (RMMessageContext) highPriorityQueue.get(i);
522 
523                 String tempMsgId;
524                 try {
525                     tempMsgId = (String) msg.getMessageIdList().get(0);
526                 } catch (Exception ex) {
527                     tempMsgId = msg.getMessageID();
528                 }
529                 if (tempMsgId.equals(messageId)) {
530                     highPriorityQueue.remove(i);
531                     queueBin.put(messageId, msg);
532                     return;
533                 }
534             }
535         }
536     }
537 
538     public long getNextOutgoingMessageNumber(String seq) {
539         OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seq);
540         if (rsh == null) { //saquence not created yet.
541             try {
542                 createNewOutgoingSequence(seq);
543             } catch (QueueException q) {
544                 log.error(q.getStackTrace());
545             }
546         }
547         rsh = (OutgoingSequence) outgoingMap.get(seq);
548         synchronized (rsh) {
549             Iterator keys = rsh.getAllKeys().iterator();
550 
551             long msgNo = rsh.nextMessageNumber();
552             return (msgNo);
553         }
554     }
555 
556     public synchronized RMMessageContext checkForResponseMessage(String requestId, String seqId) {
557         IncomingSequence sh = (IncomingSequence) incomingMap.get(seqId);
558         if (sh == null) {
559             return null;
560         }
561         synchronized (sh) {
562             RMMessageContext msg = sh.getMessageRelatingTo(requestId);
563             return msg;
564         }
565     }
566 
567     public String searchForSequenceId(String messageId) {
568         Iterator it = outgoingMap.keySet().iterator();
569 
570         String key = null;
571         while (it.hasNext()) {
572             key = (String) it.next();
573             Object obj = outgoingMap.get(key);
574             if (obj != null) {
575                 OutgoingSequence hash = (OutgoingSequence) obj;
576                 boolean hasMsg = hash.hasMessageWithId(messageId);
577 
578                 if (!hasMsg)
579                     key = null;
580                 else
581                     break;
582 
583             }
584 
585         }
586 
587         return key;
588     }
589 
590     public void setAckReceived(String seqId, long msgNo) {
591         Iterator it = outgoingMap.keySet().iterator();
592         String key = null;
593         while (it.hasNext()) {
594             key = (String) it.next();
595             Object obj = outgoingMap.get(key);
596 
597             if (obj != null) {
598                 OutgoingSequence hash = (OutgoingSequence) obj;
599                 if (hash.getOutSequenceId().equals(seqId)) {
600                     hash.setAckReceived(msgNo);
601                 }
602             }
603         }
604 
605     }
606 
607     public RMMessageContext getLowPriorityMessageIfAcked() {
608         synchronized (lowPriorityQueue) {
609             int size = lowPriorityQueue.size();
610             RMMessageContext terminateMsg = null;
611             for (int i = 0; i < size; i++) {
612 
613                 RMMessageContext temp;
614                 temp = (RMMessageContext) lowPriorityQueue.get(i);
615                 String seqId = temp.getSequenceID();
616                 OutgoingSequence hash = null;
617                 hash = (OutgoingSequence) outgoingMap.get(seqId);
618                 if (hash == null) {
619                     log.error("ERROR: HASH NOT FOUND SEQ ID " + seqId);
620                 }
621                 if (hash != null) {
622                     boolean complete = hash.isAckComplete();
623                     if (complete)
624                         terminateMsg = temp;
625                     if (terminateMsg != null) {
626                         terminateMsg.setSequenceID(hash.getOutSequenceId());
627                         lowPriorityQueue.remove(i);
628                         break;
629                     }
630                 }
631             }
632             return terminateMsg;
633         }
634 
635     }
636 
637     public void addSendMsgNo(String seqId, long msgNo) {
638         OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId);
639         if (rsh != null) {
640 
641             synchronized (rsh) {
642                 rsh.addMsgToSendList(msgNo);
643             }
644         }
645     }
646 
647     public boolean isSentMsg(String seqId, long msgNo) {
648         OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId);
649 
650         if (rsh == null) {
651             return false;
652         }
653         synchronized (rsh) {
654             return rsh.isMsgInSentList(msgNo);
655         }
656 
657 
658     }
659 
660     public boolean hasLastIncomingMsgReceived(String seqId) {
661 
662         IncomingSequence sh = (IncomingSequence) incomingMap.get(seqId);
663 
664         if (sh == null) {
665             return false;
666         }
667         synchronized (sh) {
668             return sh.hasLastMsgReceived();
669         }
670     }
671 
672     public long getLastIncomingMsgNo(String seqId) {
673         IncomingSequence sh = (IncomingSequence) incomingMap.get(seqId);
674         if (sh == null) {
675             return 0;
676         }
677         synchronized (sh) {
678             return sh.getLastMsgNumber();
679         }
680     }
681 
682     public void addRequestedSequence(String seqId) {
683         requestedSequences.add(seqId);
684     }
685 
686     public boolean isRequestedSeqPresent(String seqId) {
687         return requestedSequences.contains(seqId);
688     }
689 
690     public String getKeyFromIncomingSequenceId(String seqId) {
691         synchronized (incomingMap) {
692             Iterator it = incomingMap.keySet().iterator();
693             while (it.hasNext()) {
694                 String key = (String) it.next();
695                 IncomingSequence is = (IncomingSequence) incomingMap.get(key);
696                 String seq = is.getSequenceId();
697                 if (seq == null)
698                     continue;
699 
700                 if (seq.equals(seqId))
701                     return key;
702             }
703             return null;
704         }
705     }
706 
707     /*public String getKeyFromOutgoingSequenceId(String seqId) {
708 
709         synchronized (outgoingMap) {
710             System.out.println(" getKeyFromOutgoingSequenceId Received "+seqId);
711             String key = null;
712             Iterator it = outgoingMap.keySet().iterator();
713 
714             while (it.hasNext()) {
715                 key = (String) it.next();
716                 OutgoingSequence os = (OutgoingSequence) outgoingMap.get(key);
717 
718                 String seq = os.getSequenceId();
719                 if (seq == null)
720                     continue;
721 
722                 if (seq.equals(seqId)) {
723                      System.out.println(" getKeyFromOutgoingSequenceId Found "+key);
724                     return key;
725 
726                 }
727             }
728             System.out.println(" getKeyFromOutgoingSequenceId Found "+key);
729             return key;
730         }
731 
732 
733     }*/
734 
735     public String getKeyFromOutgoingSequenceId(String seqId) {
736         synchronized (outgoingMap) {
737             Iterator it = outgoingMap.keySet().iterator();
738             while (it.hasNext()) {
739                 String key = (String) it.next();
740                 OutgoingSequence is = (OutgoingSequence) outgoingMap.get(key);
741                 String seq = is.getOutSequenceId();
742                 if (seq == null)
743                     continue;
744 
745                 if (seq.equals(seqId))
746                     return key;
747             }
748             return null;
749         }
750     }
751 
752     public boolean isAllOutgoingTerminateSent() {
753         synchronized (outgoingMap) {
754             Iterator keys = outgoingMap.keySet().iterator();
755             boolean found = false;
756 
757             while (keys.hasNext()) {
758                 OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(keys.next());
759                 if (ogs.isTerminateSent()) {
760                     found = true;
761                     break;
762                 }
763             }
764 
765             return found;
766         }
767     }
768 
769     public boolean isAllIncommingTerminateReceived() {
770         synchronized (incomingMap) {
771             Iterator keys = incomingMap.keySet().iterator();
772 
773             while (keys.hasNext()) {
774                 Object key = keys.next();
775                 IncomingSequence ics = (IncomingSequence) incomingMap.get(key);
776                 OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(key);
777 
778                 boolean hasResponse = ogs.hasResponse();
779 
780                 if (hasResponse && !ics.isTerminateReceived())
781                     return false;
782             }
783 
784             return true;
785         }
786     }
787 
788     public void setTerminateSend(String seqId) {
789         synchronized (outgoingMap) {
790             OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(seqId);
791             ogs.setTerminateSent(true);
792         }
793     }
794 
795     public void setTerminateReceived(String seqId) {
796         IncomingSequence ics = (IncomingSequence) incomingMap.get(getKeyFromIncomingSequenceId(seqId));
797         ics.setTerminateReceived(true);
798     }
799 
800     public void setAcksTo(String seqId, String acksTo) {
801 
802         if (seqId == null) {
803             log.error("ERROR: seq is null in setAcksTo");
804             return;
805         }
806 
807         acksToMap.put(seqId, acksTo);
808     }
809 
810     public String getAcksTo(String seqId) {
811 
812         if (seqId == null) {
813             log.error("ERROR: seq is null in getAcksTo");
814             return null;
815         }
816 
817         return (String) acksToMap.get(seqId);
818     }
819 
820 
821     public void addOffer(String msgID, String offerID) {
822         if (msgID == null) {
823             log.error(" MessageID is null in addOffer");
824         }
825         offerMap.put(msgID, offerID);
826     }
827 
828     public String getOffer(String msgID) {
829         if (msgID == null) {
830             log.error(" MessageID is null in getOffer");
831             return null;
832         }
833         return (String) offerMap.get(msgID);
834     }
835 
836     public boolean isOutgoingTerminateSent(String seqId) {
837         synchronized (outgoingMap) {
838             OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(seqId);
839             if (ogs != null) {
840                 if (ogs.isTerminateSent())
841                     return true;
842                 else
843                     return false;
844             }
845             return false;
846         }
847 
848     }
849 
850     public boolean isIncommingTerminateReceived(String seqId) {
851         synchronized (incomingMap) {
852 
853             IncomingSequence ics = (IncomingSequence) incomingMap.get(seqId);
854             OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(seqId);
855 
856             boolean hasResponse = false;
857             if (ogs != null) {
858                 hasResponse = ogs.hasResponse();
859             }
860 
861             if (hasResponse && ics != null && !ics.isTerminateReceived())
862                 return false;
863             else
864                 return true;
865         }
866 
867     }
868 
869     public void updateFinalMessageArrivedTime(String sequenceId) {
870         synchronized (incomingMap) {
871             IncomingSequence ics = (IncomingSequence) incomingMap.get(sequenceId);
872             if (ics == null)
873                 return;
874 
875             Date d = new Date();
876             long time = d.getTime();
877             ics.setFinalMsgArrivedTime(time);
878         }
879     }
880 
881     public void sendAck(String sequenceId) {
882         synchronized (incomingMap) {
883             IncomingSequence ics = (IncomingSequence) incomingMap.get(sequenceId);
884             if (ics == null)
885                 return;
886 
887             ics.setSendAck(true);
888         }
889     }
890 
891     public void removeAllAcks(String sequenceID) {
892         synchronized (highPriorityQueue) {
893             int size = highPriorityQueue.size();
894 
895             ArrayList remLst = new ArrayList();
896 
897             for (int i = 0; i < size; i++) {
898                 RMMessageContext msg = (RMMessageContext) highPriorityQueue.get(i);
899                 if (msg.getSequenceID() != null)
900                     if (msg.getSequenceID().equals(sequenceID) && msg.getMessageType() == Constants.MSG_TYPE_ACKNOWLEDGEMENT)
901                         remLst.add(new Integer(i));
902             }
903 
904             for (int i = 0; i < remLst.size(); i++) {
905                 Integer in = (Integer) remLst.get(i);
906                 highPriorityQueue.remove(in.intValue());
907             }
908         }
909     }
910 
911 
912 }
913