View Javadoc

1   /*
2    * Copyright  1999-2004 The Apache Software Foundation.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   *
16   */
17  package org.apache.sandesha.server.msgprocessors;
18  
19  import org.apache.axis.AxisFault;
20  import org.apache.axis.Message;
21  import org.apache.axis.MessageContext;
22  import org.apache.axis.components.logger.LogFactory;
23  import org.apache.commons.logging.Log;
24  import org.apache.sandesha.Constants;
25  import org.apache.sandesha.EnvelopeCreator;
26  import org.apache.sandesha.IStorageManager;
27  import org.apache.sandesha.RMMessageContext;
28  import org.apache.sandesha.storage.dao.SandeshaQueueDAO;
29  import org.apache.sandesha.ws.rm.AcknowledgementRange;
30  import org.apache.sandesha.ws.rm.SequenceAcknowledgement;
31  
32  import javax.xml.soap.SOAPEnvelope;
33  import java.util.ArrayList;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.Map;
37  
38  /***
39   * Processor for the acknowledgements. This will handle both processing of acknowledgements
40   * and sending acknowledgements. Sending part is required as the user wants synchronous
41   * acknowldgements to be sent from the server.
42   *
43   * @author Jaliya Eknayake
44   */
45  public final class AcknowledgementProcessor implements IRMMessageProcessor {
46      private IStorageManager storageManager = null;
47      private static final Log log = LogFactory.getLog(SandeshaQueueDAO.class.getName());
48  
49      public AcknowledgementProcessor(IStorageManager storageManager) {
50          this.storageManager = storageManager;
51      }
52  
53      public final boolean processMessage(RMMessageContext rmMessageContext) throws AxisFault {
54          SequenceAcknowledgement seqAcknowledgement = rmMessageContext.getRMHeaders()
55                  .getSequenceAcknowledgement();
56          String seqID = seqAcknowledgement.getIdentifier().getIdentifier();
57          List ackRanges = seqAcknowledgement.getAckRanges();
58          Iterator ite = ackRanges.iterator();
59  
60          while (ite.hasNext()) {
61              AcknowledgementRange ackRange = (AcknowledgementRange) ite.next();
62              long msgNumber = ackRange.getMinValue();
63              while (ackRange.getMaxValue() >= msgNumber) {
64                  if (!storageManager.isSentMsg(seqID, msgNumber)) {
65                      throw new AxisFault(new javax.xml.namespace.QName(Constants.FaultCodes.WSRM_FAULT_INVALID_ACKNOWLEDGEMENT),
66                              Constants.FaultMessages.INVALID_ACKNOWLEDGEMENT, null, null);
67                  }
68                  storageManager.setAckReceived(seqID, msgNumber);
69                  storageManager.setAcknowledged(seqID, msgNumber);
70                  msgNumber++;
71              }
72          }
73          return false;
74      }
75  
76  
77      public boolean sendAcknowledgement(RMMessageContext rmMessageContext) throws AxisFault {
78          String seqID = rmMessageContext.getSequenceID();
79  
80          long messageNumber = rmMessageContext.getRMHeaders().getSequence().getMessageNumber()
81                  .getMessageNumber();
82          String seq = storageManager.getOutgoingSeqenceIdOfIncomingMsg(rmMessageContext);
83          Map listOfMsgNumbers = storageManager.getListOfMessageNumbers(seq);
84  
85          ArrayList ackRangeList = null;
86          if (listOfMsgNumbers != null) {
87              ackRangeList = getAckRangesVector(listOfMsgNumbers);
88          } else {
89              ackRangeList = new ArrayList();
90              AcknowledgementRange ackRange = new AcknowledgementRange();
91              ackRange.setMaxValue(messageNumber);
92              ackRange.setMinValue(messageNumber);
93              ackRangeList.add(ackRange);
94          }
95          RMMessageContext rmMsgContext = getAckRMMsgCtx(rmMessageContext, ackRangeList);
96  
97          if (true ==
98                  (storageManager.getAcksTo(seqID).equals(Constants.WSA.NS_ADDRESSING_ANONYMOUS))) {
99              try {
100                 String soapMsg = rmMsgContext.getMsgContext().getResponseMessage().getSOAPEnvelope()
101                         .toString();
102                 rmMessageContext.getMsgContext().setResponseMessage(new Message(soapMsg));
103             } catch (AxisFault af) {
104                 af.setFaultCodeAsString(Constants.FaultCodes.WSRM_SERVER_INTERNAL_ERROR);
105                 throw af;
106             }
107             return true;
108         } else {
109             //Store the asynchronize ack in the queue. The name for this queue is not yet fixed.
110             storageManager.addAcknowledgement(rmMsgContext);
111             return false;
112         }
113     }
114 
115     private RMMessageContext getAckRMMsgCtx(RMMessageContext rmMessageContext,
116                                             List ackRangeList) {
117         RMMessageContext rmMsgContext = new RMMessageContext();
118         try {
119 
120             String to = storageManager.getAcksTo(rmMessageContext.getRMHeaders().getSequence().getIdentifier().getIdentifier());
121 
122             SOAPEnvelope ackEnvelope = EnvelopeCreator.createAcknowledgementEnvelope(rmMessageContext, to, ackRangeList);
123 
124             Message resMsg = new Message(ackEnvelope);
125             MessageContext msgContext = new MessageContext(rmMessageContext.getMsgContext().getAxisEngine());
126             rmMessageContext.copyContents(rmMsgContext);
127             msgContext.setResponseMessage(resMsg);
128             rmMsgContext.setMsgContext(msgContext);
129 
130             //Get the from address to send the Ack. Doesn't matter whether we have Sync or
131             // ASync messages. If we have Sync them this property is not used.
132             rmMsgContext.setOutGoingAddress(to);
133             rmMsgContext.setMessageType(Constants.MSG_TYPE_ACKNOWLEDGEMENT);
134         } catch (Exception e) {
135             log.error(e);
136         }
137         return rmMsgContext;
138     }
139 
140     /***
141      * This method will split the input map with messages numbers to respectable
142      * message ranges and will return a vector of AcknowledgementRange.
143      *
144      * @param listOfMsgNumbers
145      * @return
146      */
147     private ArrayList getAckRangesVector(Map listOfMsgNumbers) {
148         long min;
149         long max;
150         long size = listOfMsgNumbers.size();
151         ArrayList list = new ArrayList();
152         boolean found = false;
153 
154         min = ((Long) listOfMsgNumbers.get(new Long(1))).longValue();
155         max = min;
156 
157         if (size > 1) {
158             for (long i = 1; i <= size; i++) {
159 
160                 if (i + 1 > size) {
161                     found = true;
162                     max = ((Long) listOfMsgNumbers.get(new Long(i))).longValue();
163                 } else {
164 
165                     if (1 == (((Long) listOfMsgNumbers.get(new Long(i + 1))).longValue() - ((Long) listOfMsgNumbers.get(new Long(i))).longValue())) {
166                         max = ((Long) listOfMsgNumbers.get(new Long(i + 1))).longValue();
167                         found = true;
168                     } else {
169                         found = false;
170                         max = ((Long) listOfMsgNumbers.get(new Long(i))).longValue();
171                         AcknowledgementRange ackRange = new AcknowledgementRange();
172                         ackRange.setMaxValue(max);
173                         ackRange.setMinValue(min);
174                         list.add(ackRange);
175 
176                         min = ((Long) listOfMsgNumbers.get(new Long(i + 1))).longValue();
177                     }
178 
179                 }
180             }
181             if (found) {
182                 AcknowledgementRange ackRange = new AcknowledgementRange();
183                 ackRange.setMaxValue(max);
184                 ackRange.setMinValue(min);
185                 list.add(ackRange);
186             }
187         } else {
188             AcknowledgementRange ackRange = new AcknowledgementRange();
189             ackRange.setMaxValue(max);
190             ackRange.setMinValue(min);
191             list.add(ackRange);
192         }
193         return list;
194     }
195 }