View Javadoc

1   /*
2    * Copyright  1999-2004 The Apache Software Foundation.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   *
16   */
17  package org.apache.sandesha.server;
18  
19  import org.apache.axis.components.logger.LogFactory;
20  import org.apache.commons.logging.Log;
21  import org.apache.sandesha.Constants;
22  import org.apache.sandesha.IStorageManager;
23  import org.apache.sandesha.RMMessageContext;
24  import org.apache.sandesha.storage.Callback;
25  import org.apache.sandesha.storage.dao.ISandeshaDAO;
26  import org.apache.sandesha.storage.dao.SandeshaDAOFactory;
27  import org.apache.sandesha.ws.rm.RMHeaders;
28  
29  import java.util.HashMap;
30  import java.util.Iterator;
31  import java.util.Map;
32  import java.util.Set;
33  
34  /***
35   * ServerStorageManager is the access point for the SandeshaQueue from server side.
36   *
37   * @author Chamikara Jayalath
38   * @author Jaliya Ekanayaka
39   */
40  
41  public class ServerStorageManager implements IStorageManager {
42  
43      public void setTerminateSend(String seqId) {
44  
45      }
46  
47      public void setTerminateReceived(String seqId) {
48  
49      }
50  
51      protected static Log log = LogFactory.getLog(ServerStorageManager.class.getName());
52      private ISandeshaDAO accessor;
53  
54      public ServerStorageManager() {
55          accessor =
56                  SandeshaDAOFactory.getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR,
57                          Constants.SERVER);
58      }
59  
60  
61      /***
62       * A very important method. Makes life easy for the thread or thread pool
63       * that is using this. Every thread just have to create an instance of
64       * ServerStorageManager and keep calling getNextMessageToProcess() and
65       * processing messages. The method will try to give the messages from the
66       * same sequence id. But if that doesnt hv processable messages it will go for
67       * a new sequence.
68       */
69      public RMMessageContext getNextMessageToProcess(Object seq) {
70  
71          if (seq == null)
72              return null;
73  
74          RMMessageContext nextMsg = accessor.getNextMsgContextToProcess(seq);
75          return nextMsg;
76      }
77  
78      public void setAcknowledged(String seqID, long msgNumber) {
79          accessor.markOutgoingMessageToDelete(seqID, new Long(msgNumber));
80      }
81  
82      public void init() {
83      }
84  
85      /***
86       * Used to find out weather the sequence with this id has already been
87       * created.
88       */
89      public boolean isSequenceExist(String sequenceID) {
90          return accessor.isIncomingSequenceExists(sequenceID);
91      }
92  
93      public boolean isResponseSequenceExist(String sequenceID) {
94          return accessor.isOutgoingSequenceExists(sequenceID);
95      }
96  
97      public Object getNextSeqToProcess() {
98          return accessor.getRandomSeqToProcess();
99      }
100 
101 
102     /***
103      * This is used to get a random message from the out queue Basically server
104      * sender will use this.
105      */
106     public synchronized RMMessageContext getNextMessageToSend() {
107         RMMessageContext msg;
108         msg = accessor.getNextPriorityMessageContextToSend();
109         if (msg == null)
110             msg = accessor.getNextOutgoingMsgContextToSend();
111         if (msg == null)
112             msg = accessor.getNextLowPriorityMessageContextToSend();
113 
114         if (msg != null && !msg.isLocked()) {
115             msg.setLocked(true);
116             return msg;
117         } else {
118             return null;
119         }
120 
121     }
122 
123     /***
124      * Will be used to add a new Sequence Hash to the In Queue.
125      */
126     public void addSequence(String sequenceId) {
127         boolean result = accessor.addIncomingSequence(sequenceId);
128         if (!result)
129             ServerStorageManager.log.error(Constants.ErrorMessages.SEQ_IS_NOT_CREATED);
130     }
131 
132     /***
133      * This gives a sorted(by keys) map of messageIds present for a sequence.
134      * This will be used to send Acks.
135      */
136     public Map getListOfMessageNumbers(String sequenceID) {
137         Set st = accessor.getAllReceivedMsgNumsOfIncomingSeq(sequenceID);
138         Iterator it = st.iterator();
139         //To find the largest id present
140         long largest = 0;
141         while (it.hasNext()) {
142             Long key = (Long) it.next();
143             if (key == null)
144                 continue;
145 
146             long l = key.longValue();
147             if (l > largest)
148                 largest = l;
149         }
150 
151         HashMap results = new HashMap();
152         //Add Keys to the results in order.
153         long currentPosition = 1;
154         for (long l = 1; l <= largest; l++) {
155             boolean present = st.contains(new Long(l));
156             if (present) {
157                 results.put(new Long(currentPosition), new Long(l));
158                 currentPosition++;
159             }
160         }
161         return results;
162     }
163 
164     public boolean isMessageExist(String sequenceID, long messageNumber) {
165         synchronized (accessor) {
166             return accessor.isIncomingMessageExists(sequenceID, new Long(messageNumber));
167         }
168     }
169 
170 
171     public void addCreateSequenceResponse(RMMessageContext rmMessageContext) {
172         addPriorityMessage(rmMessageContext);
173     }
174 
175     public void addCreateSequenceRequest(RMMessageContext rmMessageContext) {
176         addPriorityMessage(rmMessageContext);
177     }
178 
179     public void addAcknowledgement(RMMessageContext rmMessageContext) {
180         String sequenceID = rmMessageContext.getSequenceID();
181         if (sequenceID != null)
182             accessor.removeAllAcks(sequenceID);
183         addPriorityMessage(rmMessageContext);
184     }
185 
186     private void addPriorityMessage(RMMessageContext msg) {
187         accessor.addPriorityMessage(msg);
188     }
189 
190     public void setTemporaryOutSequence(String sequenceId, String outSequenceId) {
191         accessor.setOutSequence(sequenceId, outSequenceId);
192         accessor.setOutSequenceApproved(sequenceId, false);
193     }
194 
195     public boolean setApprovedOutSequence(String createSeqId, String newOutSequenceId) {
196 
197         String tempOutSeq = createSeqId;
198         if (tempOutSeq == null)
199             tempOutSeq = createSeqId;
200         String sequenceID = accessor.getSequenceOfOutSequence(tempOutSeq);
201 
202         if (sequenceID == null) {
203             ServerStorageManager.log.error(Constants.ErrorMessages.SET_APPROVED_OUT_SEQ);
204             return false;
205         }
206         accessor.setOutSequence(sequenceID, newOutSequenceId);
207         accessor.setOutSequenceApproved(sequenceID, true);
208         accessor.removeCreateSequenceMsg(tempOutSeq);
209         return true;
210     }
211 
212     public long getNextMessageNumber(String sequenceID) {
213         long l = accessor.getNextOutgoingMessageNumber(sequenceID);
214         return l;
215     }
216 
217     public void insertOutgoingMessage(RMMessageContext msg) {
218         String sequenceId = msg.getSequenceID();
219 
220         boolean exists = accessor.isOutgoingSequenceExists(sequenceId);
221         if (!exists)
222             accessor.addOutgoingSequence(sequenceId);
223         accessor.addMessageToOutgoingSequence(sequenceId, msg);
224 
225     }
226 
227     public void insertIncomingMessage(RMMessageContext rmMessageContext) {
228         RMHeaders rmHeaders = rmMessageContext.getRMHeaders();
229         String sequenceId = rmHeaders.getSequence().getIdentifier().getIdentifier();
230         boolean exists = accessor.isIncomingSequenceExists(sequenceId);
231         if (!exists)
232             addSequence(sequenceId); //Creating new sequence
233 
234         //TODO: add getRmHeaders method to MessageContext
235         long messageNumber = rmHeaders.getSequence().getMessageNumber().getMessageNumber();
236 
237         if (messageNumber <= 0)
238             return;
239 
240         Long msgNo = new Long(messageNumber);
241         accessor.addMessageToIncomingSequence(sequenceId, msgNo, rmMessageContext);
242         accessor.updateFinalMessageArrivedTime(sequenceId);
243 
244     }
245 
246     public RMMessageContext checkForResponseMessage(String sequenceId, String requestMsgId) {
247         return null;
248     }
249 
250     public void insertTerminateSeqMessage(RMMessageContext terminateSeqMessage) {
251         accessor.addLowPriorityMessage(terminateSeqMessage);
252     }
253 
254     public void setAckReceived(String seqId, long msgNo) {
255         accessor.setAckReceived(seqId, msgNo);
256 
257     }
258 
259     public void insertFault(RMMessageContext rmMsgCtx) {
260     }
261 
262 
263     public void addSendMsgNo(String seqId, long msgNo) {
264         accessor.addSendMsgNo(accessor.getSequenceOfOutSequence(seqId), msgNo);
265     }
266 
267     public boolean isSentMsg(String seqId, long msgNo) {
268         return accessor.isSentMsg(accessor.getSequenceOfOutSequence(seqId), msgNo);
269     }
270 
271 
272     public void addOutgoingSequence(String sequenceId) {
273         accessor.addOutgoingSequence(sequenceId);
274     }
275 
276     public void addIncomingSequence(String sequenceId) {
277         accessor.addIncomingSequence(sequenceId);
278     }
279 
280     public String getOutgoingSeqOfMsg(String msgId) {
281         return null;
282     }
283 
284     public void addRequestedSequence(String seqId) {
285         accessor.addRequestedSequence(seqId);
286     }
287 
288     public boolean isRequestedSeqPresent(String seqId) {
289         return accessor.isRequestedSeqPresent(seqId);
290     }
291 
292     public String getOutgoingSeqenceIdOfIncomingMsg(RMMessageContext msg) {
293 
294         return msg.getSequenceID();
295     }
296 
297     public long getLastIncomingMsgNo(String seqId) {
298         return accessor.getLastIncomingMsgNo(seqId);
299     }
300 
301     public boolean hasLastIncomingMsgReceived(String seqId) {
302         return accessor.hasLastIncomingMsgReceived(seqId);
303     }
304 
305     public String getKeyFromOutgoingSeqId(String seqId) {
306         return null;
307     }
308 
309     public void setAcksTo(String seqId, String acksTo) {
310         accessor.setAcksTo(seqId, acksTo);
311     }
312 
313     public String getAcksTo(String seqId) {
314         return accessor.getAcksTo(seqId);
315     }
316 
317     public void setCallback(Callback cb) {
318     }
319 
320     public void removeCallback() {
321     }
322 
323     public void addOffer(String msgID, String offerID) {
324 
325     }
326 
327     public String getOffer(String msgID) {
328         return null;
329     }
330 
331     public void clearStorage() {
332         accessor.clear();
333     }
334 
335     public boolean isSequenceComplete(String seqId) {
336         return false;
337     }
338 
339     public void sendAck(String sequenceId) {
340         accessor.sendAck(sequenceId);
341     }
342 }