1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.sandesha;
19
20 import org.apache.axis.AxisFault;
21 import org.apache.axis.SimpleChain;
22 import org.apache.axis.client.Call;
23 import org.apache.axis.components.logger.LogFactory;
24 import org.apache.commons.logging.Log;
25 import org.apache.sandesha.client.ClientHandlerUtil;
26 import org.apache.sandesha.client.ClientListener;
27 import org.apache.sandesha.client.ClientStorageManager;
28 import org.apache.sandesha.server.InvokeStrategy;
29 import org.apache.sandesha.server.InvokerFactory;
30 import org.apache.sandesha.server.Sender;
31 import org.apache.sandesha.server.ServerStorageManager;
32 import org.apache.sandesha.util.PolicyLoader;
33 import org.apache.sandesha.util.PropertyLoader;
34
35 import java.io.IOException;
36 import java.net.InetAddress;
37 import java.net.UnknownHostException;
38 import java.util.ArrayList;
39 import java.util.HashMap;
40 import java.util.Iterator;
41
42 /***
43 * SandeshaContext will keep track of different Call objects that the user may use inside
44 * a single client instance. SandeshaContext provides the user with an API to initialize and
45 * end sequences. With the "endSequence(Call call) method the user is provide with the option
46 * of accepting a RMReport which contains the overall status of the message transfer.
47 */
48 public class SandeshaContext {
49
50 private static final Log log = LogFactory.getLog(SandeshaContext.class.getName());
51
52 private static boolean rmInvokerStarted = false;
53 private static boolean cleintSenderStarted = false;
54 private static boolean serverSenderStarted = false;
55 private static boolean listenerStarted = false;
56 private static ClientListener clientListner = null;
57 private static Sender cleintSender;
58 private static Sender serverSender;
59 private static boolean insideServer;
60
61 private static HashMap seqMap = new HashMap();
62 private HashMap callMap = new HashMap();
63 private long key;
64
65 private String toURL;
66 private String sourceURL;
67 private String replyToURL;
68
69 private String faultToURL;
70 private String fromURL;
71 private String acksToURL;
72 private boolean sendOffer;
73 private long messageNumber;
74 private boolean sync;
75
76 private RMReport report;
77
78 public String getReplyToURL() {
79 return replyToURL;
80 }
81
82 public void setReplyToURL(String replyToURL) {
83 this.replyToURL = replyToURL;
84 }
85
86 public boolean isSync() {
87 return sync;
88 }
89
90 public void setSync(boolean sync) {
91 this.sync = sync;
92 }
93
94 public long getMessageNumber() {
95 return messageNumber;
96 }
97
98 public void setMessageNumber(long messageNumber) {
99 this.messageNumber = messageNumber;
100 }
101
102 public boolean isSendOffer() {
103 return sendOffer;
104 }
105
106 public void setSendOffer(boolean sendOffer) {
107 this.sendOffer = sendOffer;
108 }
109
110 public final String getAcksToURL() {
111 return acksToURL;
112 }
113
114 public void setAcksToURL(String acksToURL) {
115 this.acksToURL = acksToURL;
116 }
117
118 public String getFromURL() {
119 return fromURL;
120 }
121
122 public void setFromURL(String fromURL) {
123 this.fromURL = fromURL;
124 }
125
126 public final String getFaultURL() {
127 return faultToURL;
128 }
129
130 public void setFaultToURL(String faultURL) {
131 this.faultToURL = faultURL;
132 }
133
134 public String getSourceURL() {
135 return sourceURL;
136 }
137
138 public void setSourceURL(String sourceURL) {
139 this.sourceURL = sourceURL;
140 }
141
142 public String getToURL() {
143 return toURL;
144 }
145
146 public void setToURL(String toURL) {
147 this.toURL = toURL;
148 }
149
150 public SandeshaContext() throws AxisFault {
151 messageNumber = 0;
152 key = System.currentTimeMillis();
153 SandeshaContext.insideServer = false;
154 init(true);
155 startListener();
156 seqMap.put(new Long(key), this);
157 report = new RMReport();
158 }
159
160 public SandeshaContext(int sync) throws AxisFault {
161 this.sync = true;
162 messageNumber = 0;
163 key = System.currentTimeMillis();
164 SandeshaContext.insideServer = false;
165 init(true);
166 seqMap.put(new Long(key), this);
167 report = new RMReport();
168 }
169
170 public SandeshaContext(boolean insideServer) throws AxisFault {
171 messageNumber = 0;
172 key = System.currentTimeMillis();
173 SandeshaContext.insideServer = insideServer;
174 init(true);
175 seqMap.put(new Long(key), this);
176 report = new RMReport();
177 }
178
179 public SandeshaContext(boolean insideServer, int sync) throws AxisFault {
180 this.sync = true;
181 messageNumber = 0;
182 key = System.currentTimeMillis();
183 SandeshaContext.insideServer = insideServer;
184 init(true);
185 seqMap.put(new Long(key), this);
186 report = new RMReport();
187 }
188
189 public void initCall(Call call, String targetUrl, String action, short MEP) throws AxisFault {
190 if (toURL != null)
191 call.setProperty(Constants.ClientProperties.TO, toURL);
192 if (sourceURL != null)
193 call.setProperty(Constants.ClientProperties.SOURCE_URL, sourceURL);
194 if (faultToURL != null)
195 call.setProperty(Constants.ClientProperties.FAULT_TO, faultToURL);
196 if (fromURL != null)
197 call.setProperty(Constants.ClientProperties.FROM, fromURL);
198 if (replyToURL != null)
199 call.setProperty(Constants.ClientProperties.REPLY_TO, replyToURL);
200 if (acksToURL != null)
201 call.setProperty(Constants.ClientProperties.ACKS_TO, acksToURL);
202
203 call.setProperty(Constants.ClientProperties.SEND_OFFER, Boolean.valueOf(sendOffer));
204 call.setProperty(Constants.ClientProperties.SYNC, Boolean.valueOf(sync));
205 call.setProperty(Constants.CONTEXT, this);
206
207 String key = initialize(call, targetUrl, action, MEP);
208 callMap.put(key, call);
209 }
210
211 public final HashMap getCallMap() {
212 return callMap;
213 }
214
215 public void setCallMap(HashMap callMap) {
216 this.callMap = callMap;
217 }
218
219 public static IStorageManager init(boolean client) throws AxisFault {
220 if (client) {
221 IStorageManager storageManager = new ClientStorageManager();
222 if (!cleintSenderStarted) {
223 startClientSender(storageManager);
224 }
225 return storageManager;
226 } else {
227 if (!serverSenderStarted) {
228 startServerSender();
229 }
230 if (!rmInvokerStarted) {
231 InvokeStrategy strategy = null;
232 try {
233 strategy = InvokerFactory.getInstance().createInvokerStrategy();
234 } catch (Exception e) {
235 log.error(e);
236 throw new AxisFault("Could not start the Invoker.");
237 }
238 strategy.start();
239 rmInvokerStarted = true;
240 }
241 return new ServerStorageManager();
242 }
243 }
244
245 private static void startClientSender(IStorageManager storageManager) throws AxisFault {
246 if (log.isDebugEnabled()) {
247 log.debug(Constants.InfomationMessage.SENDER_STARTED);
248 }
249
250 cleintSender = new Sender(storageManager);
251 SimpleChain reqChain = null;
252 SimpleChain resChain = null;
253 try {
254 reqChain = getRequestChain();
255 resChain = getResponseChain();
256 } catch (Exception e) {
257 throw new AxisFault(e.getMessage());
258 }
259 if (reqChain != null)
260 cleintSender.setRequestChain(reqChain);
261 if (resChain != null)
262 cleintSender.setResponseChain(resChain);
263 cleintSender.startSender();
264 cleintSenderStarted = true;
265 }
266
267 private static void startServerSender() {
268 if (log.isDebugEnabled()) {
269 log.debug(Constants.InfomationMessage.SENDER_STARTED);
270 }
271 serverSender = new Sender();
272 serverSender.startSender();
273 serverSenderStarted = true;
274 }
275
276 private void validateProperties(Call call, String targetUrl, String action, short MEP)
277 throws AxisFault {
278 if (action == null)
279 throw new AxisFault("Please sepeicfy Action");
280 if (targetUrl == null)
281 throw new AxisFault("TargetUrl cannot be null");
282 if (call == null)
283 throw new AxisFault("Call cannot be null");
284 if (!(MEP == Constants.ClientProperties.IN_ONLY || MEP == Constants.ClientProperties.IN_OUT))
285 throw new AxisFault("Invalid MEP");
286 }
287
288 public final RMReport endSequence() throws AxisFault {
289
290 IStorageManager storageManager = new ClientStorageManager();
291 long startingTime = System.currentTimeMillis();
292 long inactivityTimeOut = PolicyLoader.getInstance().getInactivityTimeout();
293
294 Iterator ite = callMap.keySet().iterator();
295
296 while (ite.hasNext()) {
297 String key = (String) ite.next();
298 Call tempCall = (Call) callMap.get(key);
299 String seqId = (String) tempCall.getProperty(Constants.ClientProperties.CALL_KEY);
300 while (!storageManager.isSequenceComplete(seqId)) {
301 try {
302 if (log.isDebugEnabled()) {
303 log.debug(Constants.InfomationMessage.WAITING_TO_STOP_CLIENT);
304 }
305 Thread.sleep(Constants.CLIENT_WAIT_PERIOD_FOR_COMPLETE);
306 if ((System.currentTimeMillis() - startingTime) >= inactivityTimeOut) {
307 stopClientByForce();
308 this.report.setError("Inactivity Time Out Reached. Sequence not complete");
309 }
310 } catch (InterruptedException e) {
311 log.error(e);
312 }
313 }
314 }
315
316 if (this.report.getError() == null) {
317 this.report.setAllAcked(true);
318 }
319
320 seqMap.remove(new Long(key));
321 if (seqMap.isEmpty()) {
322 if (listenerStarted) {
323 clientListner.stop();
324 listenerStarted = false;
325 }
326 cleintSender.stop();
327 cleintSenderStarted = false;
328 storageManager.clearStorage();
329 }
330
331 return this.report;
332
333 }
334
335
336 public void stopClientByForce() throws AxisFault {
337 if (listenerStarted) {
338 clientListner.stop();
339 listenerStarted = false;
340 }
341 cleintSender.stop();
342 cleintSenderStarted = false;
343 throw new AxisFault("Inactivity Timeout Reached, No Response from the Server");
344 }
345
346 private String initialize(Call call, String targetUrl, String action, short MEP)
347 throws AxisFault {
348 validateProperties(call, targetUrl, action, MEP);
349 String keyOfCall = this.key + action;
350 call.setTargetEndpointAddress(targetUrl);
351 call.setProperty(Constants.ClientProperties.ACTION, action);
352 call.setTransport(new RMTransport(targetUrl, ""));
353 call.setProperty(Constants.ClientProperties.MEP, new Short(MEP));
354 call.setProperty(Constants.ClientProperties.CALL_KEY, keyOfCall);
355 call.setProperty(Constants.ClientProperties.REPORT, this.report);
356
357 if (!insideServer) {
358 InetAddress addr = null;
359 try {
360 addr = InetAddress.getLocalHost();
361 } catch (UnknownHostException e) {
362 log.error(e);
363 }
364
365 String sourceURL = null;
366
367 sourceURL = Constants.HTTP + Constants.COLON + Constants.SLASH +
368 Constants.SLASH + addr.getHostAddress() + Constants.COLON +
369 PropertyLoader.getClientSideListenerPort() + Constants.URL_RM_SERVICE;
370
371
372 call.setProperty(Constants.ClientProperties.SOURCE_URL, sourceURL);
373 }
374 return keyOfCall;
375 }
376
377
378 private static void startListener() {
379 if (!insideServer) {
380 if (!listenerStarted) {
381 listenerStarted = true;
382 try {
383 clientListner = new ClientListener(PropertyLoader.getClientSideListenerPort());
384 clientListner.start();
385 } catch (IOException e) {
386 log.error(e);
387 }
388 }
389 }
390
391 }
392
393
394 private static SimpleChain getRequestChain() {
395 ArrayList arr = PropertyLoader.getRequestHandlerNames();
396 return ClientHandlerUtil.getHandlerChain(arr);
397 }
398
399
400 private static SimpleChain getResponseChain() {
401 ArrayList arr = PropertyLoader.getResponseHandlerNames();
402 return ClientHandlerUtil.getHandlerChain(arr);
403 }
404
405
406 public void setLastMessage(Call call) {
407 call.setProperty(Constants.ClientProperties.LAST_MESSAGE, Boolean.valueOf(true));
408 }
409
410 public boolean isLastMessage(Call call) {
411 return ((Boolean) call.getProperty(Constants.ClientProperties.LAST_MESSAGE)).booleanValue();
412 }
413
414 public long getMessageNumber(Call call) {
415 return ((Long) call.getProperty(Constants.ClientProperties.MSG_NUMBER)).longValue();
416 }
417
418 public void setMessageNumber(Call call, long msgNumber) {
419 call.setProperty(Constants.ClientProperties.MSG_NUMBER, new Long(msgNumber));
420 }
421 }