1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.sandesha.server.msgprocessors;
18
19 import org.apache.axis.AxisFault;
20 import org.apache.axis.Message;
21 import org.apache.axis.MessageContext;
22 import org.apache.axis.components.logger.LogFactory;
23 import org.apache.commons.logging.Log;
24 import org.apache.sandesha.Constants;
25 import org.apache.sandesha.EnvelopeCreator;
26 import org.apache.sandesha.IStorageManager;
27 import org.apache.sandesha.RMMessageContext;
28 import org.apache.sandesha.storage.dao.SandeshaQueueDAO;
29 import org.apache.sandesha.ws.rm.AcknowledgementRange;
30 import org.apache.sandesha.ws.rm.SequenceAcknowledgement;
31
32 import javax.xml.soap.SOAPEnvelope;
33 import java.util.ArrayList;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.Map;
37
38 /***
39 * Processor for the acknowledgements. This will handle both processing of acknowledgements
40 * and sending acknowledgements. Sending part is required as the user wants synchronous
41 * acknowldgements to be sent from the server.
42 *
43 * @author Jaliya Eknayake
44 */
45 public final class AcknowledgementProcessor implements IRMMessageProcessor {
46 private IStorageManager storageManager = null;
47 private static final Log log = LogFactory.getLog(SandeshaQueueDAO.class.getName());
48
49 public AcknowledgementProcessor(IStorageManager storageManager) {
50 this.storageManager = storageManager;
51 }
52
53 public final boolean processMessage(RMMessageContext rmMessageContext) throws AxisFault {
54 SequenceAcknowledgement seqAcknowledgement = rmMessageContext.getRMHeaders()
55 .getSequenceAcknowledgement();
56 String seqID = seqAcknowledgement.getIdentifier().getIdentifier();
57 List ackRanges = seqAcknowledgement.getAckRanges();
58 Iterator ite = ackRanges.iterator();
59
60 while (ite.hasNext()) {
61 AcknowledgementRange ackRange = (AcknowledgementRange) ite.next();
62 long msgNumber = ackRange.getMinValue();
63 while (ackRange.getMaxValue() >= msgNumber) {
64 if (!storageManager.isSentMsg(seqID, msgNumber)) {
65 throw new AxisFault(new javax.xml.namespace.QName(Constants.FaultCodes.WSRM_FAULT_INVALID_ACKNOWLEDGEMENT),
66 Constants.FaultMessages.INVALID_ACKNOWLEDGEMENT, null, null);
67 }
68 storageManager.setAckReceived(seqID, msgNumber);
69 storageManager.setAcknowledged(seqID, msgNumber);
70 msgNumber++;
71 }
72 }
73 return false;
74 }
75
76
77 public boolean sendAcknowledgement(RMMessageContext rmMessageContext) throws AxisFault {
78 String seqID = rmMessageContext.getSequenceID();
79
80 long messageNumber = rmMessageContext.getRMHeaders().getSequence().getMessageNumber()
81 .getMessageNumber();
82 String seq = storageManager.getOutgoingSeqenceIdOfIncomingMsg(rmMessageContext);
83 Map listOfMsgNumbers = storageManager.getListOfMessageNumbers(seq);
84
85 ArrayList ackRangeList = null;
86 if (listOfMsgNumbers != null) {
87 ackRangeList = getAckRangesVector(listOfMsgNumbers);
88 } else {
89 ackRangeList = new ArrayList();
90 AcknowledgementRange ackRange = new AcknowledgementRange();
91 ackRange.setMaxValue(messageNumber);
92 ackRange.setMinValue(messageNumber);
93 ackRangeList.add(ackRange);
94 }
95 RMMessageContext rmMsgContext = getAckRMMsgCtx(rmMessageContext, ackRangeList);
96
97 if (true ==
98 (storageManager.getAcksTo(seqID).equals(Constants.WSA.NS_ADDRESSING_ANONYMOUS))) {
99 try {
100 String soapMsg = rmMsgContext.getMsgContext().getResponseMessage().getSOAPEnvelope()
101 .toString();
102 rmMessageContext.getMsgContext().setResponseMessage(new Message(soapMsg));
103 } catch (AxisFault af) {
104 af.setFaultCodeAsString(Constants.FaultCodes.WSRM_SERVER_INTERNAL_ERROR);
105 throw af;
106 }
107 return true;
108 } else {
109
110 storageManager.addAcknowledgement(rmMsgContext);
111 return false;
112 }
113 }
114
115 private RMMessageContext getAckRMMsgCtx(RMMessageContext rmMessageContext,
116 List ackRangeList) {
117 RMMessageContext rmMsgContext = new RMMessageContext();
118 try {
119
120 String to = storageManager.getAcksTo(rmMessageContext.getRMHeaders().getSequence().getIdentifier().getIdentifier());
121
122 SOAPEnvelope ackEnvelope = EnvelopeCreator.createAcknowledgementEnvelope(rmMessageContext, to, ackRangeList);
123
124 Message resMsg = new Message(ackEnvelope);
125 MessageContext msgContext = new MessageContext(rmMessageContext.getMsgContext().getAxisEngine());
126 rmMessageContext.copyContents(rmMsgContext);
127 msgContext.setResponseMessage(resMsg);
128 rmMsgContext.setMsgContext(msgContext);
129
130
131
132 rmMsgContext.setOutGoingAddress(to);
133 rmMsgContext.setMessageType(Constants.MSG_TYPE_ACKNOWLEDGEMENT);
134 } catch (Exception e) {
135 log.error(e);
136 }
137 return rmMsgContext;
138 }
139
140 /***
141 * This method will split the input map with messages numbers to respectable
142 * message ranges and will return a vector of AcknowledgementRange.
143 *
144 * @param listOfMsgNumbers
145 * @return
146 */
147 private ArrayList getAckRangesVector(Map listOfMsgNumbers) {
148 long min;
149 long max;
150 long size = listOfMsgNumbers.size();
151 ArrayList list = new ArrayList();
152 boolean found = false;
153
154 min = ((Long) listOfMsgNumbers.get(new Long(1))).longValue();
155 max = min;
156
157 if (size > 1) {
158 for (long i = 1; i <= size; i++) {
159
160 if (i + 1 > size) {
161 found = true;
162 max = ((Long) listOfMsgNumbers.get(new Long(i))).longValue();
163 } else {
164
165 if (1 == (((Long) listOfMsgNumbers.get(new Long(i + 1))).longValue() - ((Long) listOfMsgNumbers.get(new Long(i))).longValue())) {
166 max = ((Long) listOfMsgNumbers.get(new Long(i + 1))).longValue();
167 found = true;
168 } else {
169 found = false;
170 max = ((Long) listOfMsgNumbers.get(new Long(i))).longValue();
171 AcknowledgementRange ackRange = new AcknowledgementRange();
172 ackRange.setMaxValue(max);
173 ackRange.setMinValue(min);
174 list.add(ackRange);
175
176 min = ((Long) listOfMsgNumbers.get(new Long(i + 1))).longValue();
177 }
178
179 }
180 }
181 if (found) {
182 AcknowledgementRange ackRange = new AcknowledgementRange();
183 ackRange.setMaxValue(max);
184 ackRange.setMinValue(min);
185 list.add(ackRange);
186 }
187 } else {
188 AcknowledgementRange ackRange = new AcknowledgementRange();
189 ackRange.setMaxValue(max);
190 ackRange.setMinValue(min);
191 list.add(ackRange);
192 }
193 return list;
194 }
195 }