Skip to content

Commit 1b5edcd

Browse files
authored
[close #639] reduce lock granularity of RegionStoreClient and PDClient (#638)
1 parent 506d58f commit 1b5edcd

File tree

7 files changed

+73
-46
lines changed

7 files changed

+73
-46
lines changed

src/main/java/org/tikv/common/AbstractGRPCClient.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,8 @@ public long getTimeout() {
182182

183183
private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) {
184184
while (true) {
185+
backOffer.checkTimeout();
186+
185187
try {
186188
ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping);
187189
HealthGrpc.HealthBlockingStub stub =
@@ -198,10 +200,6 @@ private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMappin
198200
}
199201

200202
protected boolean checkHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) {
201-
try {
202-
return doCheckHealth(backOffer, addressStr, hostMapping);
203-
} catch (Exception e) {
204-
return false;
205-
}
203+
return doCheckHealth(backOffer, addressStr, hostMapping);
206204
}
207205
}

src/main/java/org/tikv/common/PDClient.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,12 @@
5050
import java.util.concurrent.ConcurrentHashMap;
5151
import java.util.concurrent.ConcurrentMap;
5252
import java.util.concurrent.ExecutionException;
53+
import java.util.concurrent.ExecutorService;
5354
import java.util.concurrent.Executors;
55+
import java.util.concurrent.RejectedExecutionException;
5456
import java.util.concurrent.ScheduledExecutorService;
5557
import java.util.concurrent.TimeUnit;
58+
import java.util.concurrent.atomic.AtomicBoolean;
5659
import java.util.function.Supplier;
5760
import java.util.stream.Collectors;
5861
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -122,6 +125,8 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
122125
private ConcurrentMap<Long, Double> tiflashReplicaMap;
123126
private HostMapping hostMapping;
124127
private long lastUpdateLeaderTime;
128+
private final ExecutorService updateLeaderService = Executors.newSingleThreadExecutor();
129+
private final AtomicBoolean updateLeaderNotify = new AtomicBoolean();
125130

126131
public static final Histogram PD_GET_REGION_BY_KEY_REQUEST_LATENCY =
127132
HistogramUtils.buildDuration()
@@ -426,6 +431,8 @@ public void close() throws InterruptedException {
426431
if (channelFactory != null) {
427432
channelFactory.close();
428433
}
434+
435+
updateLeaderService.shutdownNow();
429436
}
430437

431438
@VisibleForTesting
@@ -462,11 +469,7 @@ private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) {
462469
}
463470

464471
private GetMembersResponse getMembers(BackOffer backOffer, URI uri) {
465-
try {
466-
return doGetMembers(backOffer, uri);
467-
} catch (Exception e) {
468-
return null;
469-
}
472+
return doGetMembers(backOffer, uri);
470473
}
471474

472475
// return whether the leader has changed to target address `leaderUrlStr`.
@@ -518,7 +521,26 @@ synchronized boolean createFollowerClientWrapper(
518521
return true;
519522
}
520523

521-
public synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) {
524+
public void tryUpdateLeaderOrForwardFollower() {
525+
if (updateLeaderNotify.compareAndSet(false, true)) {
526+
try {
527+
BackOffer backOffer = defaultBackOffer();
528+
updateLeaderService.submit(
529+
() -> {
530+
try {
531+
updateLeaderOrForwardFollower(backOffer);
532+
} finally {
533+
updateLeaderNotify.set(false);
534+
}
535+
});
536+
} catch (RejectedExecutionException e) {
537+
logger.error("PDClient is shutdown", e);
538+
updateLeaderNotify.set(false);
539+
}
540+
}
541+
}
542+
543+
private synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) {
522544
if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) {
523545
return;
524546
}

src/main/java/org/tikv/common/operation/PDErrorHandler.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.tikv.common.PDClient;
2727
import org.tikv.common.exception.GrpcException;
2828
import org.tikv.common.exception.TiClientInternalException;
29+
import org.tikv.common.log.SlowLogSpan;
2930
import org.tikv.common.pd.PDError;
3031
import org.tikv.common.util.BackOffFunction;
3132
import org.tikv.common.util.BackOffer;
@@ -59,7 +60,12 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {
5960
case PD_ERROR:
6061
backOffer.doBackOff(
6162
BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString()));
62-
client.updateLeaderOrForwardFollower(backOffer);
63+
SlowLogSpan tryUpdateLeaderSpan = backOffer.getSlowLog().start("try_update_leader");
64+
try {
65+
client.tryUpdateLeaderOrForwardFollower();
66+
} finally {
67+
tryUpdateLeaderSpan.end();
68+
}
6369
return true;
6470
case REGION_PEER_NOT_ELECTED:
6571
logger.debug(error.getMessage());
@@ -80,7 +86,12 @@ public boolean handleRequestError(BackOffer backOffer, Exception e) {
8086
return false;
8187
}
8288
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e);
83-
client.updateLeaderOrForwardFollower(backOffer);
89+
SlowLogSpan updateLeaderSpan = backOffer.getSlowLog().start("try_update_leader");
90+
try {
91+
client.tryUpdateLeaderOrForwardFollower();
92+
} finally {
93+
updateLeaderSpan.end();
94+
}
8495
return true;
8596
}
8697
}

