Skip to content

Commit 5d85287

Browse files
authored
ClientPool supports different ConnectConfig for different key (#1593)
Signed-off-by: yhmo <[email protected]>
1 parent 81f2bc3 commit 5d85287

File tree

8 files changed

+451
-136
lines changed

8 files changed

+451
-136
lines changed

docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ version: '3.5'
33
services:
44
standalone:
55
container_name: milvus-javasdk-standalone-1
6-
image: milvusdb/milvus:v2.5.14
6+
image: milvusdb/milvus:v2.5.17
77
command: [ "milvus", "run", "standalone" ]
88
environment:
99
- COMMON_STORAGETYPE=local
@@ -24,7 +24,7 @@ services:
2424

2525
standaloneslave:
2626
container_name: milvus-javasdk-standalone-2
27-
image: milvusdb/milvus:v2.5.14
27+
image: milvusdb/milvus:v2.5.17
2828
command: [ "milvus", "run", "standalone" ]
2929
environment:
3030
- COMMON_STORAGETYPE=local

examples/src/main/java/io/milvus/v1/ClientPoolExample.java

Lines changed: 214 additions & 76 deletions
Large diffs are not rendered by default.

examples/src/main/java/io/milvus/v2/ClientPoolExample.java

Lines changed: 190 additions & 45 deletions
Large diffs are not rendered by default.

sdk-core/src/main/java/io/milvus/pool/ClientPool.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
import org.slf4j.Logger;
88
import org.slf4j.LoggerFactory;
99

10-
import java.time.Duration;
11-
1210
public class ClientPool<C, T> {
1311
protected static final Logger logger = LoggerFactory.getLogger(ClientPool.class);
1412
protected GenericKeyedObjectPool<String, T> clientPool;
@@ -40,6 +38,10 @@ protected ClientPool(PoolConfig config, PoolClientFactory clientFactory) {
4038
this.clientPool = new GenericKeyedObjectPool<String, T>(clientFactory, poolConfig);
4139
}
4240

41+
public void configForKey(String key, C config) {
42+
this.clientFactory.configForKey(key, config);
43+
}
44+
4345
/**
4446
* Get a client object which is idle from the pool.
4547
* Once the client is hold by the caller, it will be marked as active state and cannot be fetched by other caller.

sdk-core/src/main/java/io/milvus/pool/PoolClientFactory.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,22 @@
1010

1111
import java.lang.reflect.Constructor;
1212
import java.lang.reflect.Method;
13+
import java.util.concurrent.ConcurrentHashMap;
14+
import java.util.concurrent.ConcurrentMap;
1315

1416
public class PoolClientFactory<C, T> extends BaseKeyedPooledObjectFactory<String, T> {
1517
protected static final Logger logger = LoggerFactory.getLogger(PoolClientFactory.class);
16-
private final C config;
18+
private final C configDefault;
19+
private ConcurrentMap<String, C> configForKeys = new ConcurrentHashMap<>();
1720
private Constructor<?> constructor;
1821
private Method closeMethod;
1922
private Method verifyMethod;
2023

21-
public PoolClientFactory(C config, String clientClassName) throws ClassNotFoundException, NoSuchMethodException {
22-
this.config = config;
24+
public PoolClientFactory(C configDefault, String clientClassName) throws ClassNotFoundException, NoSuchMethodException {
25+
this.configDefault = configDefault;
2326
try {
2427
Class<?> clientCls = Class.forName(clientClassName);
25-
Class<?> configCls = Class.forName(config.getClass().getName());
28+
Class<?> configCls = Class.forName(configDefault.getClass().getName());
2629
constructor = clientCls.getConstructor(configCls);
2730
closeMethod = clientCls.getMethod("close", long.class);
2831
verifyMethod = clientCls.getMethod("clientIsReady");
@@ -32,11 +35,19 @@ public PoolClientFactory(C config, String clientClassName) throws ClassNotFoundE
3235
}
3336
}
3437

38+
public void configForKey(String key, C config) {
39+
configForKeys.put(key, config);
40+
}
41+
3542
@Override
3643
public T create(String key) throws Exception {
3744
try {
38-
T client = (T) constructor.newInstance(this.config);
39-
return client;
45+
C keyConfig = configForKeys.get(key);
46+
if (keyConfig == null) {
47+
return (T) constructor.newInstance(this.configDefault);
48+
} else {
49+
return (T) constructor.newInstance(keyConfig);
50+
}
4051
} catch (Exception e) {
4152
logger.error("Failed to create client, exception: ", e);
4253
throw new MilvusClientException(ErrorCode.CLIENT_ERROR, e);

sdk-core/src/main/java/io/milvus/pool/PoolConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010
@SuperBuilder
1111
public class PoolConfig {
1212
@Builder.Default
13-
private int maxIdlePerKey = 5;
13+
private int maxIdlePerKey = 10;
1414
@Builder.Default
1515
private int minIdlePerKey = 0;
1616
@Builder.Default
17-
private int maxTotalPerKey = 10;
17+
private int maxTotalPerKey = 30;
1818
@Builder.Default
19-
private int maxTotal = 50;
19+
private int maxTotal = 1000;
2020
@Builder.Default
2121
private boolean blockWhenExhausted = true;
2222
@Builder.Default

sdk-core/src/test/java/io/milvus/TestUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
public class TestUtils {
1111
private int dimension = 256;
1212
private static final Random RANDOM = new Random();
13-
public static final String MilvusDockerImageID = "milvusdb/milvus:v2.5.14";
13+
public static final String MilvusDockerImageID = "milvusdb/milvus:v2.5.17";
1414

1515
public TestUtils(int dimension) {
1616
this.dimension = dimension;

sdk-core/src/test/java/io/milvus/v2/client/MilvusClientV2DockerTest.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1954,7 +1954,14 @@ void testDatabase() {
19541954

19551955
@Test
19561956
void testClientPool() {
1957+
// create a temp database
1958+
String dummyDb = "dummy_db";
1959+
client.createDatabase(CreateDatabaseReq.builder()
1960+
.databaseName(dummyDb)
1961+
.build());
1962+
19571963
try {
1964+
// the default connection config will connect to default db
19581965
ConnectConfig connectConfig = ConnectConfig.builder()
19591966
.uri(milvus.getEndpoint())
19601967
.rpcDeadlineMs(100L)
@@ -1963,16 +1970,24 @@ void testClientPool() {
19631970
.build();
19641971
MilvusClientV2Pool pool = new MilvusClientV2Pool(poolConfig, connectConfig);
19651972

1973+
// clients of the key "dummy_db" will connect to this db
1974+
pool.configForKey(dummyDb, ConnectConfig.builder()
1975+
.uri(milvus.getEndpoint())
1976+
.dbName(dummyDb)
1977+
.rpcDeadlineMs(100L)
1978+
.build());
1979+
19661980
List<Thread> threadList = new ArrayList<>();
19671981
int threadCount = 10;
19681982
int requestPerThread = 10;
1969-
String key = "192.168.1.1";
1983+
String key = "default";
19701984
for (int k = 0; k < threadCount; k++) {
19711985
Thread t = new Thread(() -> {
19721986
for (int i = 0; i < requestPerThread; i++) {
19731987
MilvusClientV2 client = pool.getClient(key);
19741988
String version = client.getServerVersion();
19751989
// System.out.printf("%d, %s%n", i, version);
1990+
Assertions.assertEquals(client.currentUsedDatabase(), "default");
19761991
System.out.printf("idle %d, active %d%n", pool.getIdleClientNumber(key), pool.getActiveClientNumber(key));
19771992
pool.returnClient(key, client);
19781993
}
@@ -1988,6 +2003,10 @@ void testClientPool() {
19882003

19892004
System.out.println(String.format("idle %d, active %d", pool.getIdleClientNumber(key), pool.getActiveClientNumber(key)));
19902005
System.out.println(String.format("total idle %d, total active %d", pool.getTotalIdleClientNumber(), pool.getTotalActiveClientNumber()));
2006+
2007+
// get client connect to the dummy db
2008+
MilvusClientV2 dummyClient = pool.getClient(dummyDb);
2009+
Assertions.assertEquals(dummyClient.currentUsedDatabase(), dummyDb);
19912010
pool.close();
19922011
} catch (Exception e) {
19932012
System.out.println(e.getMessage());

0 commit comments

Comments
 (0)