1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.sandesha.server;
19
20 import org.apache.axis.AxisFault;
21 import org.apache.axis.Message;
22 import org.apache.axis.SimpleChain;
23 import org.apache.axis.client.Call;
24 import org.apache.axis.client.Service;
25 import org.apache.axis.components.logger.LogFactory;
26 import org.apache.axis.components.uuid.UUIDGen;
27 import org.apache.axis.components.uuid.UUIDGenFactory;
28 import org.apache.axis.message.addressing.AddressingHeaders;
29 import org.apache.commons.logging.Log;
30 import org.apache.sandesha.Constants;
31 import org.apache.sandesha.EnvelopeCreator;
32 import org.apache.sandesha.IStorageManager;
33 import org.apache.sandesha.RMMessageContext;
34 import org.apache.sandesha.server.msgprocessors.IRMMessageProcessor;
35 import org.apache.sandesha.storage.Callback;
36 import org.apache.sandesha.storage.CallbackData;
37 import org.apache.sandesha.util.PolicyLoader;
38 import org.apache.sandesha.ws.rm.RMHeaders;
39
40 import javax.xml.rpc.ServiceException;
41 import javax.xml.soap.SOAPEnvelope;
42 import javax.xml.soap.SOAPException;
43
44 /***
45 * This is the worker for the Sender. Sender will start several workers depending on the
46 * Constants value SENDER_THREADS in the Constants file.
47 *
48 * @author Jaliya Ekanayake
49 * @author Chamikara Jayalath
50 */
51 public class SenderWorker implements Runnable {
52 private static final Log log = LogFactory.getLog(SenderWorker.class.getName());
53 public static final UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
54 public static Callback callback;
55 public boolean running = true;
56 private IStorageManager storageManager;
57
58
59 public static synchronized Callback getCallback() {
60 return callback;
61 }
62
63 public static synchronized void setCallback(Callback cb) {
64 callback = cb;
65 }
66
67 private SimpleChain requestChain = null;
68 private SimpleChain responseChain = null;
69
70 public SimpleChain getRequestChain() {
71 return requestChain;
72 }
73
74 public void setRequestChain(SimpleChain requestChain) {
75 this.requestChain = requestChain;
76 }
77
78 public SimpleChain getResponseChain() {
79 return responseChain;
80 }
81
82 public void setResponseChain(SimpleChain responseChanin) {
83 this.responseChain = responseChanin;
84 }
85
86 public SenderWorker() {
87 storageManager = new ServerStorageManager();
88 }
89
90 public SenderWorker(IStorageManager storageManager) {
91 this.storageManager = storageManager;
92 }
93
94 public boolean isRunning() {
95 return running;
96 }
97
98 public void setRunning(boolean running) {
99 this.running = running;
100 }
101
102 public void run() {
103
104 while (running) {
105 long startTime = System.currentTimeMillis();
106 boolean hasMessages = true;
107
108 do {
109
110 RMMessageContext rmMessageContext = storageManager.getNextMessageToSend();
111 if (rmMessageContext == null) {
112 hasMessages = false;
113 } else {
114 long inactivityTimeout = PolicyLoader.getInstance().getInactivityTimeout();
115 long retransmissionInterval = PolicyLoader.getInstance()
116 .getBaseRetransmissionInterval();
117
118 if (rmMessageContext.getFristProcessedTime() == 0)
119 rmMessageContext.setFristProcessedTime(System.currentTimeMillis());
120
121 if ((System.currentTimeMillis() - rmMessageContext.getFristProcessedTime()) >
122 inactivityTimeout) {
123 log.error("Inactivity Time Out Reached for the message with <wsa:MessageID> " +
124 rmMessageContext.getMessageID());
125
126
127
128 } else if (rmMessageContext.getRetransmissionTime() <
129 (System.currentTimeMillis() - rmMessageContext.getLastPrecessedTime())) {
130 try {
131
132 rmMessageContext.setLastPrecessedTime(System.currentTimeMillis());
133
134 if (PolicyLoader.getInstance().getExponentialBackoff() != null) {
135 long newRtTime = ((long) Math.pow(retransmissionInterval / 1000,
136 rmMessageContext.getReTransmissionCount())) * 1000;
137 rmMessageContext.setRetransmissionTime(newRtTime);
138
139 } else {
140
141 long rtTime = rmMessageContext.getRetransmissionTime();
142 rmMessageContext.setRetransmissionTime(2 * rtTime);
143
144 }
145 sendMessage(rmMessageContext);
146 rmMessageContext.setReTransmissionCount(rmMessageContext.getReTransmissionCount() + 1);
147
148 rmMessageContext.setLocked(false);
149
150 } catch (AxisFault e) {
151 rmMessageContext.setLocked(false);
152 log.error(e);
153 } catch (SOAPException e) {
154 rmMessageContext.setLocked(false);
155 log.error(e);
156 } catch (Exception e) {
157 rmMessageContext.setLocked(false);
158 log.error(e);
159 }
160 }
161 rmMessageContext.setLocked(false);
162
163 }
164 } while (hasMessages);
165
166 long timeGap = System.currentTimeMillis() - startTime;
167 if ((timeGap - Constants.SENDER_SLEEP_TIME) <= 0) {
168 try {
169 Thread.sleep(Constants.SENDER_SLEEP_TIME - timeGap);
170 } catch (Exception ex) {
171 log.error(ex);
172 }
173 }
174 }
175 }
176
177 private void sendMessage(RMMessageContext rmMessageContext) throws Exception {
178 switch (rmMessageContext.getMessageType()) {
179 case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST:
180 {
181 if (log.isDebugEnabled())
182 log.debug(Constants.InfomationMessage.SENDING_CREATE_SEQ);
183 sendCreateSequenceRequest(rmMessageContext);
184 break;
185 }
186 case Constants.MSG_TYPE_CREATE_SEQUENCE_RESPONSE:
187 {
188 if (log.isDebugEnabled())
189 log.debug(Constants.InfomationMessage.SENDING_CREATE_SEQ_RES);
190
191 sendCreateSequenceResponse(rmMessageContext);
192 break;
193 }
194 case Constants.MSG_TYPE_TERMINATE_SEQUENCE:
195 {
196 if (log.isDebugEnabled())
197 log.debug(Constants.InfomationMessage.SENDING_TERMINATE_SEQ);
198 sendTerminateSequenceRequest(rmMessageContext);
199 storageManager.setTerminateSend(storageManager.getKeyFromOutgoingSeqId(rmMessageContext.getSequenceID()));
200 break;
201 }
202 case Constants.MSG_TYPE_ACKNOWLEDGEMENT:
203 {
204 if (log.isDebugEnabled())
205 log.debug(Constants.InfomationMessage.SENDING_ACK);
206 sendAcknowldgement(rmMessageContext);
207 break;
208 }
209 case Constants.MSG_TYPE_SERVICE_REQUEST:
210 {
211 if (log.isDebugEnabled())
212 log.debug(Constants.InfomationMessage.SENDING_REQ);
213 sendServiceRequest(rmMessageContext);
214 break;
215 }
216 case Constants.MSG_TYPE_SERVICE_RESPONSE:
217 {
218 if (log.isDebugEnabled())
219 log.debug(Constants.InfomationMessage.SENDING_RES);
220 sendServiceResponse(rmMessageContext);
221 break;
222 }
223 }
224 }
225
226
227 /***
228 * @param rmMessageContext
229 */
230 private void sendTerminateSequenceRequest(RMMessageContext rmMessageContext) throws Exception {
231 SOAPEnvelope terSeqEnv = EnvelopeCreator.createTerminatSeqMessage(rmMessageContext);
232
233 Message terSeqMsg = new Message(terSeqEnv);
234 rmMessageContext.getMsgContext().setRequestMessage(terSeqMsg);
235
236 Call call;
237 call = prepareCall(rmMessageContext);
238 call.invoke();
239
240 processResponseMessage(call, rmMessageContext);
241 }
242
243 private void sendServiceResponse(RMMessageContext rmMessageContext) throws Exception {
244 SOAPEnvelope responseEnvelope = null;
245 responseEnvelope = EnvelopeCreator.createServiceResponseEnvelope(rmMessageContext);
246
247
248 rmMessageContext.getMsgContext().setRequestMessage(new Message(responseEnvelope));
249
250
251 Service service = new Service();
252 Call call = (Call) service.createCall();
253
254 if (rmMessageContext.getAddressingHeaders().getAction() != null) {
255 call.setSOAPActionURI(rmMessageContext.getAddressingHeaders().getAction().toString());
256 }
257
258 call.setTargetEndpointAddress(rmMessageContext.getAddressingHeaders().getReplyTo().getAddress().toString());
259
260
261 String soapMsg = rmMessageContext.getMsgContext().getRequestMessage().getSOAPPartAsString();
262
263
264 if (soapMsg != null)
265 call.setRequestMessage(new Message(soapMsg));
266 else {
267 call.setRequestMessage(new Message(rmMessageContext.getMsgContext().getRequestMessage().getSOAPEnvelope()));
268 }
269
270
271
272
273 storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),
274 rmMessageContext.getMsgNumber());
275 call.invoke();
276
277 }
278
279 private void sendCreateSequenceRequest(RMMessageContext rmMsgCtx) throws Exception {
280 Call call;
281
282 SOAPEnvelope reqEnvelope = EnvelopeCreator.createCreateSequenceEnvelope(rmMsgCtx);
283 rmMsgCtx.getMsgContext().setRequestMessage(new Message(reqEnvelope));
284
285 call = prepareCall(rmMsgCtx);
286 call.invoke();
287
288 processResponseMessage(call, rmMsgCtx);
289
290 }
291
292 private void sendCreateSequenceResponse(RMMessageContext rmMessageContext) throws Exception {
293
294
295
296 if (rmMessageContext.getMsgContext().getResponseMessage() == null) {
297
298 log.error(Constants.ErrorMessages.NULL_REQUEST_MSG);
299 } else {
300 Call call = prepareCall(rmMessageContext);
301 call.setRequestMessage(rmMessageContext.getMsgContext().getResponseMessage());
302 call.invoke();
303 }
304 }
305
306 private void sendAcknowldgement(RMMessageContext rmMessageContext) throws Exception {
307
308
309 if (rmMessageContext.getMsgContext().getResponseMessage() == null) {
310 log.error(Constants.ErrorMessages.NULL_REQUEST_MSG);
311 } else {
312 Call call = prepareCall(rmMessageContext);
313 call.setRequestMessage(rmMessageContext.getMsgContext().getResponseMessage());
314 call.invoke();
315 }
316 }
317
318 private Call prepareCall(RMMessageContext rmMessageContext) throws ServiceException, AxisFault {
319 Service service = new Service();
320 Call call = (Call) service.createCall();
321 call.setTargetEndpointAddress(rmMessageContext.getOutGoingAddress());
322
323 call.setClientHandlers(requestChain, responseChain);
324 if (rmMessageContext.getMsgContext().getRequestMessage() != null) {
325 String soapMsg = rmMessageContext.getMsgContext().getRequestMessage()
326 .getSOAPPartAsString();
327 call.setRequestMessage(new Message(soapMsg));
328 if (rmMessageContext.getAddressingHeaders().getAction() != null) {
329 call.setSOAPActionURI(rmMessageContext.getAddressingHeaders().getAction().toString());
330 }
331 }
332 return call;
333 }
334
335 private void sendServiceRequest(RMMessageContext rmMessageContext) throws Exception {
336
337 SOAPEnvelope requestEnvelope = null;
338
339
340 requestEnvelope = EnvelopeCreator.createServiceRequestEnvelope(rmMessageContext);
341 rmMessageContext.getMsgContext().setRequestMessage(new Message(requestEnvelope));
342 if (rmMessageContext.getSync()) {
343 Call call;
344 call = prepareCall(rmMessageContext);
345
346 storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),
347 rmMessageContext.getMsgNumber());
348 call.invoke();
349 processResponseMessage(call, rmMessageContext);
350
351 } else {
352 Call call = prepareCall(rmMessageContext);
353 storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),
354 rmMessageContext.getMsgNumber());
355 call.invoke();
356 processResponseMessage(call, rmMessageContext);
357
358 }
359 }
360
361 private void processResponseMessage(Call call, RMMessageContext rmMessageContext)
362 throws Exception {
363
364 if (call.getResponseMessage() != null) {
365 RMHeaders rmHeaders = new RMHeaders();
366 rmHeaders.fromSOAPEnvelope(call.getResponseMessage().getSOAPEnvelope());
367 rmMessageContext.setRMHeaders(rmHeaders);
368 AddressingHeaders addrHeaders = new AddressingHeaders(call.getResponseMessage().getSOAPEnvelope());
369 rmMessageContext.setAddressingHeaders(addrHeaders);
370 rmMessageContext.getMsgContext().setResponseMessage(call.getResponseMessage());
371 IRMMessageProcessor messagePrcessor = RMMessageProcessorIdentifier.getMessageProcessor(rmMessageContext, storageManager);
372 messagePrcessor.processMessage(rmMessageContext);
373 }
374
375 if (getCallback() != null) {
376 CallbackData data = new CallbackData();
377 data.setMessageId(rmMessageContext.getMessageID());
378 data.setMessageType(rmMessageContext.getMessageType());
379 data.setSequenceId(rmMessageContext.getSequenceID());
380 callback.onIncomingMessage(data);
381 }
382
383 }
384
385 }