Skip to content

Commit d951b0e

Browse files
author
liujianjun.ljj
committed
fix shared channel unavailable issue
1 parent a073684 commit d951b0e

File tree

3 files changed

+79
-21
lines changed

3 files changed

+79
-21
lines changed

remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/ReferenceCountManagedChannel.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package com.alipay.sofa.rpc.transport.triple;
1818

19+
import com.alipay.sofa.rpc.log.Logger;
20+
import com.alipay.sofa.rpc.log.LoggerFactory;
1921
import io.grpc.CallOptions;
2022
import io.grpc.ClientCall;
2123
import io.grpc.ConnectivityState;
@@ -30,6 +32,8 @@
3032
*/
3133
public class ReferenceCountManagedChannel extends ManagedChannel {
3234

35+
private final static Logger LOGGER = LoggerFactory.getLogger(ReferenceCountManagedChannel.class);
36+
3337
private final AtomicInteger referenceCount = new AtomicInteger(0);
3438

3539
private ManagedChannel grpcChannel;
@@ -48,7 +52,13 @@ public void incrementAndGetCount() {
4852
@Override
4953
public ManagedChannel shutdown() {
5054
if (referenceCount.decrementAndGet() <= 0) {
51-
return grpcChannel.shutdown();
55+
ManagedChannel shutdown = grpcChannel.shutdown();
56+
try {
57+
shutdown.awaitTermination(5, TimeUnit.SECONDS);
58+
} catch (InterruptedException e) {
59+
LOGGER.warn("Triple channel shut down interrupted.");
60+
}
61+
return shutdown;
5262
}
5363
return grpcChannel;
5464
}

remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientTransport.java

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535
import com.alipay.sofa.rpc.event.EventBus;
3636
import com.alipay.sofa.rpc.ext.Extension;
3737
import com.alipay.sofa.rpc.interceptor.ClientHeaderClientInterceptor;
38-
import com.alipay.sofa.rpc.log.Logger;
39-
import com.alipay.sofa.rpc.log.LoggerFactory;
4038
import com.alipay.sofa.rpc.message.ResponseFuture;
4139
import com.alipay.sofa.rpc.server.triple.TripleContants;
4240
import com.alipay.sofa.rpc.transport.AbstractChannel;
@@ -63,10 +61,14 @@
6361
@Extension("tri")
6462
public class TripleClientTransport extends ClientTransport {
6563

66-
private final static Logger LOGGER = LoggerFactory.getLogger(TripleClientTransport.class);
67-
64+
/**
65+
* use clientTransportConfig.getProviderInfo() instead of
66+
*/
67+
@Deprecated
6868
protected ProviderInfo providerInfo;
6969

70+
protected ClientTransportConfig clientTransportConfig;
71+
7072
protected ManagedChannel channel;
7173

7274
protected InetSocketAddress localAddress;
@@ -76,8 +78,14 @@ public class TripleClientTransport extends ClientTransport {
7678
protected TripleInvoker tripleClientInvoker;
7779

7880
/* <address, gRPC channels> */
81+
/**
82+
* use URL_CONNECTION_MAP instead to cache ReferenceCountManagedChannel
83+
*/
84+
@Deprecated
7985
protected final static ConcurrentMap<String, ReferenceCountManagedChannel> channelMap = new ConcurrentHashMap<>();
8086

87+
protected final static ConcurrentMap<ClientTransportConfig, ReferenceCountManagedChannel> URL_CONNECTION_MAP = new ConcurrentHashMap<>();
88+
8189
protected final Object lock = new Object();
8290

8391
protected static int KEEP_ALIVE_INTERVAL = SofaConfigs.getOrCustomDefault(
@@ -92,6 +100,7 @@ public class TripleClientTransport extends ClientTransport {
92100
public TripleClientTransport(ClientTransportConfig transportConfig) {
93101
super(transportConfig);
94102
providerInfo = transportConfig.getProviderInfo();
103+
clientTransportConfig = transportConfig;
95104
connect();
96105
remoteAddress = InetSocketAddress.createUnresolved(providerInfo.getHost(), providerInfo.getPort());
97106
localAddress = InetSocketAddress.createUnresolved(NetUtils.getLocalIpv4(), 0);// 端口不准
@@ -102,8 +111,7 @@ public void connect() {
102111
if (isAvailable()) {
103112
return;
104113
}
105-
ProviderInfo providerInfo = transportConfig.getProviderInfo();
106-
channel = getSharedChannel(providerInfo);
114+
channel = getSharedChannel(transportConfig);
107115
tripleClientInvoker = buildClientInvoker();
108116
}
109117

@@ -114,17 +122,13 @@ protected TripleClientInvoker buildClientInvoker() {
114122
@Override
115123
public void disconnect() {
116124
if (channel != null) {
117-
try {
118-
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
119-
} catch (InterruptedException e) {
120-
LOGGER.warn("Triple channel shut down interrupted.");
121-
}
125+
channel.shutdown();
122126
if (channel.isShutdown()) {
123127
channel = null;
124-
channelMap.remove(providerInfo.toString());
128+
URL_CONNECTION_MAP.remove(clientTransportConfig);
125129
}
126130
} else {
127-
channelMap.remove(providerInfo.toString());
131+
URL_CONNECTION_MAP.remove(clientTransportConfig);
128132
}
129133
}
130134

@@ -251,9 +255,8 @@ public InetSocketAddress localAddress() {
251255
/**
252256
* Get shared channel connection
253257
*/
254-
private ReferenceCountManagedChannel getSharedChannel(ProviderInfo url) {
255-
String key = url.toString();
256-
ReferenceCountManagedChannel channel = channelMap.get(key);
258+
private ReferenceCountManagedChannel getSharedChannel(ClientTransportConfig config) {
259+
ReferenceCountManagedChannel channel = URL_CONNECTION_MAP.get(config);
257260

258261
if (channelAvailable(channel)) {
259262
channel.incrementAndGetCount();
@@ -263,13 +266,13 @@ private ReferenceCountManagedChannel getSharedChannel(ProviderInfo url) {
263266
}
264267

265268
synchronized (lock) {
266-
channel = channelMap.get(key);
269+
channel = URL_CONNECTION_MAP.get(config);
267270
// double check
268271
if (channelAvailable(channel)) {
269272
channel.incrementAndGetCount();
270273
} else {
271-
channel = new ReferenceCountManagedChannel(initChannel(url));
272-
channelMap.put(key, channel);
274+
channel = new ReferenceCountManagedChannel(initChannel(config.getProviderInfo()));
275+
URL_CONNECTION_MAP.put(config, channel);
273276
}
274277
}
275278

remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/transport/triple/TripleClientTransportTest.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@
1616
*/
1717
package com.alipay.sofa.rpc.transport.triple;
1818

19+
import com.alipay.sofa.rpc.client.ProviderInfo;
20+
import com.alipay.sofa.rpc.config.ConsumerConfig;
21+
import com.alipay.sofa.rpc.server.triple.HelloService;
22+
import com.alipay.sofa.rpc.transport.ClientTransportConfig;
23+
import com.alipay.sofa.rpc.transport.ClientTransportFactory;
1924
import org.junit.Assert;
2025
import org.junit.Test;
2126

2227
/**
23-
*
2428
* @author junyuan
2529
* @version TripleClientTransportTest.java, v 0.1 2024-08-01 17:12 junyuan Exp $
2630
*/
@@ -32,4 +36,45 @@ public void testInit() {
3236
Assert.assertEquals(TripleClientTransport.KEEP_ALIVE_INTERVAL, 0);
3337

3438
}
39+
40+
@Test
41+
public void test() {
42+
//模拟两个 reference 去消费同一份推送数据
43+
ProviderInfo providerInfo = new ProviderInfo();
44+
providerInfo.setHost("127.0.0.1");
45+
providerInfo.setPort(55555);
46+
47+
ConsumerConfig<?> consumerConfig1 = new ConsumerConfig<>();
48+
consumerConfig1.setProtocol("tri");
49+
consumerConfig1.setInterfaceId(HelloService.class.getName());
50+
ClientTransportConfig config1 = providerToClientConfig(consumerConfig1, providerInfo);
51+
TripleClientTransport clientTransport1 = (TripleClientTransport) ClientTransportFactory.getClientTransport(config1);
52+
53+
ConsumerConfig<?> consumerConfig2 = new ConsumerConfig<>();
54+
consumerConfig2.setProtocol("tri");
55+
consumerConfig2.setInterfaceId(HelloService.class.getName());
56+
ClientTransportConfig config2 = providerToClientConfig(consumerConfig2, providerInfo);
57+
TripleClientTransport clientTransport2 = (TripleClientTransport) ClientTransportFactory.getClientTransport(config2);
58+
59+
Assert.assertNotNull(TripleClientTransport.URL_CONNECTION_MAP.get(config1));
60+
Assert.assertNotNull(TripleClientTransport.URL_CONNECTION_MAP.get(config2));
61+
Assert.assertNotEquals(TripleClientTransport.URL_CONNECTION_MAP.get(config1), TripleClientTransport.URL_CONNECTION_MAP.get(config2));
62+
clientTransport1.destroy();
63+
Assert.assertNull(TripleClientTransport.URL_CONNECTION_MAP.get(config1));
64+
Assert.assertNotNull(TripleClientTransport.URL_CONNECTION_MAP.get(config2));
65+
clientTransport2.destroy();
66+
Assert.assertNull(TripleClientTransport.URL_CONNECTION_MAP.get(config2));
67+
}
68+
69+
private ClientTransportConfig providerToClientConfig(ConsumerConfig<?> consumerConfig, ProviderInfo providerInfo) {
70+
return new ClientTransportConfig()
71+
.setConsumerConfig(consumerConfig)
72+
.setProviderInfo(providerInfo)
73+
.setContainer(consumerConfig.getProtocol())
74+
.setConnectTimeout(consumerConfig.getConnectTimeout())
75+
.setInvokeTimeout(consumerConfig.getTimeout())
76+
.setDisconnectTimeout(consumerConfig.getDisconnectTimeout())
77+
.setConnectionNum(consumerConfig.getConnectionNum())
78+
.setChannelListeners(consumerConfig.getOnConnect());
79+
}
3580
}

0 commit comments

Comments
 (0)