[DSIP-17122][SERVER] Kubernetes Connection Pool Implementation #17510
Conversation
… and adapted to special handling in Watch scenarios
…connection pool interface of K8sUtils through the configYaml parameter
Thanks for opening this pull request! Please check out our contributing guidelines. (https://github.com/apache/dolphinscheduler/blob/dev/docs/docs/en/contribute/join/pull-request.md)
...c/main/java/org/apache/dolphinscheduler/plugin/task/api/am/KubernetesApplicationManager.java
...-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/KubernetesClientPool.java
This PR's title and DSIP-88 are not related. Please correct the title and add the full design detail and description. @Cat-Drink

PTAL
```java
String kubeConfig = k8sTaskExecutionContext.getConfigYaml();
int hashCode = kubeConfig.hashCode();
int nonNegativeHash = hashCode & 0x7FFFFFFF;
return "k8s-cluster-" + nonNegativeHash;
```
It's better to use constants here.
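A minimal sketch of the reviewer's suggestion: the `"k8s-cluster-"` prefix matches the `K8S_CLUSTER_PREFIX` constant the PR later adopts, while `NON_NEGATIVE_MASK` is an illustrative name for the `0x7FFFFFFF` literal.

```java
public class ClusterIdConstants {

    // K8S_CLUSTER_PREFIX matches the constant later introduced in the PR;
    // NON_NEGATIVE_MASK is a hypothetical name for the magic literal.
    private static final String K8S_CLUSTER_PREFIX = "k8s-cluster-";
    private static final int NON_NEGATIVE_MASK = 0x7FFFFFFF;

    static String getClusterId(String kubeConfig) {
        // Mask the sign bit so the hash-based ID is always non-negative
        return K8S_CLUSTER_PREFIX + (kubeConfig.hashCode() & NON_NEGATIVE_MASK);
    }

    public static void main(String[] args) {
        String id = getClusterId("apiVersion: v1");
        System.out.println(id.startsWith(K8S_CLUSTER_PREFIX));
        // → true
    }
}
```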
```java
byte[] hashBytes = digest.digest(kubeConfig.getBytes(StandardCharsets.UTF_8));
String base64Hash = Base64.getUrlEncoder().encodeToString(hashBytes);
return base64Hash.replace("=", "");
```
It's better to use constants here.
```java
while (true) {
    try {
        // every 30s
        Thread.sleep(30000);
```
It's better to use constants here.
```java
Thread cleanupThread = new Thread(() -> {
    while (true) {
        try {
            // every 30s
```
Suggested change:
```diff
- // every 30s
```
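The review comments around the cleanup loop point at the magic `30000` literal and the hand-rolled `while (true)` / `Thread.sleep` thread. A hedged alternative sketch (not the PR's code): a named constant plus `ScheduledExecutorService` gives the same periodic behavior without the manual loop. The interval is shortened here only so the demo finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class IdleCleanupSketch {

    // Named constant replacing the magic 30000 ms literal (shortened for the demo)
    private static final long CLEANUP_INTERVAL_MS = 50;

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch ticks = new CountDownLatch(2);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "k8s-pool-cleanup");
            t.setDaemon(true); // a daemon thread never blocks JVM shutdown
            return t;
        });
        // ticks::countDown stands in for the pool's cleanupIdleClients()
        scheduler.scheduleWithFixedDelay(ticks::countDown,
                CLEANUP_INTERVAL_MS, CLEANUP_INTERVAL_MS, TimeUnit.MILLISECONDS);
        boolean ranTwice = ticks.await(5, TimeUnit.SECONDS);
        scheduler.shutdownNow();
        System.out.println(ranTwice);
        // → true
    }
}
```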
```java
public int getMaxSize() {
    return maxSize;
}

public int getMinIdle() {
    return minIdle;
}

public int getMaxIdle() {
    return maxIdle;
}

public long getMaxWaitMs() {
    return maxWaitMs;
}

public long getIdleTimeoutMs() {
    return idleTimeoutMs;
}
```
Use @Getter here.
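The reviewer's point is that Lombok's `@Getter` would generate these five accessors. As a dependency-free illustration of the same boilerplate elimination, a Java `record` auto-generates accessors too (field names mirror the PR's `PoolConfig` getters; this is a sketch, not the PR's class):

```java
public class RecordGetterSketch {

    // A record auto-generates accessors, similar to what Lombok's @Getter
    // would do for a class with these fields.
    record PoolConfig(int maxSize, int minIdle, int maxIdle, long maxWaitMs, long idleTimeoutMs) {}

    public static void main(String[] args) {
        PoolConfig cfg = new PoolConfig(10, 2, 5, 30000L, 600000L);
        if (cfg.maxSize() != 10 || cfg.idleTimeoutMs() != 600000L) {
            throw new AssertionError("accessor mismatch");
        }
        System.out.println(cfg.maxSize());
        // → 10
    }
}
```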
```java
} catch (Exception e) {
    log.error("job failed in k8s: {}", e.getMessage(), e);
    taskResponse.setExitStatusCode(EXIT_CODE_FAILURE);
    log.error(e.getMessage());
```
Suggested change:
```diff
- log.error(e.getMessage());
```
```java
    }
}
} catch (Exception e) {
    // if Exception happen
```
Suggested change:
```diff
- // if Exception happen
```
```java
throw e;
} finally {
    ProcessUtils.removeK8sClientCache(taskRequest.getTaskAppId());
    log.error(e.getMessage());
```
Suggested change:
```diff
- log.error(e.getMessage());
+ log.error("k8s task executor running error: ", e);
```
```java
/**
 * 测试K8s客户端是否被正确归还到连接池
 * (Test whether the K8s client is correctly returned to the connection pool)
 */
```
Please avoid using Chinese.
```java
this.poolConfig = new PoolConfig(
        PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_SIZE, 10),
        PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MIN_IDLE, 2),
        PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_IDLE, 5),
        PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_MAX_WAIT_MS, 30000),
        PropertyUtils.getInt(TaskConstants.K8S_CLIENT_POOL_IDLE_TIMEOUT_MS, 600000));
```
These params should be added to the docs, with guidance for users on how to use them.
BTW, this is still unaddressed.
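A hedged sketch of the documentation the reviewer is asking for. Of these property keys, only `k8s.client.pool.max.size` is confirmed by the PR description; the other names are assumptions inferred from the `TaskConstants` entries, and the defaults mirror the `PoolConfig` fallbacks quoted above.

```properties
# Kubernetes client connection pool (illustrative keys; only
# k8s.client.pool.max.size is confirmed by the PR description)
k8s.client.pool.max.size=10
k8s.client.pool.min.idle=2
k8s.client.pool.max.idle=5
k8s.client.pool.max.wait.ms=30000
k8s.client.pool.idle.timeout.ms=600000
```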
```java
        .inContainer(kubernetesApplicationManagerContext.getContainerName())
        .watchLog();
} finally {
    log.debug("Log watch client is not returned immediately, will be managed by caller after watch completes");
```
This log seems meaningless.
Pull Request Overview
This PR introduces a Kubernetes client connection pool (KubernetesClientPool) to manage K8s client connections efficiently across multiple clusters. The changes replace the previous approach of creating individual clients and caching them per task with a robust pooling mechanism that supports connection reuse, lifecycle management, and automatic cleanup of idle connections.
Key Changes
- Implements a new `KubernetesClientPool` singleton with per-cluster connection pooling
- Refactors `K8sUtils` to use the connection pool instead of instance-level clients
- Updates `KubernetesApplicationManager` to retrieve clients from the pool and return them after use
- Removes the `removeK8sClientCache` method from `ProcessUtils` as caching is now handled by the pool
- Adds comprehensive test coverage for the new pooling functionality
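The borrow/return lifecycle this overview describes can be sketched without any Kubernetes dependency. Below is a minimal, hypothetical stand-in (not the PR's `ClusterClientPool`) showing the three behaviors the pool must provide: bounded borrowing, timeout on exhaustion, and reuse after return.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class PoolSketch {

    // Placeholder standing in for a fabric8 KubernetesClient
    static class FakeClient {}

    static class ClusterClientPool {
        private final BlockingQueue<FakeClient> idle;

        ClusterClientPool(int maxSize) {
            idle = new ArrayBlockingQueue<>(maxSize);
            for (int i = 0; i < maxSize; i++) {
                idle.add(new FakeClient());
            }
        }

        FakeClient borrowObject(long maxWaitMs) throws InterruptedException {
            FakeClient c = idle.poll(maxWaitMs, TimeUnit.MILLISECONDS);
            if (c == null) {
                throw new IllegalStateException("borrow timed out");
            }
            return c;
        }

        void returnObject(FakeClient c) {
            idle.offer(c);
        }
    }

    public static void main(String[] args) throws Exception {
        ClusterClientPool pool = new ClusterClientPool(2);
        FakeClient a = pool.borrowObject(100);
        FakeClient b = pool.borrowObject(100);
        boolean timedOut = false;
        try {
            pool.borrowObject(50); // pool exhausted -> times out
        } catch (IllegalStateException e) {
            timedOut = true;
        }
        if (!timedOut || b == null) {
            throw new AssertionError("expected exhaustion timeout");
        }
        pool.returnObject(a);
        if (pool.borrowObject(100) == null) { // reuse after return
            throw new AssertionError("expected reuse");
        }
        System.out.println("ok");
    }
}
```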
Reviewed Changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| KubernetesClientPool.java | New connection pool implementation with cluster-based pooling, idle cleanup, and configurable limits |
| K8sUtils.java | Refactored to use connection pool; all methods now accept configYaml parameter and properly return clients |
| KubernetesApplicationManager.java | Updated to use connection pool instead of per-task client caching |
| K8sTaskExecutor.java | Modified to pass configYaml to K8sUtils methods and removed client build calls |
| ProcessUtils.java | Removed removeK8sClientCache method |
| TaskConstants.java | Added configuration constants for pool sizing and timeouts |
| KubernetesClientPoolTest.java | Comprehensive test suite for connection pool functionality |
| K8sUtilsTest.java | New tests for refactored K8sUtils methods |
| KubernetesApplicationManagerTest.java | Tests for application manager with connection pool |
| K8sTaskExecutorTest.java | Updated tests with proper mocking for connection pool usage |
| configuration.md (en/zh) | Documentation for new connection pool configuration parameters |
```java
@BeforeEach
public void setUp() {
    k8sUtils = new K8sUtils();
    mockedKubernetesClientPool = Mockito.mockStatic(KubernetesClientPool.class); // 拦截所有使用静态方法的请求
```
Copilot AI · Oct 30, 2025
Chinese comment should be removed or translated to English. The comment '拦截所有使用静态方法的请求' (intercept all requests using static methods) is inconsistent with the rest of the codebase which uses English comments.
Suggested change:
```diff
-mockedKubernetesClientPool = Mockito.mockStatic(KubernetesClientPool.class); // 拦截所有使用静态方法的请求
+mockedKubernetesClientPool = Mockito.mockStatic(KubernetesClientPool.class); // Intercept all requests using static methods
```
```diff
 @Override
-public TaskResponse run(String k8sParameterStr) throws Exception {
+public TaskResponse run(String k8sParameterStr) {
```
Copilot AI · Oct 30, 2025
The method signature changed from throws Exception to no exception declaration, but the method body still contains try-catch blocks that catch Exception. This is a breaking API change that could affect callers expecting to handle exceptions. The removed exception should either be kept in the signature or properly handled/logged in all code paths.
Suggested change:
```diff
-public TaskResponse run(String k8sParameterStr) {
+public TaskResponse run(String k8sParameterStr) throws Exception {
```
```java
KubernetesClient client = null;
try {
    client = getClient(kubernetesApplicationManagerContext);
    // Retrieve watchList again, as the previous instance of tes client connection pool may have expired
```
Copilot AI · Oct 30, 2025
Typo in comment: 'tes client' should be 'the client'.
Suggested change:
```diff
-// Retrieve watchList again, as the previous instance of tes client connection pool may have expired
+// Retrieve watchList again, as the previous instance of the client connection pool may have expired
```
```java
return base64Hash.replace(BASE64_PADDING_CHARACTER, EMPTY_STRING);
} catch (Exception e) {
    log.error("Failed to generate cluster ID", e);
    return Integer.toString(kubeConfig.hashCode());
```
Copilot AI · Oct 30, 2025
The fallback cluster ID generation using hashCode() can produce negative values, which could cause issues. Although line 170 in KubernetesApplicationManager uses hashCode & 0x7FFFFFFF to ensure non-negative values, this fallback in KubernetesClientPool doesn't apply the same mask, creating inconsistency in cluster ID generation.
Suggested change:
```diff
-return Integer.toString(kubeConfig.hashCode());
+return Integer.toString(kubeConfig.hashCode() & 0x7FFFFFFF);
```
```java
String kubeConfig = k8sTaskExecutionContext.getConfigYaml();
int hashCode = kubeConfig.hashCode();
int nonNegativeHash = hashCode & 0x7FFFFFFF;
return K8S_CLUSTER_PREFIX + nonNegativeHash;
```
Copilot AI · Oct 30, 2025
The cluster ID generation logic is duplicated between KubernetesApplicationManager.getClusterId() and KubernetesClientPool.getClusterId(). These methods use different algorithms (SHA-256 in the pool vs hashCode in the manager), which will produce different cluster IDs for the same kubeConfig. This inconsistency will prevent proper client pooling and cause connection leaks.
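The mismatch Copilot describes is easy to demonstrate: for the same kubeConfig the two algorithms yield different pool keys, so a client borrowed under one key can never be found under the other. A self-contained reproduction (method names are illustrative, not the PR's):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;

public class ClusterIdMismatch {

    // hashCode-based ID, as in KubernetesApplicationManager.getClusterId()
    static String managerId(String kubeConfig) {
        return "k8s-cluster-" + (kubeConfig.hashCode() & 0x7FFFFFFF);
    }

    // SHA-256-based ID, as in KubernetesClientPool.getClusterId()
    static String poolId(String kubeConfig) throws Exception {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        byte[] hashBytes = digest.digest(kubeConfig.getBytes(StandardCharsets.UTF_8));
        return Base64.getUrlEncoder().encodeToString(hashBytes).replace("=", "");
    }

    public static void main(String[] args) throws Exception {
        String cfg = "apiVersion: v1";
        // Same input, two different pool keys -> the borrowed client is never matched on return
        System.out.println(managerId(cfg).equals(poolId(cfg)));
        // → false
    }
}
```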
```java
log.debug(
        "createBatchJobWatcher does not return client immediately, caller should manage client lifecycle");
```
Copilot AI · Oct 30, 2025
The comment indicates the client is not returned immediately and the caller should manage the lifecycle, but the client is obtained in the try block (line 97) and logged in the finally block (line 106-109) without actually being returned to the pool. This will cause a client connection leak as the client remains borrowed but is never returned.
```java
return client.pods().inNamespace(pod.getMetadata().getNamespace())
        .withName(pod.getMetadata().getName())
        .inContainer(kubernetesApplicationManagerContext.getContainerName())
        .watchLog();
} finally {
    log.debug("Log watch client is not returned immediately, will be managed by caller after watch completes");
}
}
```
Copilot AI · Oct 30, 2025
Similar to the issue in K8sUtils.createBatchJobWatcher, the client obtained in getPodLogWatcher (line 263) is not returned to the pool in the finally block. The debug message suggests it's intentional, but this creates a connection leak as there's no guarantee the caller will properly return the client after the watch completes.
Suggested change:
```diff
-return client.pods().inNamespace(pod.getMetadata().getNamespace())
-        .withName(pod.getMetadata().getName())
-        .inContainer(kubernetesApplicationManagerContext.getContainerName())
-        .watchLog();
-} finally {
-    log.debug("Log watch client is not returned immediately, will be managed by caller after watch completes");
-}
-}
+LogWatch logWatch = client.pods().inNamespace(pod.getMetadata().getNamespace())
+        .withName(pod.getMetadata().getName())
+        .inContainer(kubernetesApplicationManagerContext.getContainerName())
+        .watchLog();
+return new ClientReturningLogWatch(logWatch, client, kubernetesApplicationManagerContext);
+} catch (Exception e) {
+    if (client != null) {
+        KubernetesClientPool.returnClient(kubernetesApplicationManagerContext, client);
+    }
+    throw e;
+}
+}
+
+/**
+ * Wrapper for LogWatch that returns the client to the pool when closed.
+ */
+private static class ClientReturningLogWatch implements LogWatch {
+    private final LogWatch delegate;
+    private final KubernetesClient client;
+    private final KubernetesApplicationManagerContext context;
+    private boolean closed = false;
+
+    public ClientReturningLogWatch(LogWatch delegate, KubernetesClient client, KubernetesApplicationManagerContext context) {
+        this.delegate = delegate;
+        this.client = client;
+        this.context = context;
+    }
+
+    @Override
+    public void close() {
+        if (!closed) {
+            try {
+                delegate.close();
+            } finally {
+                KubernetesClientPool.returnClient(context, client);
+                closed = true;
+            }
+        }
+    }
+
+    @Override
+    public java.io.InputStream getOutput() {
+        return delegate.getOutput();
+    }
+
+    @Override
+    public java.io.InputStream getError() {
+        return delegate.getError();
+    }
+}
```
```java
KubernetesClient client1 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool);
KubernetesClient client2 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool);
```
Copilot AI · Oct 30, 2025
Variable 'KubernetesClient client1' is never read.
Suggested change:
```diff
-KubernetesClient client1 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool);
-KubernetesClient client2 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool);
+borrowObjectMethod.invoke(clusterClientPool);
+borrowObjectMethod.invoke(clusterClientPool);
```
```java
// Get all available clients (maxSize = 2)
KubernetesClient client1 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool);
KubernetesClient client2 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool);
```
Copilot AI · Oct 30, 2025
Variable 'KubernetesClient client2' is never read.
Suggested change:
```diff
-KubernetesClient client2 = (KubernetesClient) borrowObjectMethod.invoke(clusterClientPool);
+borrowObjectMethod.invoke(clusterClientPool);
```
```java
String labelValue = kubernetesApplicationManagerContext.getLabelValue();

// Build the latest watchList again
FilterWatchListDeletable<Pod, PodList, PodResource> newWatchList =
```
Is this a new `newWatchList`?
Yes, it is a new `watchList`.
Purpose of the pull request
This pull request introduces support for the Kubernetes client connection pool, optimizing the interaction performance and resource management between Apache DolphinScheduler and Kubernetes clusters. By maintaining a reuse mechanism for client connections, this feature significantly reduces the overhead caused by frequent creation and destruction of Kubernetes clients, thereby improving the overall efficiency and stability of the system.
Brief change log
- Added the `KubernetesClientPool` class, which implements a singleton-pattern connection pool manager to maintain an independent connection pool for each Kubernetes cluster.
- Added the `PoolConfig` inner class, which supports configuration items such as minimum idle connections, maximum idle connections, maximum connections, maximum waiting time, and idle timeout. These configurations can be customized through properties in `TaskConstants`.
- Added the `ClusterClientPool` inner class, which is responsible for managing client connections of specific clusters, including the creation, acquisition, return, and closure of connections.
- A background thread invokes the `cleanupIdleClients` method every 30 seconds to clear timed-out idle connections, effectively saving resources.
- Added the `isClientValid` method to ensure that the returned client connection remains valid.
- Refactored `K8sUtils` and `KubernetesApplicationManager` to enable them to acquire and return clients using the connection pool.

Verify this pull request
This change includes comprehensive test cases and can be verified through the following methods:
Automated Testing
- `KubernetesClientPoolTest.testKubernetesClientPoolBasicFunction` verifies the basic operations of the connection pool (acquiring cluster ID, obtaining client, returning client, and closing the connection pool).
- `KubernetesClientPoolTest.testKubernetesClientPoolConfig` verifies the correctness of connection pool configurations.
- `KubernetesClientPoolTest.testClusterIdGeneration` verifies the function of generating a unique cluster identifier based on kubeconfig.
- `KubernetesClientPoolTest.testSingletonPattern` verifies the singleton pattern implementation of the connection pool.
- `KubernetesClientPoolTest.testClusterClientPoolBorrowObjectTimeout` verifies the handling mechanism for client acquisition timeouts.
- `KubernetesClientPoolTest.testCleanupIdleClients` and `KubernetesClientPoolTest.testClusterClientPoolCleanupIdle` verify the periodic cleanup function of idle connections.

Manual Verification Steps
- Verify that the connection pool parameters (e.g., `k8s.client.pool.max.size`) are set in the configuration file.

Pull Request Notice
If your pull request contains an incompatible change, you should also add it to
docs/docs/en/guide/upgrade/incompatible.md