Skip to content

Commit b6311b2

Browse files
committed
Merge with jhberges/stompjms branch containing the heartbeat implementation
See fusesource#18 Usage with -Dstompjms.heartbeat=X,Y
2 parents 28fafb2 + 645b3f8 commit b6311b2

File tree

4 files changed

+165
-15
lines changed

4 files changed

+165
-15
lines changed

stompjms-activemq-test/src/test/java/org/fusesource/stomp/activemq/ActiveMQJmsStompTest.java

Lines changed: 100 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,37 @@
99
*/
1010
package org.fusesource.stomp.activemq;
1111

12-
import junit.framework.TestCase;
12+
import javax.jms.Connection;
13+
import javax.jms.ConnectionFactory;
14+
import javax.jms.Destination;
15+
import javax.jms.JMSException;
16+
import javax.jms.Message;
17+
import javax.jms.MessageConsumer;
18+
import javax.jms.MessageProducer;
19+
import javax.jms.Session;
20+
import javax.jms.TextMessage;
21+
1322
import org.apache.activemq.broker.BrokerService;
1423
import org.apache.activemq.broker.TransportConnector;
1524
import org.fusesource.stomp.jms.StompJmsConnectionFactory;
16-
17-
import javax.jms.*;
25+
import org.junit.After;
26+
import org.junit.Assert;
27+
import org.junit.Before;
28+
import org.junit.Ignore;
29+
import org.junit.Test;
1830

