
Conversation

@Cat-Drink Cat-Drink commented Sep 15, 2025

Purpose of the pull request

This pull request introduces support for a Kubernetes client connection pool, optimizing interaction performance and resource management between Apache DolphinScheduler and Kubernetes clusters. By reusing client connections, this feature significantly reduces the overhead of frequently creating and destroying Kubernetes clients, improving the overall efficiency and stability of the system.

Brief change log

  • Core Backend Implementation: Added the KubernetesClientPool class, which implements a singleton-pattern connection pool manager to maintain an independent connection pool for each Kubernetes cluster.
  • Connection Pool Configuration: Implemented the PoolConfig inner class, which supports configuration items such as minimum idle connections, maximum idle connections, maximum connections, maximum waiting time, and idle timeout. These configurations can be customized through properties in TaskConstants.
  • Cluster Connection Pool: Implemented the ClusterClientPool inner class, which is responsible for managing client connections of specific clusters, including the creation, acquisition, return, and closure of connections (see the usage sketch after this list).
  • Idle Connection Cleanup: Added a periodic cleanup thread that executes the cleanupIdleClients method every 30 seconds to clear timed-out idle connections, effectively saving resources.
  • Cluster Identification: Implemented a cluster identification generation mechanism based on kubeconfig, using the SHA-256 algorithm to generate a unique identifier for each cluster.
  • Client Validation: Added the isClientValid method to ensure that the returned client connection remains valid.
  • Integration into Utility Classes: Updated utility classes such as K8sUtils and KubernetesApplicationManager to enable them to acquire and return clients using the connection pool.
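
To make the borrow-and-return lifecycle concrete, here is a minimal usage sketch. The exact signatures of getClient(...) and getClusterId(...) are assumptions; only returnClient(context, client) and KubernetesClientPool.getClusterId() appear verbatim in the review hunks further down.

```java
import io.fabric8.kubernetes.client.KubernetesClient;

// Minimal sketch of the pool lifecycle under the assumptions stated above.
void runWithPooledClient(KubernetesApplicationManagerContext context) {
    KubernetesClient client = null;
    try {
        client = KubernetesClientPool.getClient(context); // borrow a client for this cluster (assumed signature)
        client.pods().inNamespace("default").list();      // use the client as usual
    } finally {
        if (client != null) {
            KubernetesClientPool.returnClient(context, client); // always return the borrowed client
        }
    }
}
```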

Verify this pull request

This change includes comprehensive test cases and can be verified through the following methods:

Automated Testing

  • Basic Functionality Test: KubernetesClientPoolTest.testKubernetesClientPoolBasicFunction verifies the basic operations of the connection pool (acquiring cluster ID, obtaining client, returning client, and closing the connection pool).
  • Configuration Test: KubernetesClientPoolTest.testKubernetesClientPoolConfig verifies the correctness of connection pool configurations.
  • Cluster ID Generation Test: KubernetesClientPoolTest.testClusterIdGeneration verifies the function of generating a unique cluster identifier based on kubeconfig.
  • Singleton Pattern Test: KubernetesClientPoolTest.testSingletonPattern verifies the singleton pattern implementation of the connection pool (a sketch follows this list).
  • Concurrent Access Test: Verifies the thread safety and stability of the connection pool in a multi-threaded environment.
  • Timeout Handling Test: KubernetesClientPoolTest.testClusterClientPoolBorrowObjectTimeout verifies the handling mechanism for client acquisition timeouts.
  • Idle Connection Cleanup Test: KubernetesClientPoolTest.testCleanupIdleClients and KubernetesClientPoolTest.testClusterClientPoolCleanupIdle verify the periodic cleanup function of idle connections.
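
As an illustration, a minimal sketch of what the singleton test might assert; the actual test body is not shown in this thread, and getInstance() is an assumed accessor name:

```java
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class KubernetesClientPoolSingletonSketch {

    @Test
    void testSingletonPattern() {
        // Two lookups must yield the same pool instance.
        KubernetesClientPool first = KubernetesClientPool.getInstance();
        KubernetesClientPool second = KubernetesClientPool.getInstance();
        Assertions.assertSame(first, second);
    }
}
```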

Manual Verification Steps

  1. Ensure that appropriate connection pool parameters (such as k8s.client.pool.max.size) are set in the configuration file; a sample snippet follows this list.
  2. Submit multiple Kubernetes tasks to different clusters.
  3. Check the logs to verify whether the connection pool correctly creates and manages client connections.
  4. Monitor system resource usage to verify whether the connection pool effectively reduces the overhead caused by connection creation and destruction.
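
A sample configuration sketch is shown below. Only k8s.client.pool.max.size is named verbatim above; the other keys are hypothetical spellings inferred from the TaskConstants names and default values that appear later in this review, so check the shipped configuration docs for the authoritative spelling.

```properties
# Pool sizing and timeouts; defaults match the PoolConfig defaults in this PR.
# Keys other than k8s.client.pool.max.size are inferred, not confirmed.
k8s.client.pool.max.size=10
k8s.client.pool.min.idle=2
k8s.client.pool.max.idle=5
k8s.client.pool.max.wait.ms=30000
k8s.client.pool.idle.timeout.ms=600000
```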

Pull Request Notice

If your pull request contains an incompatible change, you should also add it to docs/docs/en/guide/upgrade/incompatible.md


boring-cyborg bot commented Sep 15, 2025

Thanks for opening this pull request! Please check out our contributing guidelines. (https://github.com/apache/dolphinscheduler/blob/dev/docs/docs/en/contribute/join/pull-request.md)

@github-actions github-actions bot added the UI (ui and front end related), backend, and test labels Sep 15, 2025
@Cat-Drink Cat-Drink changed the title K8s Connections Pool [Feature-3328][Task] Kubernetes Connection Pool Implementation Sep 15, 2025
@wangxj3 wangxj3 changed the title [Feature-3328][Task] Kubernetes Connection Pool Implementation [DSIP-88][Task] Kubernetes Connection Pool Implementation Sep 16, 2025

@SbloodyS SbloodyS left a comment


This PR's title and DSIP-88 are not related. Please use the correct title and include the full design details and description. @Cat-Drink

@Cat-Drink

PTAL

@davidzollo davidzollo added the improvement (make more easy to user or prompt friendly) label Oct 28, 2025
String kubeConfig = k8sTaskExecutionContext.getConfigYaml();
int hashCode = kubeConfig.hashCode();
int nonNegativeHash = hashCode & 0x7FFFFFFF;
return "k8s-cluster-" + nonNegativeHash;


It's better to use constants here.
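
For reference, a later revision in this thread introduces a K8S_CLUSTER_PREFIX constant; a sketch of the constants-based form, with the constant's value assumed from the literal above:

```java
private static final String K8S_CLUSTER_PREFIX = "k8s-cluster-"; // value assumed from the literal above

String kubeConfig = k8sTaskExecutionContext.getConfigYaml();
int nonNegativeHash = kubeConfig.hashCode() & 0x7FFFFFFF; // mask the sign bit to stay non-negative
return K8S_CLUSTER_PREFIX + nonNegativeHash;
```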

byte[] hashBytes = digest.digest(kubeConfig.getBytes(StandardCharsets.UTF_8));

String base64Hash = Base64.getUrlEncoder().encodeToString(hashBytes);
return base64Hash.replace("=", "");


It's better to use constants here.
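
A sketch of the same hunk with constants extracted; BASE64_PADDING_CHARACTER and EMPTY_STRING match names that appear later in this PR, while the algorithm constant name is an assumption:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;

private static final String SHA_256 = "SHA-256";            // assumed constant name
private static final String BASE64_PADDING_CHARACTER = "="; // name seen later in this PR
private static final String EMPTY_STRING = "";              // name seen later in this PR

MessageDigest digest = MessageDigest.getInstance(SHA_256);
byte[] hashBytes = digest.digest(kubeConfig.getBytes(StandardCharsets.UTF_8));
String base64Hash = Base64.getUrlEncoder().encodeToString(hashBytes);
return base64Hash.replace(BASE64_PADDING_CHARACTER, EMPTY_STRING);
```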

while (true) {
try {
// every 30s
Thread.sleep(30000);


It's better to use constants here.
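
For instance, the interval could move into a named constant (the constant name is an assumption):

```java
private static final long IDLE_CLEANUP_INTERVAL_MS = 30_000L; // assumed name; replaces the bare 30000

while (true) {
    try {
        Thread.sleep(IDLE_CLEANUP_INTERVAL_MS);
        cleanupIdleClients(); // periodic cleanup described in the change log above
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
    }
}
```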

Thread cleanupThread = new Thread(() -> {
while (true) {
try {
// every 30s


Suggested change
// every 30s

Comment on lines 187 to 201
public int getMaxSize() {
    return maxSize;
}

public int getMinIdle() {
    return minIdle;
}

public int getMaxIdle() {
    return maxIdle;
}

public long getMaxWaitMs() {
    return maxWaitMs;
}

public long getIdleTimeoutMs() {
    return idleTimeoutMs;
}


Using @Getter here.
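
A sketch of the Lombok version, with field names taken from the accessors in the hunk above:

```java
import lombok.Getter;

// @Getter generates getMaxSize(), getMinIdle(), getMaxIdle(), getMaxWaitMs(), getIdleTimeoutMs().
@Getter
public static class PoolConfig {

    private final int maxSize;
    private final int minIdle;
    private final int maxIdle;
    private final long maxWaitMs;
    private final long idleTimeoutMs;

    public PoolConfig(int maxSize, int minIdle, int maxIdle, long maxWaitMs, long idleTimeoutMs) {
        this.maxSize = maxSize;
        this.minIdle = minIdle;
        this.maxIdle = maxIdle;
        this.maxWaitMs = maxWaitMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }
}
```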

} catch (Exception e) {
log.error("job failed in k8s: {}", e.getMessage(), e);
taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
log.error(e.getMessage());


Suggested change
log.error(e.getMessage());

}
}
} catch (Exception e) {
// if Exception happen


Suggested change
// if Exception happen

throw e;
} finally {
ProcessUtils.removeK8sClientCache(taskRequest.getTaskAppId());
log.error(e.getMessage());


Suggested change
log.error(e.getMessage());
log.error("k8s task executor running error: ", e);

Comment on lines 114 to 116
/**
 * 测试K8s客户端是否被正确归还到连接池
 * (Tests that the K8s client is correctly returned to the connection pool)
 */


Please avoid using Chinese.

Comment on lines +63 to +68
this.poolConfig = new PoolConfig(
PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_SIZE, 10),
PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MIN_IDLE, 2),
PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_IDLE, 5),
PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_WAIT_MS, 30000),
PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_IDLE_TIMEOUT_MS, 600000));


These params should be added to the docs, with guidance on how to use them.

@SbloodyS

This PR's title and DSIP-88 are not related.

BTW, this is still unaddressed.

@wangxj3 wangxj3 changed the title [Improvement-88][SERVER] Kubernetes Connection Pool Implementation [DSIP-88][SERVER] Kubernetes Connection Pool Implementation Oct 28, 2025
@Cat-Drink Cat-Drink changed the title [DSIP-88][SERVER] Kubernetes Connection Pool Implementation [DSIP-17122][SERVER] Kubernetes Connection Pool Implementation Oct 29, 2025
@nielifeng nielifeng requested a review from Copilot October 30, 2025 05:16
.inContainer(kubernetesApplicationManagerContext.getContainerName())
.watchLog();
} finally {
log.debug("Log watch client is not returned immediately, will be managed by caller after watch completes");


This log seems meaningless.


Copilot AI left a comment


Pull Request Overview

This PR introduces a Kubernetes client connection pool (KubernetesClientPool) to manage K8s client connections efficiently across multiple clusters. The changes replace the previous approach of creating individual clients and caching them per task with a robust pooling mechanism that supports connection reuse, lifecycle management, and automatic cleanup of idle connections.

Key Changes

  • Implements a new KubernetesClientPool singleton with per-cluster connection pooling
  • Refactors K8sUtils to use the connection pool instead of instance-level clients
  • Updates KubernetesApplicationManager to retrieve clients from the pool and return them after use
  • Removes the removeK8sClientCache method from ProcessUtils as caching is now handled by the pool
  • Adds comprehensive test coverage for the new pooling functionality

Reviewed Changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 9 comments.

Summary per file:

| File | Description |
| --- | --- |
| KubernetesClientPool.java | New connection pool implementation with cluster-based pooling, idle cleanup, and configurable limits |
| K8sUtils.java | Refactored to use the connection pool; all methods now accept a configYaml parameter and properly return clients |
| KubernetesApplicationManager.java | Updated to use the connection pool instead of per-task client caching |
| K8sTaskExecutor.java | Modified to pass configYaml to K8sUtils methods and removed client build calls |
| ProcessUtils.java | Removed the removeK8sClientCache method |
| TaskConstants.java | Added configuration constants for pool sizing and timeouts |
| KubernetesClientPoolTest.java | Comprehensive test suite for connection pool functionality |
| K8sUtilsTest.java | New tests for refactored K8sUtils methods |
| KubernetesApplicationManagerTest.java | Tests for the application manager with the connection pool |
| K8sTaskExecutorTest.java | Updated tests with proper mocking for connection pool usage |
| configuration.md (en/zh) | Documentation for new connection pool configuration parameters |


@BeforeEach
public void setUp() {
k8sUtils = new K8sUtils();
mockedKubernetesClientPool = Mockito.mockStatic(KubernetesClientPool.class); // 拦截所有使用静态方法的请求

Copilot AI Oct 30, 2025


Chinese comment should be removed or translated to English. The comment '拦截所有使用静态方法的请求' (intercept all requests using static methods) is inconsistent with the rest of the codebase which uses English comments.

Suggested change
mockedKubernetesClientPool = Mockito.mockStatic(KubernetesClientPool.class); // 拦截所有使用静态方法的请求
mockedKubernetesClientPool = Mockito.mockStatic(KubernetesClientPool.class); // Intercept all requests using static methods


@Override
public TaskResponse run(String k8sParameterStr) throws Exception {
public TaskResponse run(String k8sParameterStr) {

Copilot AI Oct 30, 2025


The method signature changed from throws Exception to no exception declaration, but the method body still contains try-catch blocks that catch Exception. This is a breaking API change that could affect callers expecting to handle exceptions. The removed exception should either be kept in the signature or properly handled/logged in all code paths.

Suggested change
public TaskResponse run(String k8sParameterStr) {
public TaskResponse run(String k8sParameterStr) throws Exception {

KubernetesClient client = null;
try {
client = getClient(kubernetesApplicationManagerContext);
// Retrieve watchList again, as the previous instance of tes client connection pool may have expired

Copilot AI Oct 30, 2025


Typo in comment: 'tes client' should be 'the client'.

Suggested change
// Retrieve watchList again, as the previous instance of tes client connection pool may have expired
// Retrieve watchList again, as the previous instance of the client connection pool may have expired

return base64Hash.replace(BASE64_PADDING_CHARACTER, EMPTY_STRING);
} catch (Exception e) {
log.error("Failed to generate cluster ID", e);
return Integer.toString(kubeConfig.hashCode());

Copilot AI Oct 30, 2025


The fallback cluster ID generation using hashCode() can produce negative values, which could cause issues. Although line 170 in KubernetesApplicationManager uses hashCode & 0x7FFFFFFF to ensure non-negative values, this fallback in KubernetesClientPool doesn't apply the same mask, creating inconsistency in cluster ID generation.

Suggested change
return Integer.toString(kubeConfig.hashCode());
return Integer.toString(kubeConfig.hashCode() & 0x7FFFFFFF);

Comment on lines 168 to 171
String kubeConfig = k8sTaskExecutionContext.getConfigYaml();
int hashCode = kubeConfig.hashCode();
int nonNegativeHash = hashCode & 0x7FFFFFFF;
return K8S_CLUSTER_PREFIX + nonNegativeHash;

Copilot AI Oct 30, 2025


The cluster ID generation logic is duplicated between KubernetesApplicationManager.getClusterId() and KubernetesClientPool.getClusterId(). These methods use different algorithms (SHA-256 in the pool vs hashCode in the manager), which will produce different cluster IDs for the same kubeConfig. This inconsistency will prevent proper client pooling and cause connection leaks.
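
One hedged way to resolve this would be to have the manager delegate to the pool's implementation so both sides derive the same identifier; the exact signature of KubernetesClientPool.getClusterId is assumed here:

```java
// Sketch: delegate to the pool so both classes produce identical cluster IDs.
private String getClusterId(K8sTaskExecutionContext k8sTaskExecutionContext) {
    return KubernetesClientPool.getClusterId(k8sTaskExecutionContext.getConfigYaml());
}
```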

Comment on lines 107 to 108
log.debug(
"createBatchJobWatcher does not return client immediately, caller should manage client lifecycle");

Copilot AI Oct 30, 2025


The comment indicates the client is not returned immediately and the caller should manage the lifecycle, but the client is obtained in the try block (line 97) and logged in the finally block (line 106-109) without actually being returned to the pool. This will cause a client connection leak as the client remains borrowed but is never returned.

Comment on lines 280 to 288
    return client.pods().inNamespace(pod.getMetadata().getNamespace())
            .withName(pod.getMetadata().getName())
            .inContainer(kubernetesApplicationManagerContext.getContainerName())
            .watchLog();
} finally {
    log.debug("Log watch client is not returned immediately, will be managed by caller after watch completes");
}
}


Copilot AI Oct 30, 2025


Similar to the issue in K8sUtils.createBatchJobWatcher, the client obtained in getPodLogWatcher (line 263) is not returned to the pool in the finally block. The debug message suggests it's intentional, but this creates a connection leak as there's no guarantee the caller will properly return the client after the watch completes.

Suggested change
    return client.pods().inNamespace(pod.getMetadata().getNamespace())
            .withName(pod.getMetadata().getName())
            .inContainer(kubernetesApplicationManagerContext.getContainerName())
            .watchLog();
} finally {
    log.debug("Log watch client is not returned immediately, will be managed by caller after watch completes");
}
}

    LogWatch logWatch = client.pods().inNamespace(pod.getMetadata().getNamespace())
            .withName(pod.getMetadata().getName())
            .inContainer(kubernetesApplicationManagerContext.getContainerName())
            .watchLog();
    return new ClientReturningLogWatch(logWatch, client, kubernetesApplicationManagerContext);
} catch (Exception e) {
    if (client != null) {
        KubernetesClientPool.returnClient(kubernetesApplicationManagerContext, client);
    }
    throw e;
}
}

/**
 * Wrapper for LogWatch that returns the client to the pool when closed.
 */
private static class ClientReturningLogWatch implements LogWatch {

    private final LogWatch delegate;
    private final KubernetesClient client;
    private final KubernetesApplicationManagerContext context;
    private boolean closed = false;

    public ClientReturningLogWatch(LogWatch delegate, KubernetesClient client,
                                   KubernetesApplicationManagerContext context) {
        this.delegate = delegate;
        this.client = client;
        this.context = context;
    }

    @Override
    public void close() {
        if (!closed) {
            try {
                delegate.close();
            } finally {
                KubernetesClientPool.returnClient(context, client);
                closed = true;
            }
        }
    }

    @Override
    public java.io.InputStream getOutput() {
        return delegate.getOutput();
    }

    @Override
    public java.io.InputStream getError() {
        return delegate.getError();
    }
}
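
If the wrapper above were adopted, callers could use try-with-resources so that closing the watch also returns the pooled client (usage sketch; assumes LogWatch is Closeable and applicationManager is in scope):

```java
// Usage sketch: close() on the wrapper returns the KubernetesClient to the pool.
try (LogWatch watch = applicationManager.getPodLogWatcher(kubernetesApplicationManagerContext)) {
    // stream container logs from watch.getOutput() ...
}
```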

Comment on lines 613 to 614
KubernetesClient client1 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool);
KubernetesClient client2 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool);

Copilot AI Oct 30, 2025


Variable 'KubernetesClient client1' is never read.

Suggested change
KubernetesClient client1 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool);
KubernetesClient client2 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool);
borrowObjectMethod.invoke(clusterClientPool);
borrowObjectMethod.invoke(clusterClientPool);


// Get all available clients (maxSize = 2)
KubernetesClient client1 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool);
KubernetesClient client2 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool);

Copilot AI Oct 30, 2025


Variable 'KubernetesClient client2' is never read.

Suggested change
KubernetesClient client2 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool);
borrowObjectMethod.invoke(clusterClientPool);

String labelValue = kubernetesApplicationManagerContext.getLabelValue();

// Build the latest watchList again
FilterWatchListDeletable<Pod, PodList, PodResource> newWatchList =


this is a new newWatchList?



yes, it is a newWatchList

wangxj3 previously approved these changes Oct 30, 2025
