View Javadoc

1   /*
2    * Copyright  1999-2004 The Apache Software Foundation.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   *
16   */
17  package org.apache.sandesha.server;
18  
19  import org.apache.axis.SimpleChain;
20  import org.apache.axis.components.threadpool.ThreadPool;
21  import org.apache.sandesha.Constants;
22  import org.apache.sandesha.IStorageManager;
23  import org.apache.sandesha.storage.Callback;
24  
25  import java.util.ArrayList;
26  import java.util.Iterator;
27  
28  /***
29   * This is the sender for Sandesha for both the client and the server sides. Starting of the
30   * Sender will be done either by the RMProvider or the SandeshaContext. The job of the sender is to
31   * keep on monitoring the SandeshaQueue and send any messages that are scheduled to be sent.
32   *
33   * @author Chamikar Jayalath
34   * @author Jaliya Ekanayake
35   */
36  public class Sender {
37  
38      private ThreadPool tPool = new ThreadPool(Constants.SENDER_THREADS);
39      private ArrayList threadList = new ArrayList();
40  
41      public void startSender() {
42          running = true;
43          for (int i = 0; i < Constants.SENDER_THREADS; i++) {
44              SenderWorker senderWorker = new SenderWorker(this.storageManager);
45              senderWorker.setRequestChain(this.getRequestChain());
46              senderWorker.setResponseChain(this.getResponseChain());
47              senderWorker.setRunning(true);
48              SenderWorker.setCallback(callback);
49              threadList.add(senderWorker);
50              tPool.addWorker(senderWorker);
51          }
52      }
53  
54      public void stop() {
55          Iterator ite = threadList.iterator();
56          while (ite.hasNext()) {
57              SenderWorker sWorker = (SenderWorker) ite.next();
58              sWorker.setRunning(false);
59          }
60  
61          tPool.safeShutdown();
62          running = false;
63      }
64  
65      public static Callback callback;
66  
67      public boolean isRunning() {
68          return running;
69      }
70  
71      public void setRunning(boolean running) {
72          this.running = running;
73      }
74  
75      private boolean running;
76      private IStorageManager storageManager;
77  
78  
79      public static synchronized Callback getCallback() {
80          return callback;
81      }
82  
83      public static synchronized void setCallback(Callback cb) {
84          callback = cb;
85      }
86  
87      private SimpleChain requestChain = null;
88      private SimpleChain responseChain = null;
89  
90      public SimpleChain getRequestChain() {
91          return requestChain;
92      }
93  
94      public void setRequestChain(SimpleChain requestChain) {
95          this.requestChain = requestChain;
96      }
97  
98      public SimpleChain getResponseChain() {
99          return responseChain;
100     }
101 
102     public void setResponseChain(SimpleChain responseChanin) {
103         this.responseChain = responseChanin;
104     }
105 
106     public Sender() {
107         storageManager = new ServerStorageManager();
108     }
109 
110     public Sender(IStorageManager storageManager) {
111         this.storageManager = storageManager;
112     }
113 }