Skip to content

Commit 7e6ca00

Browse files
EvenLjjliujianjun.ljj
andauthored
fix triple shared channel remove when rpc sendMsg issue (#1521)
* fix shared channel remove when rpc sendMag issue * fix shared channel remove when rpc sendMag issue * fix shared channel remove when rpc sendMag issue --------- Co-authored-by: liujianjun.ljj <[email protected]>
1 parent 3b89ae3 commit 7e6ca00

File tree

3 files changed

+87
-18
lines changed

3 files changed

+87
-18
lines changed

remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/ReferenceCountManagedChannel.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package com.alipay.sofa.rpc.transport.triple;
1818

19+
import com.alipay.sofa.rpc.log.Logger;
20+
import com.alipay.sofa.rpc.log.LoggerFactory;
1921
import io.grpc.CallOptions;
2022
import io.grpc.ClientCall;
2123
import io.grpc.ConnectivityState;
@@ -30,9 +32,11 @@
3032
*/
3133
public class ReferenceCountManagedChannel extends ManagedChannel {
3234

33-
private final AtomicInteger referenceCount = new AtomicInteger(0);
35+
private final static Logger LOGGER = LoggerFactory.getLogger(ReferenceCountManagedChannel.class);
3436

35-
private ManagedChannel grpcChannel;
37+
private final AtomicInteger referenceCount = new AtomicInteger(0);
38+
39+
private final ManagedChannel grpcChannel;
3640

3741
public ReferenceCountManagedChannel(ManagedChannel delegated) {
3842
this.grpcChannel = delegated;
@@ -47,8 +51,18 @@ public void incrementAndGetCount() {
4751

4852
@Override
4953
public ManagedChannel shutdown() {
50-
if (referenceCount.decrementAndGet() <= 0) {
51-
return grpcChannel.shutdown();
54+
int remainReferenceCount = referenceCount.decrementAndGet();
55+
try {
56+
if (remainReferenceCount <= 0) {
57+
ManagedChannel shutdown = grpcChannel.shutdown();
58+
shutdown.awaitTermination(5, TimeUnit.SECONDS);
59+
return shutdown;
60+
}
61+
} catch (InterruptedException e) {
62+
LOGGER.warn("Triple channel shut down interrupted.");
63+
} finally {
64+
LOGGER.info("ReferenceCountManagedChannel {} shutdown remain referenceCount: {}", this,
65+
remainReferenceCount);
5266
}
5367
return grpcChannel;
5468
}

remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientTransport.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535
import com.alipay.sofa.rpc.event.EventBus;
3636
import com.alipay.sofa.rpc.ext.Extension;
3737
import com.alipay.sofa.rpc.interceptor.ClientHeaderClientInterceptor;
38-
import com.alipay.sofa.rpc.log.Logger;
39-
import com.alipay.sofa.rpc.log.LoggerFactory;
4038
import com.alipay.sofa.rpc.message.ResponseFuture;
4139
import com.alipay.sofa.rpc.server.triple.TripleContants;
4240
import com.alipay.sofa.rpc.transport.AbstractChannel;
@@ -63,8 +61,6 @@
6361
@Extension("tri")
6462
public class TripleClientTransport extends ClientTransport {
6563

66-
private final static Logger LOGGER = LoggerFactory.getLogger(TripleClientTransport.class);
67-
6864
protected ProviderInfo providerInfo;
6965

7066
protected ManagedChannel channel;
@@ -78,7 +74,7 @@ public class TripleClientTransport extends ClientTransport {
7874
/* <address, gRPC channels> */
7975
protected final static ConcurrentMap<String, ReferenceCountManagedChannel> channelMap = new ConcurrentHashMap<>();
8076

81-
protected final Object lock = new Object();
77+
protected final static Object lock = new Object();
8278

8379
protected static int KEEP_ALIVE_INTERVAL = SofaConfigs.getOrCustomDefault(
8480
RpcConfigKeys.TRIPLE_CLIENT_KEEP_ALIVE_INTERVAL,
@@ -114,17 +110,11 @@ protected TripleClientInvoker buildClientInvoker() {
114110
@Override
115111
public void disconnect() {
116112
if (channel != null) {
117-
try {
118-
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
119-
} catch (InterruptedException e) {
120-
LOGGER.warn("Triple channel shut down interrupted.");
121-
}
113+
channel.shutdown();
122114
if (channel.isShutdown()) {
123-
channel = null;
124-
channelMap.remove(providerInfo.toString());
115+
channelMap.remove(providerInfo.toString(), (ReferenceCountManagedChannel) channel);
125116
}
126-
} else {
127-
channelMap.remove(providerInfo.toString());
117+
channel = null;
128118
}
129119
}
130120

@@ -269,6 +259,7 @@ private ReferenceCountManagedChannel getSharedChannel(ProviderInfo url) {
269259
channel.incrementAndGetCount();
270260
} else {
271261
channel = new ReferenceCountManagedChannel(initChannel(url));
262+
channel.incrementAndGetCount();
272263
channelMap.put(key, channel);
273264
}
274265
}

remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/transport/triple/TripleClientTransportTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616
*/
1717
package com.alipay.sofa.rpc.transport.triple;
1818

19+
import com.alipay.sofa.rpc.client.ProviderInfo;
20+
import com.alipay.sofa.rpc.config.ConsumerConfig;
21+
import com.alipay.sofa.rpc.server.triple.HelloService;
22+
import com.alipay.sofa.rpc.transport.ClientTransportConfig;
23+
import com.alipay.sofa.rpc.transport.ClientTransportFactory;
1924
import org.junit.Assert;
2025
import org.junit.Test;
2126

@@ -32,4 +37,63 @@ public void testInit() {
3237
Assert.assertEquals(TripleClientTransport.KEEP_ALIVE_INTERVAL, 0);
3338

3439
}
40+
41+
@Test
42+
public void test() {
43+
//模拟两个 reference 去消费同一份推送数据
44+
ProviderInfo providerInfo = new ProviderInfo();
45+
providerInfo.setHost("127.0.0.1");
46+
providerInfo.setPort(55555);
47+
48+
ConsumerConfig<?> consumerConfig1 = new ConsumerConfig<>();
49+
consumerConfig1.setProtocol("tri");
50+
consumerConfig1.setInterfaceId(HelloService.class.getName());
51+
ClientTransportConfig config1 = providerToClientConfig(consumerConfig1, providerInfo);
52+
TripleClientTransport clientTransport1 = (TripleClientTransport) ClientTransportFactory.getClientTransport(config1);
53+
54+
ConsumerConfig<?> consumerConfig2 = new ConsumerConfig<>();
55+
consumerConfig2.setProtocol("tri");
56+
consumerConfig2.setInterfaceId(HelloService.class.getName());
57+
ClientTransportConfig config2 = providerToClientConfig(consumerConfig2, providerInfo);
58+
TripleClientTransport clientTransport2 = (TripleClientTransport) ClientTransportFactory.getClientTransport(config2);
59+
60+
Assert.assertNotNull(TripleClientTransport.channelMap.get("127.0.0.1:55555"));
61+
Assert.assertTrue(clientTransport1.isAvailable());
62+
Assert.assertTrue(clientTransport2.isAvailable());
63+
Assert.assertEquals(clientTransport1.channel, clientTransport2.channel);
64+
65+
clientTransport1.destroy();
66+
Assert.assertNull(clientTransport1.channel);
67+
Assert.assertFalse(clientTransport1.isAvailable());
68+
Assert.assertTrue(clientTransport2.isAvailable());
69+
Assert.assertNotNull(TripleClientTransport.channelMap.get("127.0.0.1:55555"));
70+
71+
clientTransport1.connect();
72+
Assert.assertTrue(clientTransport1.isAvailable());
73+
Assert.assertEquals(clientTransport1.channel, clientTransport2.channel);
74+
75+
clientTransport2.destroy();
76+
Assert.assertNull(clientTransport2.channel);
77+
Assert.assertTrue(clientTransport1.isAvailable());
78+
Assert.assertFalse(clientTransport2.isAvailable());
79+
Assert.assertNotNull(TripleClientTransport.channelMap.get("127.0.0.1:55555"));
80+
81+
clientTransport1.destroy();
82+
Assert.assertNull(clientTransport1.channel);
83+
Assert.assertFalse(clientTransport1.isAvailable());
84+
Assert.assertFalse(clientTransport2.isAvailable());
85+
Assert.assertNull(TripleClientTransport.channelMap.get("127.0.0.1:55555"));
86+
}
87+
88+
private ClientTransportConfig providerToClientConfig(ConsumerConfig<?> consumerConfig, ProviderInfo providerInfo) {
89+
return new ClientTransportConfig()
90+
.setConsumerConfig(consumerConfig)
91+
.setProviderInfo(providerInfo)
92+
.setContainer(consumerConfig.getProtocol())
93+
.setConnectTimeout(consumerConfig.getConnectTimeout())
94+
.setInvokeTimeout(consumerConfig.getTimeout())
95+
.setDisconnectTimeout(consumerConfig.getDisconnectTimeout())
96+
.setConnectionNum(consumerConfig.getConnectionNum())
97+
.setChannelListeners(consumerConfig.getOnConnect());
98+
}
3599
}

0 commit comments

Comments
 (0)