1931
/**
2032
* <p>
2133
* </p>
2234
*
2335
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
2436
*/
25-
public class ActiveMQJmsStompTest extends TestCase {
37+
public class ActiveMQJmsStompTest {
2638
BrokerService broker;
2739
int port;
2840

29-
@Override
30-
protected void setUp() throws Exception {
31-
super.setUp();
41+
@Before
42+
public void setUp() throws Exception {
3243
broker = new BrokerService();
3344
broker.setPersistent(false);
3445
TransportConnector connector = broker.addConnector("stomp://0.0.0.0:0");
@@ -37,11 +48,10 @@ protected void setUp() throws Exception {
3748
port = connector.getConnectUri().getPort();
3849
}
3950

40-
@Override
41-
protected void tearDown() throws Exception {
51+
@After
52+
public void tearDown() throws Exception {
4253
broker.stop();
4354
broker.waitUntilStopped();
44-
super.tearDown();
4555
}
4656

4757
protected ConnectionFactory createConnectionFactory() throws Exception {
@@ -50,6 +60,7 @@ protected ConnectionFactory createConnectionFactory() throws Exception {
5060
return result;
5161
}
5262

63+
@Test
5364
public void testDurableSubs() throws Exception {
5465
Connection connection1 = createConnectionFactory().createConnection();
5566
connection1.setClientID("client1");
@@ -113,9 +124,85 @@ public void testQueueSendReceiveSingleConnection() throws Exception {
113124
connection1.close();
114125
}
115126

116-
private void assertTextMessageReceived(String expected, MessageConsumer sub) throws JMSException {
127+
@Test
128+
@Ignore("Test added as support for AMQ-4493 - chained request/replies over ActiveMQ. The test *will* fail as of 2013-08-21")
129+
public void testChainedRequestReply() throws Exception {
130+
final String firstTopic = "mytopic";
131+
final String secondTopic = "secondtopic";
132+
// First block sending "1" and expecting "123" as reply.
133+
Connection connectionSource = createConnectionFactory().createConnection();
134+
connectionSource.start();
135+
Session sessionSource = connectionSource.createSession(false, Session.CLIENT_ACKNOWLEDGE);
136+
MessageProducer producer = sessionSource.createProducer(sessionSource.createTopic(firstTopic));
137+
138+
Destination replyToProducer = sessionSource.createTemporaryQueue();
139+
MessageConsumer sourceReplySubscription = sessionSource.createConsumer(replyToProducer);
140+
141+
// Second block receiving "1" and using "mytopic" to append "3" after it's own "2"
142+
final Connection connectionIntermediate = createConnectionFactory().createConnection();
143+
connectionIntermediate.start();
144+
final Session sessionIntermediate = connectionIntermediate.createSession(false, Session.CLIENT_ACKNOWLEDGE);
145+
final MessageConsumer consumerIntermediate = sessionIntermediate.createConsumer(sessionIntermediate.createTopic(firstTopic));
146+
Thread intermediateThread = new Thread(new Runnable() {
147+
148+
public void run() {
149+
try {
150+
Message message = consumerIntermediate.receive();
151+
System.out.println("Producer -> Intermediate: " + message);
152+
Destination replyToIntermediate = sessionIntermediate.createTemporaryQueue();
153+
MessageProducer intermediateProducer = sessionIntermediate.createProducer(sessionIntermediate.createTopic(secondTopic));
154+
TextMessage secondMessage = sessionIntermediate.createTextMessage(((TextMessage) message).getText() + "2");
155+
intermediateProducer.send(secondMessage);
156+
157+
MessageConsumer intermediateReplyConsumer = sessionIntermediate.createConsumer(replyToIntermediate);
158+
Message finalResponse = intermediateReplyConsumer.receive();
159+
MessageProducer replyProducer = sessionIntermediate.createProducer(message.getJMSDestination());
160+
161+
System.out.println("Intermediate -> Producer: " + finalResponse);
162+
replyProducer.send(finalResponse);
163+
} catch (JMSException e) {
164+
Assert.fail(e.getMessage());
165+
}
166+
}});
167+
intermediateThread.setDaemon(true);
168+
intermediateThread.start();
169+
170+
// Final block receiving "12" appending "3" and replying
171+
final Connection connectionFinal = createConnectionFactory().createConnection();
172+
connectionIntermediate.start();
173+
final Session sessionFinal = connectionIntermediate.createSession(false, Session.CLIENT_ACKNOWLEDGE);
174+
final MessageConsumer consumerFinal = sessionIntermediate.createConsumer(sessionIntermediate.createTopic(secondTopic));
175+
Thread finalThread = new Thread(new Runnable() {
176+
177+
public void run() {
178+
try {
179+
Message message = consumerFinal.receive();
180+
System.out.println("Intermediate -> Final: " + message);
181+
TextMessage finalResponse = sessionFinal.createTextMessage(((TextMessage) message).getText() + "3");
182+
183+
MessageProducer replyProducer = sessionFinal.createProducer(message.getJMSDestination());
184+
System.out.println("Final -> Intermediate: " + finalResponse);
185+
replyProducer.send(finalResponse);
186+
} catch (JMSException e) {
187+
Assert.fail(e.getMessage());
188+
}
189+
}});
190+
finalThread.setDaemon(true);
191+
finalThread.start();
192+
193+
TextMessage sourceTextMessage = sessionSource.createTextMessage("1");
194+
sourceTextMessage.setJMSReplyTo(replyToProducer);
195+
producer.send(sourceTextMessage);
196+
197+
assertTextMessageReceived("123", sourceReplySubscription);
198+
connectionSource.close();
199+
connectionIntermediate.close();
200+
connectionFinal.close();
201+
}
202+
203+
private void assertTextMessageReceived(final String expected, final MessageConsumer sub) throws JMSException {
117204
Message msg = sub.receive(1000*5);
118-
assertNotNull("A message was not received.", msg);
119-
assertEquals(expected, ((TextMessage)msg).getText());
205+
Assert.assertNotNull("A message was not received.", msg);
206+
Assert.assertEquals(expected, ((TextMessage)msg).getText());
120207
}
121208
}

stompjms-client/src/main/java/org/fusesource/stomp/client/Constants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public interface Constants {
6464
final AsciiBuffer CONTENT_TYPE = ascii("content-type");
6565
final AsciiBuffer TRANSFORMATION = ascii("transformation");
6666
final AsciiBuffer TRANSFORMATION_ERROR = ascii("transformation-error");
67+
final AsciiBuffer HEARTBEAT = ascii("heart-beat");
6768

6869
/**
6970
* This header is used to instruct ActiveMQ to construct the message

stompjms-client/src/main/java/org/fusesource/stomp/client/Stomp.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class Stomp {
4343

4444
private static final long KEEP_ALIVE = Long.parseLong(System.getProperty("stompjms.thread.keep_alive", ""+1000));
4545
private static final long STACK_SIZE = Long.parseLong(System.getProperty("stompjms.thread.stack_size", ""+1024*512));
46+
public static final String HEARTBEAT_INTERVAL = System.getProperty("stompjms.heartbeat", "0,0");
4647

4748
private static ThreadPoolExecutor blockingThreadPool;
4849
public synchronized static ThreadPoolExecutor getBlockingThreadPool() {
@@ -161,6 +162,7 @@ public void onTransportConnected() {
161162
if (clientId != null) {
162163
frame.addHeader(CLIENT_ID, StompFrame.encodeHeader(clientId));
163164
}
165+
frame.addHeader(HEARTBEAT, StompFrame.encodeHeader(HEARTBEAT_INTERVAL));
164166
if( customHeaders!=null ) {
165167
for (Object key : customHeaders.keySet()) {
166168
frame.addHeader(StompFrame.encodeHeader(key.toString()), StompFrame.encodeHeader(customHeaders.get(key).toString()));

stompjms-client/src/main/java/org/fusesource/stomp/jms/StompChannel.java

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,16 @@
1010

1111
package org.fusesource.stomp.jms;
1212

13+
import org.fusesource.hawtbuf.AsciiBuffer;
14+
import org.fusesource.hawtdispatch.Task;
15+
import org.fusesource.stomp.client.Callback;
16+
import org.fusesource.stomp.client.CallbackConnection;
17+
import org.fusesource.stomp.client.Promise;
18+
import org.fusesource.stomp.client.ProtocolException;
19+
import org.fusesource.stomp.client.Stomp;
20+
import org.fusesource.stomp.codec.StompFrame;
21+
import org.fusesource.stomp.jms.message.StompJmsMessage;
22+
import org.fusesource.stomp.jms.util.StompTranslator;
1323
import static org.fusesource.hawtdispatch.Dispatch.NOOP;
1424
import static org.fusesource.stomp.client.Constants.ABORT;
1525
import static org.fusesource.stomp.client.Constants.ACK;
@@ -23,6 +33,7 @@
2333
import static org.fusesource.stomp.client.Constants.ID;
2434
import static org.fusesource.stomp.client.Constants.MESSAGE;
2535
import static org.fusesource.stomp.client.Constants.MESSAGE_ID;
36+
import static org.fusesource.stomp.client.Constants.NEWLINE;
2637
import static org.fusesource.stomp.client.Constants.SELECTOR;
2738
import static org.fusesource.stomp.client.Constants.SEND;
2839
import static org.fusesource.stomp.client.Constants.SERVER;
@@ -31,11 +42,18 @@
3142
import static org.fusesource.stomp.client.Constants.SUBSCRIPTION;
3243
import static org.fusesource.stomp.client.Constants.TRANSACTION;
3344

45+
import javax.jms.ExceptionListener;
46+
import javax.jms.JMSException;
47+
import javax.net.ssl.SSLContext;
48+
3449
import java.io.IOException;
3550
import java.net.URI;
3651
import java.util.Map;
3752
import java.util.UUID;
53+
import java.util.concurrent.Callable;
3854
import java.util.concurrent.CountDownLatch;
55+
import java.util.concurrent.ScheduledFuture;
56+
import java.util.concurrent.ScheduledThreadPoolExecutor;
3957
import java.util.concurrent.TimeUnit;
4058
import java.util.concurrent.atomic.AtomicBoolean;
4159
import java.util.concurrent.atomic.AtomicInteger;
@@ -84,6 +102,7 @@ public class StompChannel {
84102
StompServerAdaptor serverAdaptor;
85103
String clientId;
86104
private long disconnectTimeout = 10000;
105+
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
87106

88107
public AsciiBuffer sessionId() {
89108
return sessionId;
@@ -107,10 +126,20 @@ public StompChannel copy() {
107126

108127
CountDownLatch connectedLatch = new CountDownLatch(1);
109128

129+
private ScheduledFuture<?> heartBeatFuture;
130+
110131
public void connect() throws JMSException {
111132
if (this.connected.compareAndSet(false, true)) {
112133
try {
113-
final Promise<CallbackConnection> future = new Promise<CallbackConnection>();
134+
final Promise<CallbackConnection> future = new Promise<CallbackConnection>() {
135+
136+
@Override
137+
public void onSuccess(final CallbackConnection value) {
138+
rescheduleHeartBeat();
139+
super.onSuccess(value);
140+
}
141+
142+
};
114143
Stomp stomp = new Stomp(brokerURI);
115144
stomp.setLogin(userName);
116145
stomp.setPasscode(password);
@@ -196,7 +225,7 @@ public void onSuccess(StompFrame value) {
196225
});
197226
}
198227
});
199-
228+
stopHeartbeats();
200229
// Wait for the disconnect to finish..
201230
try {
202231
cd.await(getDisconnectTimeout(), TimeUnit.MILLISECONDS);
@@ -225,6 +254,7 @@ public void sendMessage(StompJmsMessage copy, AsciiBuffer txid, boolean sync) th
225254
} else {
226255
sendFrame(frame);
227256
}
257+
rescheduleHeartBeat();
228258
} catch (IOException e) {
229259
throw StompJmsExceptionSupport.create(e);
230260
}
@@ -357,6 +387,7 @@ public void onFailure(Throwable value) {
357387
@Override
358388
public void onSuccess(Void value) {
359389
writeBufferRemaining.getAndAdd(size);
390+
rescheduleHeartBeat();
360391
}
361392
});
362393
}
@@ -368,6 +399,7 @@ public void onSuccess(Void value) {
368399
@Override
369400
public void onSuccess(Void value) {
370401
writeBufferRemaining.getAndAdd(size);
402+
rescheduleHeartBeat();
371403
super.onSuccess(value);
372404
}
373405
};
@@ -604,4 +636,32 @@ public SSLContext getSslContext() {
604636
public void setSslContext(SSLContext sslContext) {
605637
this.sslContext = sslContext;
606638
}
639+
640+
private void rescheduleHeartBeat() {
641+
stopHeartbeats();
642+
heartBeatFuture = scheduledThreadPoolExecutor.schedule(new Callable<Void>() {
643+
public Void call() throws Exception {
644+
StompFrame heartbeatFrame = new StompFrame(SEND);
645+
heartbeatFrame.addHeader(DESTINATION, AsciiBuffer.ascii(channelId));
646+
heartbeatFrame.content(NEWLINE);
647+
heartbeatFrame.addContentLengthHeader();
648+
sendFrame(heartbeatFrame);
649+
rescheduleHeartBeat();
650+
return null;
651+
}
652+
},
653+
getSendInterval(Stomp.HEARTBEAT_INTERVAL),
654+
TimeUnit.MILLISECONDS);
655+
}
656+
657+
private long getSendInterval(String heartbeatInterval) {
658+
return Long.parseLong(heartbeatInterval.split(",")[0]);
659+
}
660+
661+
private synchronized void stopHeartbeats() {
662+
if (null != heartBeatFuture && !(heartBeatFuture.isCancelled() || heartBeatFuture.isDone())) {
663+
heartBeatFuture.cancel(false);
664+
heartBeatFuture = null;
665+
}
666+
}
607667
}

0 commit comments

Comments
 (0)