src/main/java/org/tikv/common/region/RegionManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ public Pair<TiRegion, TiStore> getRegionStorePairByKey(ByteString key, TiStoreTy
182182
public Pair<TiRegion, TiStore> getRegionStorePairByKey(
183183
ByteString key, TiStoreType storeType, BackOffer backOffer) {
184184
TiRegion region = getRegionByKey(key, backOffer);
185-
if (!region.isValid()) {
185+
if (region == null || !region.isValid()) {
186186
throw new TiClientInternalException("Region invalid: " + region);
187187
}
188188

src/main/java/org/tikv/common/region/RegionStoreClient.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1408,38 +1408,34 @@ public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType store
14081408
this);
14091409
}
14101410

1411-
public synchronized RegionStoreClient build(TiRegion region, TiStore store)
1412-
throws GrpcException {
1411+
public RegionStoreClient build(TiRegion region, TiStore store) throws GrpcException {
14131412
return build(region, store, TiStoreType.TiKV);
14141413
}
14151414

1416-
public synchronized RegionStoreClient build(ByteString key) throws GrpcException {
1415+
public RegionStoreClient build(ByteString key) throws GrpcException {
14171416
return build(key, TiStoreType.TiKV);
14181417
}
14191418

1420-
public synchronized RegionStoreClient build(ByteString key, BackOffer backOffer)
1421-
throws GrpcException {
1419+
public RegionStoreClient build(ByteString key, BackOffer backOffer) throws GrpcException {
14221420
return build(key, TiStoreType.TiKV, backOffer);
14231421
}
14241422

1425-
public synchronized RegionStoreClient build(ByteString key, TiStoreType storeType)
1426-
throws GrpcException {
1423+
public RegionStoreClient build(ByteString key, TiStoreType storeType) throws GrpcException {
14271424
return build(key, storeType, defaultBackOff());
14281425
}
14291426

1430-
public synchronized RegionStoreClient build(
1431-
ByteString key, TiStoreType storeType, BackOffer backOffer) throws GrpcException {
1427+
public RegionStoreClient build(ByteString key, TiStoreType storeType, BackOffer backOffer)
1428+
throws GrpcException {
14321429
Pair<TiRegion, TiStore> pair =
14331430
regionManager.getRegionStorePairByKey(key, storeType, backOffer);
14341431
return build(pair.first, pair.second, storeType);
14351432
}
14361433

1437-
public synchronized RegionStoreClient build(TiRegion region) throws GrpcException {
1434+
public RegionStoreClient build(TiRegion region) throws GrpcException {
14381435
return build(region, defaultBackOff());
14391436
}
14401437

1441-
public synchronized RegionStoreClient build(TiRegion region, BackOffer backOffer)
1442-
throws GrpcException {
1438+
public RegionStoreClient build(TiRegion region, BackOffer backOffer) throws GrpcException {
14431439
TiStore store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer);
14441440
return build(region, store, TiStoreType.TiKV);
14451441
}

src/test/java/org/tikv/common/PDClientV2MockTest.java

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -88,18 +88,16 @@ public void testGetRegionById() throws Exception {
8888
String start = "getRegionById";
8989
String end = "getRegionByIdEnd";
9090
leader.addGetRegionByIDListener(request -> makeGetRegionResponse(start, end));
91-
try (PDClient client = createClient()) {
92-
Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first;
93-
Assert.assertEquals(start, r.getStartKey().toStringUtf8());
94-
Assert.assertEquals(end, r.getEndKey().toStringUtf8());
95-
}
91+
PDClient client = createClient();
92+
Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first;
93+
Assert.assertEquals(start, r.getStartKey().toStringUtf8());
94+
Assert.assertEquals(end, r.getEndKey().toStringUtf8());
9695

9796
leader.addGetRegionByIDListener(request -> makeGetRegionResponse(start, ""));
98-
try (PDClient client = createClient()) {
99-
Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first;
100-
Assert.assertEquals(start, r.getStartKey().toStringUtf8());
101-
Assert.assertEquals("", r.getEndKey().toStringUtf8());
102-
}
97+
98+
r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first;
99+
Assert.assertEquals(start, r.getStartKey().toStringUtf8());
100+
Assert.assertEquals("", r.getEndKey().toStringUtf8());
103101
}
104102

105103
@Test
@@ -113,15 +111,14 @@ public void testScanRegions() throws Exception {
113111
.addRegions(Pdpb.Region.newBuilder().setRegion(makeRegion(start, end)).build())
114112
.build());
115113

116-
try (PDClient client = createClient()) {
117-
List<Region> regions =
118-
client.scanRegions(
119-
ConcreteBackOffer.newRawKVBackOff(), ByteString.EMPTY, ByteString.EMPTY, 1);
114+
PDClient client = createClient();
115+
List<Region> regions =
116+
client.scanRegions(
117+
ConcreteBackOffer.newRawKVBackOff(), ByteString.EMPTY, ByteString.EMPTY, 1);
120118

121-
for (Region r : regions) {
122-
Assert.assertEquals(start, r.getRegion().getStartKey().toStringUtf8());
123-
Assert.assertEquals(end, r.getRegion().getEndKey().toStringUtf8());
124-
}
119+
for (Region r : regions) {
120+
Assert.assertEquals(start, r.getRegion().getStartKey().toStringUtf8());
121+
Assert.assertEquals(end, r.getRegion().getEndKey().toStringUtf8());
125122
}
126123
}
127124
}

src/test/java/org/tikv/common/TimeoutTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,12 @@ public void testTimeoutInTime() {
5252
try (RawKVClient client = createClient()) {
5353
pdServers.get(0).stop();
5454
long start = System.currentTimeMillis();
55-
client.get(ByteString.copyFromUtf8("key"));
55+
try {
56+
client.get(ByteString.copyFromUtf8("key"));
57+
} catch (Exception ignore) {
58+
}
5659
long end = System.currentTimeMillis();
57-
Assert.assertTrue(end - start < session.getConf().getRawKVReadTimeoutInMS() * 2L);
60+
Assert.assertTrue(end - start < (session.getConf().getRawKVReadTimeoutInMS() * 1.5));
5861
}
5962
}
6063
}

0 commit comments

Comments
 (0)