View Javadoc

1   /*
2   * Copyright 1999-2004 The Apache Software Foundation.
3   *
4   * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5   * use this file except in compliance with the License. You may obtain a copy of
6   * the License at
7   *
8   * http://www.apache.org/licenses/LICENSE-2.0
9   *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13  * License for the specific language governing permissions and limitations under
14  * the License.
15  *
16  */
17  package org.apache.sandesha.client;
18  
19  import org.apache.axis.AxisFault;
20  import org.apache.axis.Message;
21  import org.apache.axis.MessageContext;
22  import org.apache.axis.client.Call;
23  import org.apache.axis.components.logger.LogFactory;
24  import org.apache.axis.components.uuid.UUIDGen;
25  import org.apache.axis.components.uuid.UUIDGenFactory;
26  import org.apache.axis.handlers.BasicHandler;
27  import org.apache.axis.message.addressing.AddressingHeaders;
28  import org.apache.commons.logging.Log;
29  import org.apache.sandesha.Constants;
30  import org.apache.sandesha.IStorageManager;
31  import org.apache.sandesha.RMMessageContext;
32  import org.apache.sandesha.RMReport;
33  import org.apache.sandesha.util.PolicyLoader;
34  import org.apache.sandesha.util.RMMessageCreator;
35  import org.apache.sandesha.ws.rm.RMHeaders;
36  
37  /***
38   * In the client side of axis there is a flexibility of using custom sender to send SOAP messages.
39   * However axis's use of senders are mainly to handle transport related funtionalites.
40   * <code>RMSender</code>has to be used by the users who wish to use WS-ReliableMessaging capability
41   * in their clients.<P>
42   * The main funtionality of <code>RMSender</code> is to insert the messages coming from client and
43   * also the generated messages to the <code>SandeshaQueue</code>.
44   * If the message coming in from the client is request/response in nature then <code>RMSender</code>
45   * will wait polling <code>SandeshaQueue</code> till it gets an appropriate response.
46   * Due to the above reason, if the client is sending several messages
47   * (of request/response in nature) to be sent reliably, they will be sent reliably by
48   * <b>Sandesha</b> however the client will wait at each message till it gets the response, to send
49   * the next. To avoid this client can use callbacks provided by axis
50   * in <code>org.apache.axis.client.async</code> package.
51   */
52  public class RMSender extends BasicHandler {
53  
54      private IStorageManager storageManager;
55      private static final Log log = LogFactory.getLog(RMSender.class.getName());
56      private final UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
57      private static Boolean lock = new Boolean(false);
58  
59      /***
60       * This is the main method that is invoked by the axis engine. This method will add the reqest
61       * messages from the client to <code>SandeshaQueue</code> with the generated messages such as
62       * Create Sequence message and Terminate Sequence message.
63       *
64       * @param msgContext
65       * @throws AxisFault
66       */
67  
68      public void invoke(MessageContext msgContext) throws AxisFault {
69  
70          storageManager = new ClientStorageManager();
71  
72          try {
73              RMMessageContext reqMsgCtx = null;
74              String tempSeqID = null;
75  
76              reqMsgCtx = getRMMessageContext(msgContext);
77  
78              tempSeqID = reqMsgCtx.getSequenceID();
79  
80              reqMsgCtx = processRequestMessage(reqMsgCtx, reqMsgCtx.getSync());
81  
82              if (reqMsgCtx.isHasResponse()) {
83                  RMMessageContext responseMessageContext = null;
84                  long startingTime = System.currentTimeMillis();
85                  long inactivityTimeOut = PolicyLoader.getInstance().getInactivityTimeout();
86  
87                  while (responseMessageContext == null) {
88                      synchronized (lock) {
89                          responseMessageContext =
90                                  checkTheQueueForResponse(tempSeqID, reqMsgCtx.getMessageID());
91                          if ((System.currentTimeMillis() - startingTime) >= inactivityTimeOut) {
92                              reqMsgCtx.getCtx().stopClientByForce();
93                          }
94                          Thread.sleep(Constants.CLIENT_RESPONSE_CHECKING_INTERVAL);
95                      }
96                  }
97  
98                  //setting RMReport;
99                  if (responseMessageContext != null) {
100                     String oldSeqId = reqMsgCtx.getOldSequenceID();
101                     if (oldSeqId != null) {
102                         Call call = (Call) reqMsgCtx.getCtx().getCallMap().get(reqMsgCtx.getOldSequenceID());
103 
104                         if (call != null) {
105                             RMReport report = (RMReport) call.getProperty(Constants.ClientProperties.REPORT);
106                             report.incrementReturnedMsgCount();
107                         }
108                     }
109                 }
110 
111                 //We need these steps to filter all addressing and rm related headers.
112                 Message resMsg = responseMessageContext.getMsgContext().getRequestMessage();
113                 RMHeaders.removeHeaders(resMsg.getSOAPEnvelope());
114                 AddressingHeaders addHeaders = new AddressingHeaders(resMsg.getSOAPEnvelope(),
115                         null, true, false, false, null);
116 
117                 msgContext.setResponseMessage(resMsg);
118             } else {
119                 msgContext.setResponseMessage(null);
120             }
121 
122         } catch (Exception ex) {
123             log.error(ex);
124 
125             throw new AxisFault(ex.getLocalizedMessage());
126 
127         }
128     }
129 
130     /***
131      * This method will process the first request message.
132      *
133      * @param reqRMMsgContext
134      * @param sync
135      * @return
136      * @throws Exception
137      */
138     private RMMessageContext processRequestMessage(RMMessageContext reqRMMsgContext,
139                                                    boolean sync) throws Exception {
140         synchronized (lock) {
141 
142             if (!storageManager.isSequenceExist(reqRMMsgContext.getSequenceID())) {
143                 String msgID = Constants.UUID + uuidGen.nextUUID();
144                 String offerID = null;
145                 if (reqRMMsgContext.isHasResponse() && reqRMMsgContext.isSendOffer()) {
146                     offerID = Constants.UUID + uuidGen.nextUUID();
147                     storageManager.addRequestedSequence(offerID);
148                     storageManager.addOffer(msgID, offerID);
149                 }
150 
151                 RMMessageContext createSeqRMMsgContext = RMMessageCreator.createCreateSeqMsg(reqRMMsgContext, Constants.CLIENT, msgID, offerID);
152                 storageManager.addOutgoingSequence(reqRMMsgContext.getSequenceID());
153                 storageManager.setTemporaryOutSequence(reqRMMsgContext.getSequenceID(),
154                         createSeqRMMsgContext.getMessageID());
155 
156                 createSeqRMMsgContext.setSync(sync);
157                 storageManager.addCreateSequenceRequest(createSeqRMMsgContext);
158                 processMessage(reqRMMsgContext);
159 
160             } else {
161                 processMessage(reqRMMsgContext);
162             }
163 
164         }
165 
166 
167         return reqRMMsgContext;
168     }
169 
170     private RMMessageContext processMessage(RMMessageContext reqRMMsgContext)
171             throws Exception {
172         if (reqRMMsgContext.isLastMessage()) {
173             storageManager.insertTerminateSeqMessage(RMMessageCreator.createTerminateSeqMsg(reqRMMsgContext, Constants.CLIENT));
174         }
175         RMMessageContext serviceRequestMsg = RMMessageCreator.createServiceRequestMessage(reqRMMsgContext);
176         storageManager.insertOutgoingMessage(serviceRequestMsg);
177         return reqRMMsgContext;
178     }
179 
180     private RMMessageContext checkTheQueueForResponse(String sequenceId, String reqMessageID) {
181         return storageManager.checkForResponseMessage(sequenceId, reqMessageID);
182     }
183 
184     private RMMessageContext getRMMessageContext(MessageContext msgCtx) throws Exception {
185         //Get a copy of the MessageContext. This is required when sending multiple messages from
186         //one call object
187         MessageContext newMsgContext = RMMessageCreator.cloneMsgContext(msgCtx);
188         RMMessageContext requestMesssageContext = new RMMessageContext();
189         Call call = (Call) newMsgContext.getProperty(MessageContext.CALL);
190 
191         requestMesssageContext = ClientPropertyValidator.validate(call);
192         requestMesssageContext.setOutGoingAddress((String) msgCtx.getProperty(MessageContext.TRANS_URL));
193         requestMesssageContext.setMsgContext(newMsgContext);
194         return requestMesssageContext;
195     }
196 
197 
198 }
199 
200 
201