Skip to content

Commit 78c70af

Browse files
authored
fix: 增加加载缓存文件的保护机制,同一个文件最快只能每10s加载一次缓存文件 (#460)
1 parent 9b0a9cf commit 78c70af

File tree

2 files changed

+102
-0
lines changed

2 files changed

+102
-0
lines changed

polaris-plugins/polaris-plugins-registry/registry-memory/src/main/java/com/tencent/polaris/plugins/registry/memory/MessagePersistHandler.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@
4848
import java.nio.file.Path;
4949
import java.nio.file.Paths;
5050
import java.util.Map;
51+
import java.util.Objects;
52+
import java.util.concurrent.ConcurrentHashMap;
53+
import java.util.concurrent.TimeUnit;
54+
import java.util.function.BiFunction;
55+
import java.util.function.Function;
5156
import java.util.function.Supplier;
5257
import java.util.regex.Pattern;
5358

@@ -70,6 +75,9 @@ public class MessagePersistHandler {
7075

7176
private static final String PATTERN_SERVICE = "svc#%s#%s#%s.yaml";
7277

78+
// 同一个文件,两次读取之间的时间间隔为10s
79+
private static final long MESSAGE_READ_INTERVAL = TimeUnit.SECONDS.toMillis(10);
80+
7381
private final File persistDirFile;
7482

7583
private final String persistDirPath;
@@ -84,6 +92,9 @@ public class MessagePersistHandler {
8492

8593
private final JsonFormat.Parser parser = JsonFormat.parser();
8694

95+
// 缓存文件上一次加载的时间,避免频繁对同一个文件进行读取
96+
private final Map<ServiceEventKey, Long> messageLastReadTime = new ConcurrentHashMap<>();
97+
8798
public MessagePersistHandler(
8899
String persistDirPath, int maxWriteRetry, int maxReadRetry, long retryInterval) {
89100
this.maxReadRetry = maxReadRetry;
@@ -235,6 +246,31 @@ private Path doSaveService(ServiceEventKey svcEventKey, Message message) {
235246
return persistPath.toAbsolutePath();
236247
}
237248

249+
boolean shouldLoadFromStore(ServiceEventKey eventKey) {
250+
long currentTimeMs = System.currentTimeMillis();
251+
Long previousTimeMs = messageLastReadTime.putIfAbsent(eventKey, currentTimeMs);
252+
final boolean[] loadFromStore = {false};
253+
if (null != previousTimeMs) {
254+
if (currentTimeMs - previousTimeMs < MESSAGE_READ_INTERVAL) {
255+
return false;
256+
}
257+
messageLastReadTime.computeIfPresent(eventKey, new BiFunction<ServiceEventKey, Long, Long>() {
258+
@Override
259+
public Long apply(ServiceEventKey serviceEventKey, Long aLong) {
260+
if (Objects.equals(aLong, previousTimeMs)) {
261+
loadFromStore[0] = true;
262+
return currentTimeMs;
263+
}
264+
return aLong;
265+
}
266+
});
267+
} else {
268+
// first time
269+
loadFromStore[0] = true;
270+
}
271+
return loadFromStore[0];
272+
}
273+
238274
/**
239275
* 遍历缓存目录并加载之前缓存的服务信息
240276
*
@@ -243,6 +279,9 @@ private Path doSaveService(ServiceEventKey svcEventKey, Message message) {
243279
* @return 服务标识-消息对象的集合
244280
*/
245281
public Message loadPersistedServices(ServiceEventKey eventKey, Supplier<Message.Builder> builderSupplier) {
282+
if (!shouldLoadFromStore(eventKey)) {
283+
return null;
284+
}
246285
String fileName = serviceKeyToFileName(eventKey);
247286
int retryTimes = 0;
248287
Message readMessage = null;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Tencent is pleased to support the open source community by making Polaris available.
3+
*
4+
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
5+
*
6+
* Licensed under the BSD 3-Clause License (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* https://opensource.org/licenses/BSD-3-Clause
11+
*
12+
* Unless required by applicable law or agreed to in writing, software distributed
13+
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14+
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations under the License.
16+
*/
17+
18+
package com.tencent.polaris.plugins.registry.memory;
19+
20+
import com.tencent.polaris.api.pojo.ServiceEventKey;
21+
import com.tencent.polaris.api.pojo.ServiceKey;
22+
import org.junit.Assert;
23+
import org.junit.Test;
24+
25+
public class MessagePersistHandlerTest {
26+
27+
@Test
28+
public void testMessagePersistHandler_shouldLoadFromStore() {
29+
MessagePersistHandler messagePersistHandler = new MessagePersistHandler(
30+
"/root", 1, 1, 1000);
31+
ServiceEventKey serviceEventKey = new ServiceEventKey(
32+
new ServiceKey("Test", "testSvc"), ServiceEventKey.EventType.SERVICE);
33+
boolean result1 = messagePersistHandler.shouldLoadFromStore(serviceEventKey);
34+
Assert.assertTrue(result1);
35+
try {
36+
Thread.sleep(1000);
37+
} catch (InterruptedException e) {
38+
throw new RuntimeException(e);
39+
}
40+
boolean result2 = messagePersistHandler.shouldLoadFromStore(serviceEventKey);
41+
Assert.assertFalse(result2);
42+
ServiceEventKey serviceEventKey1 = new ServiceEventKey(
43+
new ServiceKey("Test", "testSvc1"), ServiceEventKey.EventType.SERVICE);
44+
boolean result11 = messagePersistHandler.shouldLoadFromStore(serviceEventKey1);
45+
Assert.assertTrue(result11);
46+
try {
47+
Thread.sleep(1000);
48+
} catch (InterruptedException e) {
49+
throw new RuntimeException(e);
50+
}
51+
boolean result12 = messagePersistHandler.shouldLoadFromStore(serviceEventKey1);
52+
Assert.assertFalse(result12);
53+
try {
54+
Thread.sleep(25000);
55+
} catch (InterruptedException e) {
56+
throw new RuntimeException(e);
57+
}
58+
boolean result3 = messagePersistHandler.shouldLoadFromStore(serviceEventKey);
59+
Assert.assertTrue(result3);
60+
boolean result13 = messagePersistHandler.shouldLoadFromStore(serviceEventKey1);
61+
Assert.assertTrue(result13);
62+
}
63+
}

0 commit comments

Comments
 (0)