1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.sandesha.client;
18
import org.apache.axis.components.logger.LogFactory;
import org.apache.axis.message.addressing.RelatesTo;
import org.apache.commons.logging.Log;
import org.apache.sandesha.Constants;
import org.apache.sandesha.IStorageManager;
import org.apache.sandesha.RMMessageContext;
import org.apache.sandesha.storage.Callback;
import org.apache.sandesha.storage.CallbackData;
import org.apache.sandesha.storage.dao.ISandeshaDAO;
import org.apache.sandesha.storage.dao.SandeshaDAOFactory;
import org.apache.sandesha.ws.rm.RMHeaders;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
35
36 /***
37 * This is the storage manager for Client side in Sandesha
38 * Provides the access points for the SandeshaQueue.
39 *
40 * @author Chamikara Jayalath
41 * @author Jaliya Ekanayake
42 */
43 public class ClientStorageManager implements IStorageManager {
44
45 protected static Log log = LogFactory.getLog(ClientStorageManager.class.getName());
46
47 private ISandeshaDAO accessor;
48 private static Callback callBack;
49
50 public void init() {
51 }
52
53 public ClientStorageManager() {
54 accessor = SandeshaDAOFactory.getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR,
55 Constants.CLIENT);
56 }
57
58 public boolean isSequenceExist(String sequenceID) {
59 return accessor.isOutgoingSequenceExists(sequenceID);
60 }
61
62 public boolean isResponseSequenceExist(String sequenceID) {
63 return accessor.isIncomingSequenceExists(sequenceID);
64 }
65
66 public Object getNextSeqToProcess() {
67 return null;
68 }
69
70 public RMMessageContext getNextMessageToProcess(Object seq) {
71 return null;
72 }
73
74 public void setAcknowledged(String seqID, long msgNumber) {
75 accessor.markOutgoingMessageToDelete(seqID, new Long(msgNumber));
76
77 }
78
79 public void addSequence(String sequenceID) {
80 boolean result = accessor.addOutgoingSequence(sequenceID);
81 if (!result)
82 log.error("Sequence was not created correctly in the in the queue");
83 }
84
85 /***
86 * This will be used both by the Sender and the SimpleAxisServer to set the
87 * create sequence responses.
88 */
89 public void addCreateSequenceResponse(RMMessageContext rmMessageContext) {
90 addPriorityMessage(rmMessageContext);
91 }
92
93 /***
94 * This will be used by the RMSender to add the create sequence request.
95 */
96 public void addCreateSequenceRequest(RMMessageContext rmMessageContext) {
97 addPriorityMessage(rmMessageContext);
98 }
99
100 /***
101 * SimpleAxisServer will use this method to add acks for the application
102 * responses received from the server side.
103 */
104 public void addAcknowledgement(RMMessageContext rmMessageContext) {
105 String sequenceID = rmMessageContext.getSequenceID();
106 if (sequenceID != null)
107 accessor.removeAllAcks(sequenceID);
108
109 addPriorityMessage(rmMessageContext);
110 }
111
112
113 private void addPriorityMessage(RMMessageContext msg) {
114 accessor.addPriorityMessage(msg);
115 }
116
117 /***
118 * Check the existance of a message.
119 */
120 public boolean isMessageExist(String sequenceID, long messageNumber) {
121 return accessor.isIncomingMessageExists(sequenceID, new Long(messageNumber));
122 }
123
124 /***
125 * Get a Map of messages.
126 */
127 public Map getListOfMessageNumbers(String sequenceID) {
128 String seq = sequenceID;
129 Set st = accessor.getAllReceivedMsgNumsOfIncomingSeq(seq);
130 Iterator it = st.iterator();
131
132 long largest = 0;
133 while (it.hasNext()) {
134 Long key = (Long) it.next();
135 if (null == key)
136 continue;
137
138 long l = key.longValue();
139 if (l > largest)
140 largest = l;
141 }
142
143 HashMap results = new HashMap();
144
145 long currentPosition = 1;
146 for (long l = 1; l <= largest; l++) {
147 boolean present = st.contains(new Long(l));
148 if (present) {
149 results.put(new Long(currentPosition), new Long(l));
150 currentPosition++;
151 }
152 }
153 return results;
154 }
155
156 /***
157 * This will be used by the sender.
158 */
159 public synchronized RMMessageContext getNextMessageToSend() {
160 RMMessageContext msg;
161 msg = accessor.getNextPriorityMessageContextToSend();
162 if (msg == null)
163 msg = accessor.getNextOutgoingMsgContextToSend();
164
165 if (null == msg) {
166 msg = accessor.getNextLowPriorityMessageContextToSend();
167
168
169 }
170 if (null != callBack && null != msg)
171 informOutgoingMessage(msg);
172
173 if (msg != null && !msg.isLocked()) {
174 msg.setLocked(true);
175 return msg;
176 } else {
177 return null;
178 }
179 }
180
181 /***
182 * This will be used by the RMSender when adding messages to the Queue.
183 * RMSender will also add a createSequenceRequest message to the prioriy
184 * queue using this temporary ID as the messageID.
185 */
186 public void setTemporaryOutSequence(String sequenceId, String outSequenceId) {
187 synchronized (this) {
188 accessor.setOutSequence(sequenceId, outSequenceId);
189 accessor.setOutSequenceApproved(sequenceId, false);
190 }
191 }
192
193 /***
194 * This will be used by the Client Listener and the Sender to set the
195 * proper sequenceID
196 */
197 public boolean setApprovedOutSequence(String oldSeqId, String newSeqId) {
198 if (oldSeqId == null) {
199 return false;
200 }
201 String sequenceID = accessor.getSequenceOfOutSequence(oldSeqId);
202 if (null == sequenceID) {
203 log.error(Constants.ErrorMessages.SET_APPROVED_OUT_SEQ);
204 return false;
205 }
206 accessor.setOutSequence(sequenceID, newSeqId);
207 accessor.setOutSequenceApproved(sequenceID, true);
208 accessor.removeCreateSequenceMsg(oldSeqId);
209 return true;
210
211 }
212
213 /***
214 * This will be used by the RMSender when adding messages. Initially it
215 * should return 1.
216 */
217 public long getNextMessageNumber(String sequenceID) {
218 long msgNo = accessor.getNextOutgoingMessageNumber(sequenceID);
219 return msgNo;
220 }
221
222 public void insertOutgoingMessage(RMMessageContext msg) {
223 String sequenceId = msg.getSequenceID();
224 accessor.addMessageToOutgoingSequence(sequenceId, msg);
225 }
226
227 public void insertIncomingMessage(RMMessageContext rmMessageContext) {
228 RMHeaders rmHeaders = rmMessageContext.getRMHeaders();
229 RelatesTo relatesTo = (RelatesTo) rmMessageContext.getAddressingHeaders().getRelatesTo()
230 .get(0);
231 String messageId = relatesTo.getURI().toString();
232 String sequenceId = null;
233
234 sequenceId = accessor.searchForSequenceId(messageId);
235
236 boolean exists = accessor.isIncomingSequenceExists(sequenceId);
237
238 if (!exists) {
239 accessor.addIncomingSequence(sequenceId);
240 }
241
242 long messageNumber = rmHeaders.getSequence().getMessageNumber().getMessageNumber();
243 if (messageNumber <= 0)
244 return;
245 Long msgNo = new Long(messageNumber);
246 accessor.addMessageToIncomingSequence(sequenceId, msgNo, rmMessageContext);
247 accessor.updateFinalMessageArrivedTime(sequenceId);
248 }
249
250 public RMMessageContext checkForResponseMessage(String sequenceId, String requestMsgId) {
251 RMMessageContext response = accessor.checkForResponseMessage(requestMsgId, sequenceId);
252 return response;
253
254 }
255
256 public void insertTerminateSeqMessage(RMMessageContext terminateSeqMessage) {
257 accessor.addLowPriorityMessage(terminateSeqMessage);
258 }
259
260 public void setAckReceived(String seqId, long msgNo) {
261 accessor.setAckReceived(seqId, msgNo);
262 }
263
264 public void insertFault(RMMessageContext rmMsgCtx) {
265
266 }
267
268 public void addSendMsgNo(String seqId, long msgNo) {
269 accessor.addSendMsgNo(accessor.getSequenceOfOutSequence(seqId), msgNo);
270 }
271
272 public void addOutgoingSequence(String sequenceId) {
273 accessor.addOutgoingSequence(sequenceId);
274 }
275
276 public void addIncomingSequence(String sequenceId) {
277 accessor.addIncomingSequence(sequenceId);
278 }
279
280 public long getLastIncomingMsgNo(String seqId) {
281 String key = accessor.getKeyFromIncomingSequenceId(seqId);
282 return accessor.getLastIncomingMsgNo(key);
283 }
284
285 public boolean hasLastIncomingMsgReceived(String seqId) {
286 String key = accessor.getKeyFromIncomingSequenceId(seqId);
287 return accessor.hasLastIncomingMsgReceived(key);
288 }
289
290 public void addRequestedSequence(String seqId) {
291 accessor.addRequestedSequence(seqId);
292 }
293
294 public boolean isRequestedSeqPresent(String seqId) {
295 return accessor.isRequestedSeqPresent(seqId);
296 }
297
298 public boolean isSentMsg(String seqId, long msgNo) {
299 return accessor.isSentMsg(accessor.getSequenceOfOutSequence(seqId), msgNo);
300 }
301
302 public String getOutgoingSeqOfMsg(String msgId) {
303 return accessor.searchForSequenceId(msgId);
304 }
305
306 public String getOutgoingSeqenceIdOfIncomingMsg(RMMessageContext msg) {
307
308 RelatesTo relatesTo = (RelatesTo) msg.getAddressingHeaders().getRelatesTo().get(0);
309 String msgId = relatesTo.getURI().toString();
310 return accessor.searchForSequenceId(msgId);
311 }
312
313 public void setTerminateSend(String seqId) {
314 accessor.setTerminateSend(seqId);
315 }
316
317 public void setTerminateReceived(String seqId) {
318 accessor.setTerminateReceived(seqId);
319 }
320
321 public String getKeyFromOutgoingSeqId(String seqId) {
322 return accessor.getKeyFromOutgoingSequenceId(seqId);
323 }
324
325 public void setAcksTo(String seqId, String acksTo) {
326 accessor.setAcksTo(seqId, acksTo);
327 }
328
329 public String getAcksTo(String seqId) {
330 return accessor.getAcksTo(seqId);
331 }
332
333 public void addOffer(String msgID, String offerID) {
334 accessor.addOffer(msgID, offerID);
335 }
336
337 public String getOffer(String msgID) {
338 return accessor.getOffer(msgID);
339 }
340
341 public void setCallback(Callback cb) {
342 callBack = cb;
343 }
344
345 public void removeCallback() {
346 callBack = null;
347 }
348
349 private void informOutgoingMessage(RMMessageContext rmMsgContext) {
350
351 CallbackData cbData = new CallbackData();
352
353
354 if (null != rmMsgContext) {
355 cbData.setSequenceId(rmMsgContext.getSequenceID());
356 cbData.setMessageId(rmMsgContext.getMessageID());
357 cbData.setMessageType(rmMsgContext.getMessageType());
358 }
359
360 if (null != callBack)
361 callBack.onOutgoingMessage(cbData);
362 }
363
364 public void clearStorage() {
365 accessor.clear();
366 }
367
368 public boolean isSequenceComplete(String seqId) {
369 boolean outTerminateSent = accessor.isOutgoingTerminateSent(seqId);
370 boolean incomingTerminateReceived = accessor.isIncommingTerminateReceived(seqId);
371 return outTerminateSent && incomingTerminateReceived;
372 }
373
374 public void sendAck(String sequenceId) {
375 String keyId = accessor.getKeyFromIncomingSequenceId(sequenceId);
376 accessor.sendAck(keyId);
377 }
378
379
380 }