1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.sandesha.server;
18
19 import org.apache.axis.components.logger.LogFactory;
20 import org.apache.commons.logging.Log;
21 import org.apache.sandesha.Constants;
22 import org.apache.sandesha.IStorageManager;
23 import org.apache.sandesha.RMMessageContext;
24 import org.apache.sandesha.storage.Callback;
25 import org.apache.sandesha.storage.dao.ISandeshaDAO;
26 import org.apache.sandesha.storage.dao.SandeshaDAOFactory;
27 import org.apache.sandesha.ws.rm.RMHeaders;
28
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.Map;
32 import java.util.Set;
33
34 /***
35 * ServerStorageManager is the access point for the SandeshaQueue from server side.
36 *
37 * @author Chamikara Jayalath
38 * @author Jaliya Ekanayaka
39 */
40
41 public class ServerStorageManager implements IStorageManager {
42
43 public void setTerminateSend(String seqId) {
44
45 }
46
47 public void setTerminateReceived(String seqId) {
48
49 }
50
51 protected static Log log = LogFactory.getLog(ServerStorageManager.class.getName());
52 private ISandeshaDAO accessor;
53
54 public ServerStorageManager() {
55 accessor =
56 SandeshaDAOFactory.getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR,
57 Constants.SERVER);
58 }
59
60
61 /***
62 * A very important method. Makes life easy for the thread or thread pool
63 * that is using this. Every thread just have to create an instance of
64 * ServerStorageManager and keep calling getNextMessageToProcess() and
65 * processing messages. The method will try to give the messages from the
66 * same sequence id. But if that doesnt hv processable messages it will go for
67 * a new sequence.
68 */
69 public RMMessageContext getNextMessageToProcess(Object seq) {
70
71 if (seq == null)
72 return null;
73
74 RMMessageContext nextMsg = accessor.getNextMsgContextToProcess(seq);
75 return nextMsg;
76 }
77
78 public void setAcknowledged(String seqID, long msgNumber) {
79 accessor.markOutgoingMessageToDelete(seqID, new Long(msgNumber));
80 }
81
82 public void init() {
83 }
84
85 /***
86 * Used to find out weather the sequence with this id has already been
87 * created.
88 */
89 public boolean isSequenceExist(String sequenceID) {
90 return accessor.isIncomingSequenceExists(sequenceID);
91 }
92
93 public boolean isResponseSequenceExist(String sequenceID) {
94 return accessor.isOutgoingSequenceExists(sequenceID);
95 }
96
97 public Object getNextSeqToProcess() {
98 return accessor.getRandomSeqToProcess();
99 }
100
101
102 /***
103 * This is used to get a random message from the out queue Basically server
104 * sender will use this.
105 */
106 public synchronized RMMessageContext getNextMessageToSend() {
107 RMMessageContext msg;
108 msg = accessor.getNextPriorityMessageContextToSend();
109 if (msg == null)
110 msg = accessor.getNextOutgoingMsgContextToSend();
111 if (msg == null)
112 msg = accessor.getNextLowPriorityMessageContextToSend();
113
114 if (msg != null && !msg.isLocked()) {
115 msg.setLocked(true);
116 return msg;
117 } else {
118 return null;
119 }
120
121 }
122
123 /***
124 * Will be used to add a new Sequence Hash to the In Queue.
125 */
126 public void addSequence(String sequenceId) {
127 boolean result = accessor.addIncomingSequence(sequenceId);
128 if (!result)
129 ServerStorageManager.log.error(Constants.ErrorMessages.SEQ_IS_NOT_CREATED);
130 }
131
132 /***
133 * This gives a sorted(by keys) map of messageIds present for a sequence.
134 * This will be used to send Acks.
135 */
136 public Map getListOfMessageNumbers(String sequenceID) {
137 Set st = accessor.getAllReceivedMsgNumsOfIncomingSeq(sequenceID);
138 Iterator it = st.iterator();
139
140 long largest = 0;
141 while (it.hasNext()) {
142 Long key = (Long) it.next();
143 if (key == null)
144 continue;
145
146 long l = key.longValue();
147 if (l > largest)
148 largest = l;
149 }
150
151 HashMap results = new HashMap();
152
153 long currentPosition = 1;
154 for (long l = 1; l <= largest; l++) {
155 boolean present = st.contains(new Long(l));
156 if (present) {
157 results.put(new Long(currentPosition), new Long(l));
158 currentPosition++;
159 }
160 }
161 return results;
162 }
163
164 public boolean isMessageExist(String sequenceID, long messageNumber) {
165 synchronized (accessor) {
166 return accessor.isIncomingMessageExists(sequenceID, new Long(messageNumber));
167 }
168 }
169
170
171 public void addCreateSequenceResponse(RMMessageContext rmMessageContext) {
172 addPriorityMessage(rmMessageContext);
173 }
174
175 public void addCreateSequenceRequest(RMMessageContext rmMessageContext) {
176 addPriorityMessage(rmMessageContext);
177 }
178
179 public void addAcknowledgement(RMMessageContext rmMessageContext) {
180 String sequenceID = rmMessageContext.getSequenceID();
181 if (sequenceID != null)
182 accessor.removeAllAcks(sequenceID);
183 addPriorityMessage(rmMessageContext);
184 }
185
186 private void addPriorityMessage(RMMessageContext msg) {
187 accessor.addPriorityMessage(msg);
188 }
189
190 public void setTemporaryOutSequence(String sequenceId, String outSequenceId) {
191 accessor.setOutSequence(sequenceId, outSequenceId);
192 accessor.setOutSequenceApproved(sequenceId, false);
193 }
194
195 public boolean setApprovedOutSequence(String createSeqId, String newOutSequenceId) {
196
197 String tempOutSeq = createSeqId;
198 if (tempOutSeq == null)
199 tempOutSeq = createSeqId;
200 String sequenceID = accessor.getSequenceOfOutSequence(tempOutSeq);
201
202 if (sequenceID == null) {
203 ServerStorageManager.log.error(Constants.ErrorMessages.SET_APPROVED_OUT_SEQ);
204 return false;
205 }
206 accessor.setOutSequence(sequenceID, newOutSequenceId);
207 accessor.setOutSequenceApproved(sequenceID, true);
208 accessor.removeCreateSequenceMsg(tempOutSeq);
209 return true;
210 }
211
212 public long getNextMessageNumber(String sequenceID) {
213 long l = accessor.getNextOutgoingMessageNumber(sequenceID);
214 return l;
215 }
216
217 public void insertOutgoingMessage(RMMessageContext msg) {
218 String sequenceId = msg.getSequenceID();
219
220 boolean exists = accessor.isOutgoingSequenceExists(sequenceId);
221 if (!exists)
222 accessor.addOutgoingSequence(sequenceId);
223 accessor.addMessageToOutgoingSequence(sequenceId, msg);
224
225 }
226
227 public void insertIncomingMessage(RMMessageContext rmMessageContext) {
228 RMHeaders rmHeaders = rmMessageContext.getRMHeaders();
229 String sequenceId = rmHeaders.getSequence().getIdentifier().getIdentifier();
230 boolean exists = accessor.isIncomingSequenceExists(sequenceId);
231 if (!exists)
232 addSequence(sequenceId);
233
234
235 long messageNumber = rmHeaders.getSequence().getMessageNumber().getMessageNumber();
236
237 if (messageNumber <= 0)
238 return;
239
240 Long msgNo = new Long(messageNumber);
241 accessor.addMessageToIncomingSequence(sequenceId, msgNo, rmMessageContext);
242 accessor.updateFinalMessageArrivedTime(sequenceId);
243
244 }
245
246 public RMMessageContext checkForResponseMessage(String sequenceId, String requestMsgId) {
247 return null;
248 }
249
250 public void insertTerminateSeqMessage(RMMessageContext terminateSeqMessage) {
251 accessor.addLowPriorityMessage(terminateSeqMessage);
252 }
253
254 public void setAckReceived(String seqId, long msgNo) {
255 accessor.setAckReceived(seqId, msgNo);
256
257 }
258
259 public void insertFault(RMMessageContext rmMsgCtx) {
260 }
261
262
263 public void addSendMsgNo(String seqId, long msgNo) {
264 accessor.addSendMsgNo(accessor.getSequenceOfOutSequence(seqId), msgNo);
265 }
266
267 public boolean isSentMsg(String seqId, long msgNo) {
268 return accessor.isSentMsg(accessor.getSequenceOfOutSequence(seqId), msgNo);
269 }
270
271
272 public void addOutgoingSequence(String sequenceId) {
273 accessor.addOutgoingSequence(sequenceId);
274 }
275
276 public void addIncomingSequence(String sequenceId) {
277 accessor.addIncomingSequence(sequenceId);
278 }
279
280 public String getOutgoingSeqOfMsg(String msgId) {
281 return null;
282 }
283
284 public void addRequestedSequence(String seqId) {
285 accessor.addRequestedSequence(seqId);
286 }
287
288 public boolean isRequestedSeqPresent(String seqId) {
289 return accessor.isRequestedSeqPresent(seqId);
290 }
291
292 public String getOutgoingSeqenceIdOfIncomingMsg(RMMessageContext msg) {
293
294 return msg.getSequenceID();
295 }
296
297 public long getLastIncomingMsgNo(String seqId) {
298 return accessor.getLastIncomingMsgNo(seqId);
299 }
300
301 public boolean hasLastIncomingMsgReceived(String seqId) {
302 return accessor.hasLastIncomingMsgReceived(seqId);
303 }
304
305 public String getKeyFromOutgoingSeqId(String seqId) {
306 return null;
307 }
308
309 public void setAcksTo(String seqId, String acksTo) {
310 accessor.setAcksTo(seqId, acksTo);
311 }
312
313 public String getAcksTo(String seqId) {
314 return accessor.getAcksTo(seqId);
315 }
316
317 public void setCallback(Callback cb) {
318 }
319
320 public void removeCallback() {
321 }
322
323 public void addOffer(String msgID, String offerID) {
324
325 }
326
327 public String getOffer(String msgID) {
328 return null;
329 }
330
331 public void clearStorage() {
332 accessor.clear();
333 }
334
335 public boolean isSequenceComplete(String seqId) {
336 return false;
337 }
338
339 public void sendAck(String sequenceId) {
340 accessor.sendAck(sequenceId);
341 }
342 }