View Javadoc

1   /*
2    * Copyright  1999-2004 The Apache Software Foundation.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   *
16   */
17  package org.apache.sandesha.client;
18  
19  import org.apache.axis.components.logger.LogFactory;
20  import org.apache.axis.message.addressing.RelatesTo;
21  import org.apache.commons.logging.Log;
22  import org.apache.sandesha.Constants;
23  import org.apache.sandesha.IStorageManager;
24  import org.apache.sandesha.RMMessageContext;
25  import org.apache.sandesha.storage.Callback;
26  import org.apache.sandesha.storage.CallbackData;
27  import org.apache.sandesha.storage.dao.ISandeshaDAO;
28  import org.apache.sandesha.storage.dao.SandeshaDAOFactory;
29  import org.apache.sandesha.ws.rm.RMHeaders;
30  
31  import java.util.HashMap;
32  import java.util.Iterator;
33  import java.util.Map;
34  import java.util.Set;
35  
36  /***
37   * This is the storage manager for Client side in Sandesha
38   * Provides the access points for the SandeshaQueue.
39   *
40   * @author Chamikara Jayalath
41   * @author Jaliya Ekanayake
42   */
43  public class ClientStorageManager implements IStorageManager {
44  
45      protected static Log log = LogFactory.getLog(ClientStorageManager.class.getName());
46  
47      private ISandeshaDAO accessor;
48      private static Callback callBack;
49  
50      public void init() {
51      }
52  
53      public ClientStorageManager() {
54          accessor = SandeshaDAOFactory.getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR,
55                  Constants.CLIENT);
56      }
57  
58      public boolean isSequenceExist(String sequenceID) {
59          return accessor.isOutgoingSequenceExists(sequenceID);
60      }
61  
62      public boolean isResponseSequenceExist(String sequenceID) {
63          return accessor.isIncomingSequenceExists(sequenceID);
64      }
65  
66      public Object getNextSeqToProcess() {
67          return null;
68      }
69  
70      public RMMessageContext getNextMessageToProcess(Object seq) {
71          return null;
72      }
73  
74      public void setAcknowledged(String seqID, long msgNumber) {
75          accessor.markOutgoingMessageToDelete(seqID, new Long(msgNumber));
76  
77      }
78  
79      public void addSequence(String sequenceID) {
80          boolean result = accessor.addOutgoingSequence(sequenceID);
81          if (!result)
82              log.error("Sequence was not created correctly in the in the queue");
83      }
84  
85      /***
86       * This will be used both by the Sender and the SimpleAxisServer to set the
87       * create sequence responses.
88       */
89      public void addCreateSequenceResponse(RMMessageContext rmMessageContext) {
90          addPriorityMessage(rmMessageContext);
91      }
92  
93      /***
94       * This will be used by the RMSender to add the create sequence request.
95       */
96      public void addCreateSequenceRequest(RMMessageContext rmMessageContext) {
97          addPriorityMessage(rmMessageContext);
98      }
99  
100     /***
101      * SimpleAxisServer will use this method to add acks for the application
102      * responses received from the server side.
103      */
104     public void addAcknowledgement(RMMessageContext rmMessageContext) {
105         String sequenceID = rmMessageContext.getSequenceID();
106         if (sequenceID != null)
107             accessor.removeAllAcks(sequenceID);
108 
109         addPriorityMessage(rmMessageContext);
110     }
111 
112     //private method
113     private void addPriorityMessage(RMMessageContext msg) {
114         accessor.addPriorityMessage(msg);
115     }
116 
117     /***
118      * Check the existance of a message.
119      */
120     public boolean isMessageExist(String sequenceID, long messageNumber) {
121         return accessor.isIncomingMessageExists(sequenceID, new Long(messageNumber));
122     }
123 
124     /***
125      * Get a Map of messages.
126      */
127     public Map getListOfMessageNumbers(String sequenceID) {
128         String seq = sequenceID;
129         Set st = accessor.getAllReceivedMsgNumsOfIncomingSeq(seq);
130         Iterator it = st.iterator();
131         //To find the largest id present
132         long largest = 0;
133         while (it.hasNext()) {
134             Long key = (Long) it.next();
135             if (null == key)
136                 continue;
137 
138             long l = key.longValue();
139             if (l > largest)
140                 largest = l;
141         }
142 
143         HashMap results = new HashMap();
144         //Add Keys to the results in order.
145         long currentPosition = 1;
146         for (long l = 1; l <= largest; l++) {
147             boolean present = st.contains(new Long(l));
148             if (present) {
149                 results.put(new Long(currentPosition), new Long(l));
150                 currentPosition++;
151             }
152         }
153         return results;
154     }
155 
156     /***
157      * This will be used by the sender.
158      */
159     public synchronized RMMessageContext getNextMessageToSend() {
160         RMMessageContext msg;
161         msg = accessor.getNextPriorityMessageContextToSend();
162         if (msg == null)
163             msg = accessor.getNextOutgoingMsgContextToSend();
164 
165         if (null == msg) {
166             msg = accessor.getNextLowPriorityMessageContextToSend();
167 
168             // checks whether all the request messages have been acked
169         }
170         if (null != callBack && null != msg)
171             informOutgoingMessage(msg);
172 
173         if (msg != null && !msg.isLocked()) {
174             msg.setLocked(true);
175             return msg;
176         } else {
177             return null;
178         }
179     }
180 
181     /***
182      * This will be used by the RMSender when adding messages to the Queue.
183      * RMSender will also add a createSequenceRequest message to the prioriy
184      * queue using this temporary ID as the messageID.
185      */
186     public void setTemporaryOutSequence(String sequenceId, String outSequenceId) {
187         synchronized (this) {
188             accessor.setOutSequence(sequenceId, outSequenceId);
189             accessor.setOutSequenceApproved(sequenceId, false);
190         }
191     }
192 
193     /***
194      * This will be used by the Client Listener and the Sender to set the
195      * proper sequenceID
196      */
197     public boolean setApprovedOutSequence(String oldSeqId, String newSeqId) {
198         if (oldSeqId == null) {
199             return false;
200         }
201         String sequenceID = accessor.getSequenceOfOutSequence(oldSeqId);
202         if (null == sequenceID) {
203             log.error(Constants.ErrorMessages.SET_APPROVED_OUT_SEQ);
204             return false;
205         }
206         accessor.setOutSequence(sequenceID, newSeqId);
207         accessor.setOutSequenceApproved(sequenceID, true);
208         accessor.removeCreateSequenceMsg(oldSeqId);
209         return true;
210 
211     }
212 
213     /***
214      * This will be used by the RMSender when adding messages. Initially it
215      * should return 1.
216      */
217     public long getNextMessageNumber(String sequenceID) {
218         long msgNo = accessor.getNextOutgoingMessageNumber(sequenceID);
219         return msgNo;
220     }
221 
222     public void insertOutgoingMessage(RMMessageContext msg) {
223         String sequenceId = msg.getSequenceID();
224         accessor.addMessageToOutgoingSequence(sequenceId, msg);
225     }
226 
227     public void insertIncomingMessage(RMMessageContext rmMessageContext) {
228         RMHeaders rmHeaders = rmMessageContext.getRMHeaders();
229         RelatesTo relatesTo = (RelatesTo) rmMessageContext.getAddressingHeaders().getRelatesTo()
230                 .get(0);
231         String messageId = relatesTo.getURI().toString();
232         String sequenceId = null;
233 
234         sequenceId = accessor.searchForSequenceId(messageId);
235 
236         boolean exists = accessor.isIncomingSequenceExists(sequenceId);
237 
238         if (!exists) {
239             accessor.addIncomingSequence(sequenceId);
240         }
241 
242         long messageNumber = rmHeaders.getSequence().getMessageNumber().getMessageNumber();
243         if (messageNumber <= 0)
244             return;
245         Long msgNo = new Long(messageNumber);
246         accessor.addMessageToIncomingSequence(sequenceId, msgNo, rmMessageContext);
247         accessor.updateFinalMessageArrivedTime(sequenceId);
248     }
249 
250     public RMMessageContext checkForResponseMessage(String sequenceId, String requestMsgId) {
251         RMMessageContext response = accessor.checkForResponseMessage(requestMsgId, sequenceId);
252         return response;
253 
254     }
255 
256      public void insertTerminateSeqMessage(RMMessageContext terminateSeqMessage) {
257         accessor.addLowPriorityMessage(terminateSeqMessage);
258     }
259 
260     public void setAckReceived(String seqId, long msgNo) {
261         accessor.setAckReceived(seqId, msgNo);
262     }
263 
264     public void insertFault(RMMessageContext rmMsgCtx) {
265 
266     }
267 
268     public void addSendMsgNo(String seqId, long msgNo) {
269         accessor.addSendMsgNo(accessor.getSequenceOfOutSequence(seqId), msgNo);
270     }
271 
272     public void addOutgoingSequence(String sequenceId) {
273         accessor.addOutgoingSequence(sequenceId);
274     }
275 
276     public void addIncomingSequence(String sequenceId) {
277         accessor.addIncomingSequence(sequenceId);
278     }
279 
280     public long getLastIncomingMsgNo(String seqId) {
281         String key = accessor.getKeyFromIncomingSequenceId(seqId);
282         return accessor.getLastIncomingMsgNo(key);
283     }
284 
285     public boolean hasLastIncomingMsgReceived(String seqId) {
286         String key = accessor.getKeyFromIncomingSequenceId(seqId);
287         return accessor.hasLastIncomingMsgReceived(key);
288     }
289 
290     public void addRequestedSequence(String seqId) {
291         accessor.addRequestedSequence(seqId);
292     }
293 
294     public boolean isRequestedSeqPresent(String seqId) {
295         return accessor.isRequestedSeqPresent(seqId);
296     }
297 
298     public boolean isSentMsg(String seqId, long msgNo) {
299         return accessor.isSentMsg(accessor.getSequenceOfOutSequence(seqId), msgNo);
300     }
301 
302     public String getOutgoingSeqOfMsg(String msgId) {
303         return accessor.searchForSequenceId(msgId);
304     }
305 
306     public String getOutgoingSeqenceIdOfIncomingMsg(RMMessageContext msg) {
307         //String msgId = msg.getMessageID();
308         RelatesTo relatesTo = (RelatesTo) msg.getAddressingHeaders().getRelatesTo().get(0);
309         String msgId = relatesTo.getURI().toString();
310         return accessor.searchForSequenceId(msgId);
311     }
312 
313     public void setTerminateSend(String seqId) {
314         accessor.setTerminateSend(seqId);
315     }
316 
317     public void setTerminateReceived(String seqId) {
318         accessor.setTerminateReceived(seqId);
319     }
320 
321     public String getKeyFromOutgoingSeqId(String seqId) {
322         return accessor.getKeyFromOutgoingSequenceId(seqId);
323     }
324 
325     public void setAcksTo(String seqId, String acksTo) {
326         accessor.setAcksTo(seqId, acksTo);
327     }
328 
329     public String getAcksTo(String seqId) {
330         return accessor.getAcksTo(seqId);
331     }
332 
333     public void addOffer(String msgID, String offerID) {
334         accessor.addOffer(msgID, offerID);
335     }
336 
337     public String getOffer(String msgID) {
338         return accessor.getOffer(msgID);
339     }
340 
341     public void setCallback(Callback cb) {
342         callBack = cb;
343     }
344 
345     public void removeCallback() {
346         callBack = null;
347     }
348 
349     private void informOutgoingMessage(RMMessageContext rmMsgContext) {
350 
351         CallbackData cbData = new CallbackData();
352 
353         //  setting callback data;
354         if (null != rmMsgContext) {
355             cbData.setSequenceId(rmMsgContext.getSequenceID());
356             cbData.setMessageId(rmMsgContext.getMessageID());
357             cbData.setMessageType(rmMsgContext.getMessageType());
358         }
359 
360         if (null != callBack)
361             callBack.onOutgoingMessage(cbData);
362     }
363 
364     public void clearStorage() {
365         accessor.clear();
366     }
367 
368     public boolean isSequenceComplete(String seqId) {
369         boolean outTerminateSent = accessor.isOutgoingTerminateSent(seqId);
370         boolean incomingTerminateReceived = accessor.isIncommingTerminateReceived(seqId);
371         return outTerminateSent && incomingTerminateReceived;
372     }
373 
374     public void sendAck(String sequenceId) {
375         String keyId = accessor.getKeyFromIncomingSequenceId(sequenceId);
376         accessor.sendAck(keyId);
377     }
378 
379 
380 }