View Javadoc

1   /*
2    * Copyright  1999-2004 The Apache Software Foundation.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   *
16   */
17  
18  package org.apache.sandesha.server;
19  
20  import org.apache.axis.AxisFault;
21  import org.apache.axis.Message;
22  import org.apache.axis.SimpleChain;
23  import org.apache.axis.client.Call;
24  import org.apache.axis.client.Service;
25  import org.apache.axis.components.logger.LogFactory;
26  import org.apache.axis.components.uuid.UUIDGen;
27  import org.apache.axis.components.uuid.UUIDGenFactory;
28  import org.apache.axis.message.addressing.AddressingHeaders;
29  import org.apache.commons.logging.Log;
30  import org.apache.sandesha.Constants;
31  import org.apache.sandesha.EnvelopeCreator;
32  import org.apache.sandesha.IStorageManager;
33  import org.apache.sandesha.RMMessageContext;
34  import org.apache.sandesha.server.msgprocessors.IRMMessageProcessor;
35  import org.apache.sandesha.storage.Callback;
36  import org.apache.sandesha.storage.CallbackData;
37  import org.apache.sandesha.util.PolicyLoader;
38  import org.apache.sandesha.ws.rm.RMHeaders;
39  
40  import javax.xml.rpc.ServiceException;
41  import javax.xml.soap.SOAPEnvelope;
42  import javax.xml.soap.SOAPException;
43  
44  /***
45   * This is the worker for the Sender. Sender will start several workers depending on the
46   * Constants value SENDER_THREADS in the Constants file.
47   *
48   * @author Jaliya Ekanayake
49   * @author Chamikara Jayalath
50   */
51  public class SenderWorker implements Runnable {
52      private static final Log log = LogFactory.getLog(SenderWorker.class.getName());
53      public static final UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
54      public static Callback callback;
55      public boolean running = true;
56      private IStorageManager storageManager;
57  
58  
59      public static synchronized Callback getCallback() {
60          return callback;
61      }
62  
63      public static synchronized void setCallback(Callback cb) {
64          callback = cb;
65      }
66  
67      private SimpleChain requestChain = null;
68      private SimpleChain responseChain = null;
69  
70      public SimpleChain getRequestChain() {
71          return requestChain;
72      }
73  
74      public void setRequestChain(SimpleChain requestChain) {
75          this.requestChain = requestChain;
76      }
77  
78      public SimpleChain getResponseChain() {
79          return responseChain;
80      }
81  
82      public void setResponseChain(SimpleChain responseChanin) {
83          this.responseChain = responseChanin;
84      }
85  
86      public SenderWorker() {
87          storageManager = new ServerStorageManager();
88      }
89  
90      public SenderWorker(IStorageManager storageManager) {
91          this.storageManager = storageManager;
92      }
93  
94      public boolean isRunning() {
95          return running;
96      }
97  
98      public void setRunning(boolean running) {
99          this.running = running;
100     }
101 
102     public void run() {
103 
104         while (running) {
105             long startTime = System.currentTimeMillis();
106             boolean hasMessages = true;
107             //Take a messge from the storage and check whether we can send it.
108             do {
109 
110                 RMMessageContext rmMessageContext = storageManager.getNextMessageToSend();
111                 if (rmMessageContext == null) {
112                     hasMessages = false;
113                 } else {
114                     long inactivityTimeout = PolicyLoader.getInstance().getInactivityTimeout();
115                     long retransmissionInterval = PolicyLoader.getInstance()
116                             .getBaseRetransmissionInterval();
117 
118                     if (rmMessageContext.getFristProcessedTime() == 0)
119                         rmMessageContext.setFristProcessedTime(System.currentTimeMillis());
120 
121                     if ((System.currentTimeMillis() - rmMessageContext.getFristProcessedTime()) >
122                             inactivityTimeout) {
123                         log.error("Inactivity Time Out Reached for the message with <wsa:MessageID> " +
124                                 rmMessageContext.getMessageID());
125                         //Need to clear the storage only for this sequece.
126                         // storageManager.clearStorage();
127 
128                     } else if (rmMessageContext.getRetransmissionTime() <
129                             (System.currentTimeMillis() - rmMessageContext.getLastPrecessedTime())) {
130                         try {
131 
132                             rmMessageContext.setLastPrecessedTime(System.currentTimeMillis());
133 
134                             if (PolicyLoader.getInstance().getExponentialBackoff() != null) {
135                                 long newRtTime = ((long) Math.pow(retransmissionInterval / 1000,
136                                         rmMessageContext.getReTransmissionCount())) * 1000;
137                                 rmMessageContext.setRetransmissionTime(newRtTime);
138 
139                             } else {
140                                 //Let's do Binary Back Off
141                                 long rtTime = rmMessageContext.getRetransmissionTime();
142                                 rmMessageContext.setRetransmissionTime(2 * rtTime);
143 
144                             }
145                             sendMessage(rmMessageContext);
146                             rmMessageContext.setReTransmissionCount(rmMessageContext.getReTransmissionCount() + 1);
147 
148                             rmMessageContext.setLocked(false);
149 
150                         } catch (AxisFault e) {
151                             rmMessageContext.setLocked(false);
152                             log.error(e);
153                         } catch (SOAPException e) {
154                             rmMessageContext.setLocked(false);
155                             log.error(e);
156                         } catch (Exception e) {
157                             rmMessageContext.setLocked(false);
158                             log.error(e);
159                         }
160                     }
161                     rmMessageContext.setLocked(false);
162 
163                 }
164             } while (hasMessages);
165 
166             long timeGap = System.currentTimeMillis() - startTime;
167             if ((timeGap - Constants.SENDER_SLEEP_TIME) <= 0) {
168                 try {
169                     Thread.sleep(Constants.SENDER_SLEEP_TIME - timeGap);
170                 } catch (Exception ex) {
171                     log.error(ex);
172                 }
173             }
174         }
175     }
176 
177     private void sendMessage(RMMessageContext rmMessageContext) throws Exception {
178         switch (rmMessageContext.getMessageType()) {
179             case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST:
180                 {
181                     if (log.isDebugEnabled())
182                         log.debug(Constants.InfomationMessage.SENDING_CREATE_SEQ);
183                     sendCreateSequenceRequest(rmMessageContext);
184                     break;
185                 }
186             case Constants.MSG_TYPE_CREATE_SEQUENCE_RESPONSE:
187                 {
188                     if (log.isDebugEnabled())
189                         log.debug(Constants.InfomationMessage.SENDING_CREATE_SEQ_RES);
190 
191                     sendCreateSequenceResponse(rmMessageContext);
192                     break;
193                 }
194             case Constants.MSG_TYPE_TERMINATE_SEQUENCE:
195                 {
196                     if (log.isDebugEnabled())
197                         log.debug(Constants.InfomationMessage.SENDING_TERMINATE_SEQ);
198                     sendTerminateSequenceRequest(rmMessageContext);
199                     storageManager.setTerminateSend(storageManager.getKeyFromOutgoingSeqId(rmMessageContext.getSequenceID()));
200                     break;
201                 }
202             case Constants.MSG_TYPE_ACKNOWLEDGEMENT:
203                 {
204                     if (log.isDebugEnabled())
205                         log.debug(Constants.InfomationMessage.SENDING_ACK);
206                     sendAcknowldgement(rmMessageContext);
207                     break;
208                 }
209             case Constants.MSG_TYPE_SERVICE_REQUEST:
210                 {
211                     if (log.isDebugEnabled())
212                         log.debug(Constants.InfomationMessage.SENDING_REQ);
213                     sendServiceRequest(rmMessageContext);
214                     break;
215                 }
216             case Constants.MSG_TYPE_SERVICE_RESPONSE:
217                 {
218                     if (log.isDebugEnabled())
219                         log.debug(Constants.InfomationMessage.SENDING_RES);
220                     sendServiceResponse(rmMessageContext);
221                     break;
222                 }
223         }
224     }
225 
226 
227     /***
228      * @param rmMessageContext
229      */
230     private void sendTerminateSequenceRequest(RMMessageContext rmMessageContext) throws Exception {
231         SOAPEnvelope terSeqEnv = EnvelopeCreator.createTerminatSeqMessage(rmMessageContext);
232 
233         Message terSeqMsg = new Message(terSeqEnv);
234         rmMessageContext.getMsgContext().setRequestMessage(terSeqMsg);
235 
236         Call call;
237         call = prepareCall(rmMessageContext);
238         call.invoke();
239 
240         processResponseMessage(call, rmMessageContext);
241     }
242 
243     private void sendServiceResponse(RMMessageContext rmMessageContext) throws Exception {
244         SOAPEnvelope responseEnvelope = null;
245         responseEnvelope = EnvelopeCreator.createServiceResponseEnvelope(rmMessageContext);
246 
247 
248         rmMessageContext.getMsgContext().setRequestMessage(new Message(responseEnvelope));
249         //rmMessageContext.getMsgContext().setResponseMessage(new Message(responseEnvelope));
250 
251         Service service = new Service();
252         Call call = (Call) service.createCall();
253 
254         if (rmMessageContext.getAddressingHeaders().getAction() != null) {
255             call.setSOAPActionURI(rmMessageContext.getAddressingHeaders().getAction().toString());
256         }
257 
258         call.setTargetEndpointAddress(rmMessageContext.getAddressingHeaders().getReplyTo().getAddress().toString());
259 
260         //NOTE: WE USE THE REQUEST MESSAGE TO SEND THE RESPONSE.
261         String soapMsg = rmMessageContext.getMsgContext().getRequestMessage().getSOAPPartAsString();
262 
263 
264         if (soapMsg != null)
265             call.setRequestMessage(new Message(soapMsg));
266         else {
267             call.setRequestMessage(new Message(rmMessageContext.getMsgContext().getRequestMessage().getSOAPEnvelope()));
268         }
269 
270         // rmMessageContext.setLastPrecessedTime(System.currentTimeMillis());
271         // rmMessageContext.setReTransmissionCount(rmMessageContext.getReTransmissionCount() + 1);
272         //We are not expecting the ack over the  same connection
273         storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),
274                 rmMessageContext.getMsgNumber());
275         call.invoke();
276 
277     }
278 
279     private void sendCreateSequenceRequest(RMMessageContext rmMsgCtx) throws Exception {
280         Call call;
281 
282         SOAPEnvelope reqEnvelope = EnvelopeCreator.createCreateSequenceEnvelope(rmMsgCtx);
283         rmMsgCtx.getMsgContext().setRequestMessage(new Message(reqEnvelope));
284 
285         call = prepareCall(rmMsgCtx);
286         call.invoke();
287 
288         processResponseMessage(call, rmMsgCtx);
289 
290     }
291 
292     private void sendCreateSequenceResponse(RMMessageContext rmMessageContext) throws Exception {
293         //Here there is no concept of sending synchronous CreateSequenceRequest
294         // response.
295         //i.e. we are not expecting any response for this.
296         if (rmMessageContext.getMsgContext().getResponseMessage() == null) {
297             //The code should not come to this point.
298             log.error(Constants.ErrorMessages.NULL_REQUEST_MSG);
299         } else {
300             Call call = prepareCall(rmMessageContext);
301             call.setRequestMessage(rmMessageContext.getMsgContext().getResponseMessage());
302             call.invoke();
303         }
304     }
305 
306     private void sendAcknowldgement(RMMessageContext rmMessageContext) throws Exception {
307         // Here there is no concept of sending synchronous CreateSequenceRequest
308         // resposne.
309         if (rmMessageContext.getMsgContext().getResponseMessage() == null) {
310             log.error(Constants.ErrorMessages.NULL_REQUEST_MSG);
311         } else {
312             Call call = prepareCall(rmMessageContext);
313             call.setRequestMessage(rmMessageContext.getMsgContext().getResponseMessage());
314             call.invoke();
315         }
316     }
317 
318     private Call prepareCall(RMMessageContext rmMessageContext) throws ServiceException, AxisFault {
319         Service service = new Service();
320         Call call = (Call) service.createCall();
321         call.setTargetEndpointAddress(rmMessageContext.getOutGoingAddress());
322 
323         call.setClientHandlers(requestChain, responseChain);
324         if (rmMessageContext.getMsgContext().getRequestMessage() != null) {
325             String soapMsg = rmMessageContext.getMsgContext().getRequestMessage()
326                     .getSOAPPartAsString();
327             call.setRequestMessage(new Message(soapMsg));
328             if (rmMessageContext.getAddressingHeaders().getAction() != null) {
329                 call.setSOAPActionURI(rmMessageContext.getAddressingHeaders().getAction().toString());
330             }
331         }
332         return call;
333     }
334 
335     private void sendServiceRequest(RMMessageContext rmMessageContext) throws Exception {
336 
337         SOAPEnvelope requestEnvelope = null;
338         //Need to create the response envelope.
339 
340         requestEnvelope = EnvelopeCreator.createServiceRequestEnvelope(rmMessageContext);
341         rmMessageContext.getMsgContext().setRequestMessage(new Message(requestEnvelope));
342         if (rmMessageContext.getSync()) {
343             Call call;
344             call = prepareCall(rmMessageContext);
345             //CHECK THIS
346             storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),
347                     rmMessageContext.getMsgNumber());
348             call.invoke();
349             processResponseMessage(call, rmMessageContext);
350 
351         } else {
352             Call call = prepareCall(rmMessageContext);
353             storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),
354                     rmMessageContext.getMsgNumber());
355             call.invoke();
356             processResponseMessage(call, rmMessageContext);
357 
358         }
359     }
360 
361     private void processResponseMessage(Call call, RMMessageContext rmMessageContext)
362             throws Exception {
363 
364         if (call.getResponseMessage() != null) {
365             RMHeaders rmHeaders = new RMHeaders();
366             rmHeaders.fromSOAPEnvelope(call.getResponseMessage().getSOAPEnvelope());
367             rmMessageContext.setRMHeaders(rmHeaders);
368             AddressingHeaders addrHeaders = new AddressingHeaders(call.getResponseMessage().getSOAPEnvelope());
369             rmMessageContext.setAddressingHeaders(addrHeaders);
370             rmMessageContext.getMsgContext().setResponseMessage(call.getResponseMessage());
371             IRMMessageProcessor messagePrcessor = RMMessageProcessorIdentifier.getMessageProcessor(rmMessageContext, storageManager);
372             messagePrcessor.processMessage(rmMessageContext);
373         }
374 
375         if (getCallback() != null) {
376             CallbackData data = new CallbackData();
377             data.setMessageId(rmMessageContext.getMessageID());
378             data.setMessageType(rmMessageContext.getMessageType());
379             data.setSequenceId(rmMessageContext.getSequenceID());
380             callback.onIncomingMessage(data);
381         }
382 
383     }
384 
385 }