1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.sandesha.client;
18
19 import org.apache.axis.AxisFault;
20 import org.apache.axis.Message;
21 import org.apache.axis.MessageContext;
22 import org.apache.axis.client.Call;
23 import org.apache.axis.components.logger.LogFactory;
24 import org.apache.axis.components.uuid.UUIDGen;
25 import org.apache.axis.components.uuid.UUIDGenFactory;
26 import org.apache.axis.handlers.BasicHandler;
27 import org.apache.axis.message.addressing.AddressingHeaders;
28 import org.apache.commons.logging.Log;
29 import org.apache.sandesha.Constants;
30 import org.apache.sandesha.IStorageManager;
31 import org.apache.sandesha.RMMessageContext;
32 import org.apache.sandesha.RMReport;
33 import org.apache.sandesha.util.PolicyLoader;
34 import org.apache.sandesha.util.RMMessageCreator;
35 import org.apache.sandesha.ws.rm.RMHeaders;
36
37 /***
38 * In the client side of axis there is a flexibility of using custom sender to send SOAP messages.
39 * However axis's use of senders are mainly to handle transport related funtionalites.
40 * <code>RMSender</code>has to be used by the users who wish to use WS-ReliableMessaging capability
41 * in their clients.<P>
42 * The main funtionality of <code>RMSender</code> is to insert the messages coming from client and
43 * also the generated messages to the <code>SandeshaQueue</code>.
44 * If the message coming in from the client is request/response in nature then <code>RMSender</code>
45 * will wait polling <code>SandeshaQueue</code> till it gets an appropriate response.
46 * Due to the above reason, if the client is sending several messages
47 * (of request/response in nature) to be sent reliably, they will be sent reliably by
48 * <b>Sandesha</b> however the client will wait at each message till it gets the response, to send
49 * the next. To avoid this client can use callbacks provided by axis
50 * in <code>org.apache.axis.client.async</code> package.
51 */
52 public class RMSender extends BasicHandler {
53
54 private IStorageManager storageManager;
55 private static final Log log = LogFactory.getLog(RMSender.class.getName());
56 private final UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
57 private static Boolean lock = new Boolean(false);
58
59 /***
60 * This is the main method that is invoked by the axis engine. This method will add the reqest
61 * messages from the client to <code>SandeshaQueue</code> with the generated messages such as
62 * Create Sequence message and Terminate Sequence message.
63 *
64 * @param msgContext
65 * @throws AxisFault
66 */
67
68 public void invoke(MessageContext msgContext) throws AxisFault {
69
70 storageManager = new ClientStorageManager();
71
72 try {
73 RMMessageContext reqMsgCtx = null;
74 String tempSeqID = null;
75
76 reqMsgCtx = getRMMessageContext(msgContext);
77
78 tempSeqID = reqMsgCtx.getSequenceID();
79
80 reqMsgCtx = processRequestMessage(reqMsgCtx, reqMsgCtx.getSync());
81
82 if (reqMsgCtx.isHasResponse()) {
83 RMMessageContext responseMessageContext = null;
84 long startingTime = System.currentTimeMillis();
85 long inactivityTimeOut = PolicyLoader.getInstance().getInactivityTimeout();
86
87 while (responseMessageContext == null) {
88 synchronized (lock) {
89 responseMessageContext =
90 checkTheQueueForResponse(tempSeqID, reqMsgCtx.getMessageID());
91 if ((System.currentTimeMillis() - startingTime) >= inactivityTimeOut) {
92 reqMsgCtx.getCtx().stopClientByForce();
93 }
94 Thread.sleep(Constants.CLIENT_RESPONSE_CHECKING_INTERVAL);
95 }
96 }
97
98
99 if (responseMessageContext != null) {
100 String oldSeqId = reqMsgCtx.getOldSequenceID();
101 if (oldSeqId != null) {
102 Call call = (Call) reqMsgCtx.getCtx().getCallMap().get(reqMsgCtx.getOldSequenceID());
103
104 if (call != null) {
105 RMReport report = (RMReport) call.getProperty(Constants.ClientProperties.REPORT);
106 report.incrementReturnedMsgCount();
107 }
108 }
109 }
110
111
112 Message resMsg = responseMessageContext.getMsgContext().getRequestMessage();
113 RMHeaders.removeHeaders(resMsg.getSOAPEnvelope());
114 AddressingHeaders addHeaders = new AddressingHeaders(resMsg.getSOAPEnvelope(),
115 null, true, false, false, null);
116
117 msgContext.setResponseMessage(resMsg);
118 } else {
119 msgContext.setResponseMessage(null);
120 }
121
122 } catch (Exception ex) {
123 log.error(ex);
124
125 throw new AxisFault(ex.getLocalizedMessage());
126
127 }
128 }
129
130 /***
131 * This method will process the first request message.
132 *
133 * @param reqRMMsgContext
134 * @param sync
135 * @return
136 * @throws Exception
137 */
138 private RMMessageContext processRequestMessage(RMMessageContext reqRMMsgContext,
139 boolean sync) throws Exception {
140 synchronized (lock) {
141
142 if (!storageManager.isSequenceExist(reqRMMsgContext.getSequenceID())) {
143 String msgID = Constants.UUID + uuidGen.nextUUID();
144 String offerID = null;
145 if (reqRMMsgContext.isHasResponse() && reqRMMsgContext.isSendOffer()) {
146 offerID = Constants.UUID + uuidGen.nextUUID();
147 storageManager.addRequestedSequence(offerID);
148 storageManager.addOffer(msgID, offerID);
149 }
150
151 RMMessageContext createSeqRMMsgContext = RMMessageCreator.createCreateSeqMsg(reqRMMsgContext, Constants.CLIENT, msgID, offerID);
152 storageManager.addOutgoingSequence(reqRMMsgContext.getSequenceID());
153 storageManager.setTemporaryOutSequence(reqRMMsgContext.getSequenceID(),
154 createSeqRMMsgContext.getMessageID());
155
156 createSeqRMMsgContext.setSync(sync);
157 storageManager.addCreateSequenceRequest(createSeqRMMsgContext);
158 processMessage(reqRMMsgContext);
159
160 } else {
161 processMessage(reqRMMsgContext);
162 }
163
164 }
165
166
167 return reqRMMsgContext;
168 }
169
170 private RMMessageContext processMessage(RMMessageContext reqRMMsgContext)
171 throws Exception {
172 if (reqRMMsgContext.isLastMessage()) {
173 storageManager.insertTerminateSeqMessage(RMMessageCreator.createTerminateSeqMsg(reqRMMsgContext, Constants.CLIENT));
174 }
175 RMMessageContext serviceRequestMsg = RMMessageCreator.createServiceRequestMessage(reqRMMsgContext);
176 storageManager.insertOutgoingMessage(serviceRequestMsg);
177 return reqRMMsgContext;
178 }
179
180 private RMMessageContext checkTheQueueForResponse(String sequenceId, String reqMessageID) {
181 return storageManager.checkForResponseMessage(sequenceId, reqMessageID);
182 }
183
184 private RMMessageContext getRMMessageContext(MessageContext msgCtx) throws Exception {
185
186
187 MessageContext newMsgContext = RMMessageCreator.cloneMsgContext(msgCtx);
188 RMMessageContext requestMesssageContext = new RMMessageContext();
189 Call call = (Call) newMsgContext.getProperty(MessageContext.CALL);
190
191 requestMesssageContext = ClientPropertyValidator.validate(call);
192 requestMesssageContext.setOutGoingAddress((String) msgCtx.getProperty(MessageContext.TRANS_URL));
193 requestMesssageContext.setMsgContext(newMsgContext);
194 return requestMesssageContext;
195 }
196
197
198 }
199
200
201