diff --git a/CHANGES.txt b/CHANGES.txt index b5400d6ab..b6752ecde 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.3.0 ----- + * Adding gossip based safety check to Live Migration data copy task endpoint (CASSSIDECAR-409) * CDC: Add end-to-end CDC integration tests (CASSSIDECAR-308) * SchemaStorePublisherFactory should be Injectable in CachingSchemaStore (CASSSIDECAR-408) * Fix StorageClientTest Docker API compatibility and improve CI test reporting (CASSSIDECAR-410) diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequest.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequest.java index ebb447d69..f475c9b5f 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequest.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequest.java @@ -45,12 +45,40 @@ public class LiveMigrationDataCopyRequest public final int maxConcurrency; /** - * Creates a new request with auto-generated ID. + * Batch size for fetching gossip information from cluster instances. + * During live migration safety checks, gossip info is fetched from instances in parallel batches. + * This controls how many instances are contacted simultaneously per batch. + * It is an optional parameter. Server will apply default if not specified. + */ + public final Integer gossipFetchBatchSize; + + /** + * Maximum number of batch retry attempts when fetching gossip information. + * If all instances in a batch fail to return gossip info, the next batch is tried. + * This limits the total number of batch attempts across all instances. + * It is an optional parameter. Server will apply default if not specified. + */ + public final Integer gossipFetchMaxRetries; + + /** + * Flag to skip gossip-based safety checks before data copy. + * When true, the safety validation that ensures destination is not present in cluster gossip + * will be skipped. Use with caution - skipping this check may lead to data loss if the + * destination node has been started. + * Optional parameter - defaults to false (performs safety checks) if not specified. + */ + public final Boolean skipGossipCheck; + + /** + * Creates a new live migration data copy request. */ @JsonCreator public LiveMigrationDataCopyRequest(@JsonProperty("maxIterations") int maxIterations, @JsonProperty("successThreshold") double successThreshold, - @JsonProperty("maxConcurrency") int maxConcurrency) + @JsonProperty("maxConcurrency") int maxConcurrency, + @JsonProperty("gossipFetchBatchSize") Integer gossipFetchBatchSize, + @JsonProperty("gossipFetchMaxRetries") Integer gossipFetchMaxRetries, + @JsonProperty("skipGossipCheck") Boolean skipGossipCheck) { if (maxIterations <= 0) @@ -71,8 +99,24 @@ public LiveMigrationDataCopyRequest(@JsonProperty("maxIterations") int maxIterat + ". It cannot be less than or equal to zero."); } + // Validate optional gossip fetch parameters if specified + if (gossipFetchBatchSize != null && gossipFetchBatchSize <= 0) + { + throw new IllegalArgumentException("Invalid gossipFetchBatchSize " + gossipFetchBatchSize + + ". It must be greater than zero when specified."); + } + + if (gossipFetchMaxRetries != null && gossipFetchMaxRetries <= 0) + { + throw new IllegalArgumentException("Invalid gossipFetchMaxRetries " + gossipFetchMaxRetries + + ". It must be greater than zero when specified."); + } + this.maxIterations = maxIterations; this.successThreshold = successThreshold; this.maxConcurrency = maxConcurrency; + this.gossipFetchBatchSize = gossipFetchBatchSize; + this.gossipFetchMaxRetries = gossipFetchMaxRetries; + this.skipGossipCheck = skipGossipCheck; } } diff --git a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequestTest.java b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequestTest.java index e1c9ab8e0..f37d46055 100644 --- a/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequestTest.java +++ b/client-common/src/test/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequestTest.java @@ -32,7 +32,7 @@ class LiveMigrationDataCopyRequestTest @Test void testSerializationDeserializationRoundTrip() throws Exception { - LiveMigrationDataCopyRequest original = new LiveMigrationDataCopyRequest(5, 0.95, 10); + LiveMigrationDataCopyRequest original = new LiveMigrationDataCopyRequest(5, 0.95, 10, null, null, null); String json = objectMapper.writeValueAsString(original); LiveMigrationDataCopyRequest deserialized = objectMapper.readValue(json, LiveMigrationDataCopyRequest.class); @@ -42,10 +42,26 @@ void testSerializationDeserializationRoundTrip() throws Exception assertThat(deserialized.maxConcurrency).isEqualTo(original.maxConcurrency); } + @Test + void testSerializationDeserializationRoundTripWithOptionalValues() throws Exception + { + LiveMigrationDataCopyRequest original = new LiveMigrationDataCopyRequest(5, 0.95, 10, 5, 5, true); + + String json = objectMapper.writeValueAsString(original); + LiveMigrationDataCopyRequest deserialized = objectMapper.readValue(json, LiveMigrationDataCopyRequest.class); + + assertThat(deserialized.maxIterations).isEqualTo(original.maxIterations); + assertThat(deserialized.successThreshold).isEqualTo(original.successThreshold); + assertThat(deserialized.maxConcurrency).isEqualTo(original.maxConcurrency); + assertThat(deserialized.gossipFetchBatchSize).isEqualTo(original.gossipFetchBatchSize); + assertThat(deserialized.gossipFetchMaxRetries).isEqualTo(original.gossipFetchMaxRetries); + assertThat(deserialized.skipGossipCheck).isEqualTo(original.skipGossipCheck); + } + @Test void testConstructorWithInvalidMaxIterationsThrowsException() { - assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(0, 0.95, 10)) + assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(0, 0.95, 10, null, null, null)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid maxIterations 0. It cannot be less than or equal to zero."); } @@ -53,7 +69,7 @@ void testConstructorWithInvalidMaxIterationsThrowsException() @Test void testConstructorWithNegativeMaxIterationsThrowsException() { - assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(-5, 0.95, 10)) + assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(-5, 0.95, 10, null, null, null)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid maxIterations -5. It cannot be less than or equal to zero."); } @@ -61,7 +77,7 @@ void testConstructorWithNegativeMaxIterationsThrowsException() @Test void testConstructorWithInvalidSuccessThresholdBelowZeroThrowsException() { - assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, -0.1, 10)) + assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, -0.1, 10, null, null, null)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid successThreshold -0.1. It cannot be less than zero or greater than one."); } @@ -69,7 +85,7 @@ void testConstructorWithInvalidSuccessThresholdBelowZeroThrowsException() @Test void testConstructorWithInvalidSuccessThresholdAboveOneThrowsException() { - assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 1.5, 10)) + assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 1.5, 10, null, null, null)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid successThreshold 1.5. It cannot be less than zero or greater than one."); } @@ -77,7 +93,7 @@ void testConstructorWithInvalidSuccessThresholdAboveOneThrowsException() @Test void testConstructorWithInvalidMaxConcurrencyThrowsException() { - assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, 0)) + assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, 0, null, null, null)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid maxConcurrency 0. It cannot be less than or equal to zero."); } @@ -85,7 +101,7 @@ void testConstructorWithInvalidMaxConcurrencyThrowsException() @Test void testConstructorWithNegativeMaxConcurrencyThrowsException() { - assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, -3)) + assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, -3, null, null, null)) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Invalid maxConcurrency -3. It cannot be less than or equal to zero."); } @@ -93,7 +109,7 @@ void testConstructorWithNegativeMaxConcurrencyThrowsException() @Test void testValidBoundaryValues() { - LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 0.0, 1); + LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 0.0, 1, null, null, null); assertThat(request.maxIterations).isEqualTo(1); assertThat(request.successThreshold).isEqualTo(0.0); @@ -105,10 +121,29 @@ void testValidBoundaryValuesUpperBound() { LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(Integer.MAX_VALUE, 1.0, - Integer.MAX_VALUE); + Integer.MAX_VALUE, + null, + null, + null); assertThat(request.maxIterations).isEqualTo(Integer.MAX_VALUE); assertThat(request.successThreshold).isEqualTo(1.0); assertThat(request.maxConcurrency).isEqualTo(Integer.MAX_VALUE); } + + @Test + void testConstructorWithInvalidGossipFetchBatchSizeThrowsException() + { + assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, 10, 0, null, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid gossipFetchBatchSize 0. It must be greater than zero when specified."); + } + + @Test + void testConstructorWithInvalidGossipFetchMaxRetriesThrowsException() + { + assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, 10, null, -1, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid gossipFetchMaxRetries -1. It must be greater than zero when specified."); + } } diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java index 6d6068c7d..421a10766 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java @@ -257,6 +257,19 @@ public CompletableFuture gossipInfo() return executor.executeRequestAsync(requestBuilder().gossipInfoRequest().build()); } + /** + * Executes the gossip info request using the default retry policy and provided {@code instance} + * + * @param instance the instance where the request will be executed + * @return a completable future of the gossip info + */ + public CompletableFuture gossipInfo(SidecarInstance instance) + { + return executor.executeRequestAsync(requestBuilder() + .singleInstanceSelectionPolicy(instance) + .gossipInfoRequest() + .build()); + } /** * Executes the GET gossip health request using the default retry policy and configured selection policy diff --git a/conf/sidecar.yaml b/conf/sidecar.yaml index 62e521858..a01ac301c 100644 --- a/conf/sidecar.yaml +++ b/conf/sidecar.yaml @@ -484,6 +484,8 @@ live_migration: migration_map: # Map of source and destination Cassandra instances # localhost1: localhost4 # This entry says that localhost1 will be migrated to localhost4 max_concurrent_downloads: 20 # Maximum number of concurrent downloads allowed + gossip_fetch_batch_size: 3 # Batch size for fetching gossip information from cluster instances during safety checks + gossip_fetch_max_retries: 3 # Maximum number of batch retry attempts when fetching gossip information # Configuration to allow sidecar start and stop Cassandra instances via the lifecycle API (disabled by default) lifecycle: diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/LiveMigrationConfiguration.java b/server/src/main/java/org/apache/cassandra/sidecar/config/LiveMigrationConfiguration.java index 392b8a20a..c8c517891 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/LiveMigrationConfiguration.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/LiveMigrationConfiguration.java @@ -52,4 +52,20 @@ public interface LiveMigrationConfiguration * Maximum number of concurrent downloads allowed. */ int maxConcurrentDownloads(); + + /** + * Batch size for fetching gossip information from cluster instances during safety checks. + * Controls how many instances are contacted simultaneously per batch. + * + * @return the batch size for gossip fetching + */ + int gossipFetchBatchSize(); + + /** + * Maximum number of batch retry attempts when fetching gossip information. + * If all instances in a batch fail to return gossip info, the next batch is tried. + * + * @return the maximum number of retry attempts + */ + int gossipFetchMaxRetries(); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/LiveMigrationConfigurationImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/LiveMigrationConfigurationImpl.java index a1e8c7104..54da134a6 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/LiveMigrationConfigurationImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/LiveMigrationConfigurationImpl.java @@ -33,22 +33,29 @@ public class LiveMigrationConfigurationImpl implements LiveMigrationConfiguratio { public static final int DEFAULT_MAX_CONCURRENT_DOWNLOADS = 20; + public static final int DEFAULT_GOSSIP_FETCH_BATCH_SIZE = 3; + public static final int DEFAULT_GOSSIP_FETCH_MAX_RETRIES = 3; private final Set filesToExclude; private final Set directoriesToExclude; private final Map migrationMap; private final int maxConcurrentDownloads; + private final int gossipFetchBatchSize; + private final int gossipFetchMaxRetries; public LiveMigrationConfigurationImpl() { - this(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), DEFAULT_MAX_CONCURRENT_DOWNLOADS); + this(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), + DEFAULT_MAX_CONCURRENT_DOWNLOADS, DEFAULT_GOSSIP_FETCH_BATCH_SIZE, DEFAULT_GOSSIP_FETCH_MAX_RETRIES); } @JsonCreator public LiveMigrationConfigurationImpl(@JsonProperty("files_to_exclude") Set filesToExclude, @JsonProperty("dirs_to_exclude") Set directoriesToExclude, @JsonProperty("migration_map") Map migrationMap, - @JsonProperty("max_concurrent_downloads") int maxConcurrentDownloads) + @JsonProperty("max_concurrent_downloads") int maxConcurrentDownloads, + @JsonProperty("gossip_fetch_batch_size") Integer gossipFetchBatchSize, + @JsonProperty("gossip_fetch_max_retries") Integer gossipFetchMaxRetries) { this.filesToExclude = filesToExclude; this.directoriesToExclude = directoriesToExclude; @@ -60,6 +67,24 @@ public LiveMigrationConfigurationImpl(@JsonProperty("files_to_exclude") Set= 1"); } this.maxConcurrentDownloads = maxConcurrentDownloads; + + if (gossipFetchBatchSize != null && gossipFetchBatchSize < 1) + { + throw new IllegalArgumentException("Invalid gossip_fetch_batch_size " + gossipFetchBatchSize + + ". It must be >= 1"); + } + this.gossipFetchBatchSize = gossipFetchBatchSize == null + ? DEFAULT_GOSSIP_FETCH_BATCH_SIZE + : gossipFetchBatchSize; + + if (gossipFetchMaxRetries != null && gossipFetchMaxRetries < 1) + { + throw new IllegalArgumentException("Invalid gossip_fetch_max_retries " + gossipFetchMaxRetries + + ". It must be >= 1"); + } + this.gossipFetchMaxRetries = gossipFetchMaxRetries == null + ? DEFAULT_GOSSIP_FETCH_MAX_RETRIES + : gossipFetchMaxRetries; } @Override @@ -89,4 +114,18 @@ public int maxConcurrentDownloads() { return maxConcurrentDownloads; } + + @Override + @JsonProperty("gossip_fetch_batch_size") + public int gossipFetchBatchSize() + { + return gossipFetchBatchSize; + } + + @Override + @JsonProperty("gossip_fetch_max_retries") + public int gossipFetchMaxRetries() + { + return gossipFetchMaxRetries; + } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateDataCopyTaskHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateDataCopyTaskHandler.java index b6875d982..14fe60b6e 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateDataCopyTaskHandler.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateDataCopyTaskHandler.java @@ -103,7 +103,7 @@ protected void handleInternal(RoutingContext context, .onFailure(throwable -> { if (throwable instanceof LiveMigrationInvalidRequestException) { - LOGGER.error("Input payload is not valid.", throwable); + LOGGER.error("Invalid live migration request.", throwable); context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, throwable.getMessage(), throwable)); } else if (throwable instanceof LiveMigrationDataCopyInProgressException) diff --git a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java index 719b93d35..d580c8872 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java @@ -31,10 +31,13 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import io.vertx.core.Future; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.cluster.InstancesMetadata; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationDataCopyInProgressException; import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException; import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException; @@ -52,17 +55,20 @@ public class DataCopyTaskManager @VisibleForTesting final ConcurrentHashMap currentTasks = new ConcurrentHashMap<>(); + private final ExecutorPools executorPools; private final InstancesMetadata instancesMetadata; private final SidecarConfiguration sidecarConfiguration; private final LiveMigrationMap liveMigrationMap; private final LiveMigrationTaskFactory liveMigrationTaskFactory; @Inject - public DataCopyTaskManager(InstancesMetadata instancesMetadata, + public DataCopyTaskManager(ExecutorPools executorPools, + InstancesMetadata instancesMetadata, SidecarConfiguration sidecarConfiguration, LiveMigrationMap liveMigrationMap, LiveMigrationTaskFactory liveMigrationTaskFactory) { + this.executorPools = executorPools; this.instancesMetadata = instancesMetadata; this.sidecarConfiguration = sidecarConfiguration; this.liveMigrationMap = liveMigrationMap; @@ -101,39 +107,85 @@ Future createDataCopyTask(LiveMigrationDataCopyRequest reques String source, InstanceMetadata localInstanceMetadata) { - LiveMigrationTask newTask = createTask(request, - source, - sidecarConfiguration.serviceConfiguration().port(), - localInstanceMetadata); - - // It is possible to serve only one live migration data copy request per instance at a time. - // Checking if there is another migration is in progress before accepting new one. - boolean accepted = currentTasks.compute(localInstanceMetadata.id(), (integer, taskInMap) -> { - if (taskInMap == null) + // Fast local JMX check before creating task - prevents task creation if Cassandra is running + return verifyCassandraNotRunning(localInstanceMetadata) + .compose(v -> { + LiveMigrationTask newTask = createTask(request, + source, + sidecarConfiguration.serviceConfiguration().port(), + localInstanceMetadata); + + // It is possible to serve only one live migration data copy request per instance at a time. + // Checking if there is another migration is in progress before accepting new one. + boolean accepted = currentTasks.compute(localInstanceMetadata.id(), (integer, taskInMap) -> { + if (taskInMap == null) + { + return newTask; + } + + if (!taskInMap.isCompleted()) + { + // Accept new task if and only if the existing task has completed. + return taskInMap; + } + else + { + return newTask; + } + }) == newTask; + + if (!accepted) + { + return Future.failedFuture( + new LiveMigrationDataCopyInProgressException("Another task is already under progress. Cannot accept new task.")); + } + LOGGER.info("Starting data copy task with id={}, source={}, destination={}", + newTask.id(), source, localInstanceMetadata.host()); + newTask.start(); + return Future.succeededFuture(newTask); + }); + } + + /** + * Initiating data copy once a Cassandra instance starts is not acceptable. This method checks whether + * Cassandra is running or not at the moment on the destination instance by checking if Sidecar + * was able to connect to the Cassandra instance's JMX port. It returns a failed future if Sidecar + * is able to connect to the JMX port of Cassandra. + * + * @param localInstance metadata for the local Cassandra instance + * @return Future that succeeds if Cassandra is not running, fails if it is running + */ + private Future verifyCassandraNotRunning(InstanceMetadata localInstance) + { + return executorPools.internal().executeBlocking(() -> { + try { - return newTask; - } + CassandraAdapterDelegate delegate = localInstance.delegate(); - if (!taskInMap.isCompleted()) + if (delegate.isJmxUp()) + { + throw new LiveMigrationInvalidRequestException( + "Cannot start data copy: Cassandra is currently running on this instance " + + "(JMX connectivity established). Data copy cannot proceed while Cassandra is active."); + } + + // JMX is down - Cassandra is not running (or at least wasn't during last health check) + LOGGER.debug("Local JMX check passed: Cassandra not detected as running on {}", localInstance.host()); + return null; + } + catch (CassandraUnavailableException e) { - // Accept new task if and only if the existing task has completed. - return taskInMap; + // No delegate available - Cassandra is not running + LOGGER.debug("No Cassandra delegate available for {} (Cassandra not running)", localInstance.host()); + return null; } - else + catch (Exception e) { - return newTask; + // Unexpected error - be conservative and reject for safety + LOGGER.warn("Unable to verify Cassandra status on {}, rejecting for safety", localInstance.host(), e); + throw e; } - }) == newTask; - - if (!accepted) - { - return Future.failedFuture( - new LiveMigrationDataCopyInProgressException("Another task is already under progress. Cannot accept new task.")); - } - LOGGER.info("Starting data copy task with id={}, source={}, destination={}", - newTask.id(), source, localInstanceMetadata.host()); - newTask.start(); - return Future.succeededFuture(newTask); + }); } LiveMigrationTask createTask(LiveMigrationDataCopyRequest request, diff --git a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/GossipInfoFetcher.java b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/GossipInfoFetcher.java new file mode 100644 index 000000000..db57968a5 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/GossipInfoFetcher.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import org.apache.cassandra.sidecar.client.SidecarClient; +import org.apache.cassandra.sidecar.client.SidecarInstanceImpl; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.response.GossipInfoResponse; +import org.jetbrains.annotations.VisibleForTesting; + +/** + * Responsible for fetching gossip information from cluster instances. + * Uses a batch-based parallel approach to minimize network calls and maximize efficiency. + */ +class GossipInfoFetcher +{ + private static final Logger LOGGER = LoggerFactory.getLogger(GossipInfoFetcher.class); + + private final SidecarClient sidecarClient; + private final InstancesMetadata instancesMetadata; + private final int batchSize; + private final int maxRetries; + private final int sidecarPort; + + GossipInfoFetcher(SidecarClient sidecarClient, + InstancesMetadata instancesMetadata, + int sidecarPort, + int batchSize, + int maxRetries) + { + this.sidecarClient = sidecarClient; + this.instancesMetadata = instancesMetadata; + this.batchSize = batchSize; + this.maxRetries = maxRetries; + this.sidecarPort = sidecarPort; + } + + /** + * Fetches gossip info from cluster instances using a batch-based parallel approach. + * Tries instances in batches, returning as soon as any instance successfully provides gossip info. + * + * @return Future with gossip response from the first successful instance + */ + Future fetchGossipInfo() + { + List allInstances = new ArrayList<>(instancesMetadata.instances()); + + if (allInstances.isEmpty()) + { + return Future.failedFuture("No instances available for gossip fetch"); + } + + LOGGER.info("Fetching gossip info from {} instances in batches of {}", allInstances.size(), batchSize); + + // Shuffle to ensure that random instances will be contacted in random manner + Collections.shuffle(allInstances); + + return fetchGossipFromBatch(allInstances, 0, batchSize, maxRetries, 1); + } + + /** + * Recursively fetches gossip from a batch of instances. + * Tries instances in parallel within each batch, returns on first success. + * + * @param instances all available instances + * @param startIndex starting index for this batch + * @param batchSize number of instances to try in parallel + * @param maxRetries maximum number of batch attempts + * @param attempt current attempt number + * @return Future with gossip response from first successful instance + */ + @VisibleForTesting + Future fetchGossipFromBatch(List instances, + int startIndex, + int batchSize, + int maxRetries, + int attempt) + { + if (attempt > maxRetries) + { + String errorMsg = String.format("Failed to fetch gossip after %d attempts across %d instances", + maxRetries, instances.size()); + LOGGER.error(errorMsg); + return Future.failedFuture(errorMsg); + } + + List batch = new ArrayList<>(); + for (int i = startIndex; i < Math.min(startIndex + batchSize, instances.size()); i++) + { + batch.add(instances.get(i)); + } + + if (batch.isEmpty()) + { + // No more instances to try, return failure + String errorMsg = "Failed to fetch gossip after trying all " + instances.size() + " instances"; + LOGGER.error(errorMsg); + return Future.failedFuture(errorMsg); + } + + LOGGER.info("Trying batch of {} instances (attempt {}/{})", batch.size(), attempt, maxRetries); + + // Try all instances in this batch in parallel + List> batchFutures = batch.stream() + .map(this::fetchGossipFromInstance) + .collect(Collectors.toList()); + + return Future.any(batchFutures) + .compose(this::extractSuccessfulResult, + cause -> retryFetchGossipInfo(instances, startIndex, batchSize, + maxRetries, attempt, cause)); + } + + private Future extractSuccessfulResult(CompositeFuture cf) + { + for (int i = 0; i < cf.size(); i++) + { + if (cf.succeeded(i)) + { + LOGGER.info("Successfully fetched gossip info"); + return Future.succeededFuture(cf.resultAt(i)); + } + } + return Future.failedFuture("Failed to fetch gossip info"); + } + + private Future retryFetchGossipInfo(List instances, + int startIndex, + int batchSize, + int maxRetries, + int attempt, + Throwable cause) + { + LOGGER.debug("Batch failed, trying next batch: {}", cause.getMessage()); + int nextIndex = startIndex + batchSize; + return fetchGossipFromBatch(instances, nextIndex, batchSize, maxRetries, attempt + 1); + } + + /** + * Fetches gossip info from a single instance using the default retry policy of the SidecarClient. + * + * @param instance the instance to fetch gossip from + * @return Future with gossip response + */ + private Future fetchGossipFromInstance(InstanceMetadata instance) + { + SidecarInstanceImpl sidecarInstance = new SidecarInstanceImpl(instance.host(), sidecarPort); + + LOGGER.debug("Fetching gossip from {}", instance.host()); + + return Future.fromCompletionStage(sidecarClient.gossipInfo(sidecarInstance)) + .onSuccess(response -> + LOGGER.debug("Successfully fetched gossip from {}", instance.host())) + .onFailure(error -> + LOGGER.debug("Failed to fetch gossip from {}: {}", + instance.host(), error.getMessage())); + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloader.java b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloader.java index 4f918808f..adff7a812 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloader.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloader.java @@ -19,6 +19,7 @@ package org.apache.cassandra.sidecar.livemigration; import java.io.IOException; +import java.net.UnknownHostException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.attribute.FileTime; @@ -45,9 +46,11 @@ import io.vertx.core.file.FileSystemException; import org.apache.cassandra.sidecar.client.SidecarClient; import org.apache.cassandra.sidecar.client.SidecarInstanceImpl; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.DataObjectBuilder; import org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest; +import org.apache.cassandra.sidecar.common.response.GossipInfoResponse; import org.apache.cassandra.sidecar.common.response.InstanceFileInfo; import org.apache.cassandra.sidecar.common.response.InstanceFileInfo.FileType; import org.apache.cassandra.sidecar.common.response.InstanceFilesListResponse; @@ -75,6 +78,7 @@ class LiveMigrationFileDownloader private final Consumer statusUpdater; private final InstanceMetadata instanceMetadata; private final LiveMigrationConfiguration liveMigrationConfiguration; + private final InstancesMetadata instancesMetadata; private final SidecarClient sidecarClient; private final String id; private final String source; @@ -93,6 +97,7 @@ protected LiveMigrationFileDownloader(Builder builder) this.statusUpdater = builder.statusUpdater; this.instanceMetadata = builder.instanceMetadata; this.liveMigrationConfiguration = builder.liveMigrationConfiguration; + this.instancesMetadata = builder.instancesMetadata; this.id = builder.id; this.source = builder.source; this.port = builder.port; @@ -115,7 +120,8 @@ public static Builder builder() */ public Future downloadFiles() { - return checkLiveMigrationStatusOfSource() + return checkGossip() + .compose(v -> checkLiveMigrationStatusOfSource()) .compose(v -> fetchSourceFileList()) .compose(this::cleanupUnnecessaryFiles) .compose(this::prepareDownloadList) @@ -124,6 +130,130 @@ public Future downloadFiles() .otherwise(this::handleDownloadFailure); } + /** + * Validates via cluster gossip that destination has not been started. + * This is a best-effort safety check performed before each file download iteration. + *

+ * Fetches gossip info from cluster instances and checks: + *

    + *
  • Source node is present in gossip
  • + *
  • Destination node is NOT present in gossip
  • + *
+ *

+ * Note: This cannot guarantee destination won't start during data copy. + * Operators must ensure destination is not started until migration completes. + * + * @return Future that succeeds if validation passes, fails with + * {@link LiveMigrationInvalidRequestException} if unsafe condition detected + */ + private Future checkGossip() + { + // Skip gossip check if explicitly requested + if (request.skipGossipCheck != null && request.skipGossipCheck) + { + LOGGER.warn("{} Skipping gossip safety check as requested. " + + "This bypasses validation that destination has not been started.", logPrefix); + return Future.succeededFuture(); + } + + String destination = instanceMetadata.host(); + LOGGER.debug("{} Validating gossip state: source={}, destination={}", logPrefix, source, destination); + + // Apply configuration defaults for optional gossip fetch parameters if not specified by client + int batchSize = request.gossipFetchBatchSize != null + ? request.gossipFetchBatchSize + : liveMigrationConfiguration.gossipFetchBatchSize(); + int maxRetries = request.gossipFetchMaxRetries != null + ? request.gossipFetchMaxRetries + : liveMigrationConfiguration.gossipFetchMaxRetries(); + + GossipInfoFetcher gossipFetcher = new GossipInfoFetcher( + sidecarClient, + instancesMetadata, + port, + batchSize, + maxRetries); + + return gossipFetcher.fetchGossipInfo() + .compose(gossipResponse -> { + try + { + return validateGossipResponse(gossipResponse, source, destination); + } + catch (UnknownHostException e) + { + return Future.failedFuture(e); + } + }) + .onSuccess(v -> LOGGER.debug("{} Gossip validation passed", logPrefix)) + .onFailure(err -> LOGGER.error("{} Gossip validation failed: {}", logPrefix, err.getMessage())); + } + + /** + * Validates the gossip response to ensure source is present in gossip and destination is not present. + * + * @param gossipResponse the gossip information from a healthy node + * @param source the source instance to check for + * @param destination the destination instance to check for + * @return Future that succeeds if source is found and destination not found, + * fails if source is not found or destination is found in gossip + */ + private Future validateGossipResponse(GossipInfoResponse gossipResponse, + String source, + String destination) throws UnknownHostException + { + GossipInfoResponse.GossipInfo srcInfo = findGossipInfo(gossipResponse, source); + if (srcInfo == null) + { + String errorMsg = "SAFETY CHECK FAILED: Source node '" + source + "' not found in cluster gossip. " + + "Cannot proceed with data copy. Please fix the source/cluster " + + "and then re-trigger data copy task if required"; + LOGGER.error("{} {}", logPrefix, errorMsg); + return Future.failedFuture(new LiveMigrationInvalidRequestException(errorMsg)); + } + + GossipInfoResponse.GossipInfo destInfo = findGossipInfo(gossipResponse, destination); + if (destInfo != null) + { + // Destination found in gossip - it was started! + String status = destInfo.statusWithPort(); + String errorMsg = String.format( + "SAFETY CHECK FAILED: Destination node '%s' found in cluster gossip with status '%s'. " + + "This indicates Cassandra was previously started on the destination. " + + "Data copy would overwrite potentially newer data, causing DATA LOSS. Aborting.", + destination, status); + + LOGGER.error("{} {}", logPrefix, errorMsg); + return Future.failedFuture(new LiveMigrationInvalidRequestException(errorMsg)); + } + + // Reached here means, source found and destination not found in gossip - safe to proceed + LOGGER.info("{} Gossip validation passed: destination {} not found in cluster gossip", + logPrefix, destination); + return Future.succeededFuture(); + } + + private GossipInfoResponse.GossipInfo findGossipInfo(GossipInfoResponse gossipResponse, + String instance) throws UnknownHostException + { + InstanceMetadata metadata = instancesMetadata.instanceFromHost(instance); + try + { + // InstanceMetadata may not have ip address populated. Calling 'refreshIpAddress' to ensure + // ip address is available. + metadata.refreshIpAddress(); + } + catch (UnknownHostException e) + { + LOGGER.error("{} Failed to resolve ipAddress for instance {}", logPrefix, instance, e); + throw e; + } + String ipAddress = metadata.ipAddress(); + // Composed Gossip info key based on javadoc from: + // org.apache.cassandra.sidecar.common.response.GossipInfoResponse.get + String gossipInfoKey = "/" + ipAddress + ":" + metadata.storagePort(); + return gossipResponse.get(gossipInfoKey); + } /** * Checks whether the live migration status at the source is NOT_COMPLETED or COMPLETED. @@ -569,6 +699,7 @@ static class Builder implements DataObjectBuilder statusUpdater; private InstanceMetadata instanceMetadata; private LiveMigrationConfiguration liveMigrationConfiguration; + private InstancesMetadata instancesMetadata; private String id; private String source; private int port; @@ -660,6 +791,17 @@ public Builder liveMigrationConfiguration(LiveMigrationConfiguration liveMigrati return update(b -> b.liveMigrationConfiguration = liveMigrationConfiguration); } + /** + * Sets the {@code instancesMetadata} and returns a reference to this Builder enabling method chaining. + * + * @param instancesMetadata the {@code instancesMetadata} to set + * @return a reference to this Builder + */ + public Builder instancesMetadata(InstancesMetadata instancesMetadata) + { + return update(b -> b.instancesMetadata = instancesMetadata); + } + /** * Sets the {@code id} and returns a reference to this Builder enabling method chaining. * @@ -717,6 +859,7 @@ public LiveMigrationFileDownloader build() Objects.requireNonNull(sidecarClient); Objects.requireNonNull(statusUpdater); Objects.requireNonNull(liveMigrationConfiguration); + Objects.requireNonNull(instancesMetadata); Objects.requireNonNull(id); Objects.requireNonNull(request); Objects.requireNonNull(source); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskFactoryImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskFactoryImpl.java index bc98c2cc9..50e9561f5 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskFactoryImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskFactoryImpl.java @@ -21,6 +21,7 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; @@ -39,17 +40,20 @@ public class LiveMigrationTaskFactoryImpl implements LiveMigrationTaskFactory private final SidecarClientProvider sidecarClientProvider; private final LiveMigrationConfiguration liveMigrationConfiguration; private final ExecutorPools executorPools; + private final InstancesMetadata instancesMetadata; @Inject public LiveMigrationTaskFactoryImpl(Vertx vertx, ExecutorPools executorPools, SidecarClientProvider sidecarClientProvider, - SidecarConfiguration sidecarConfiguration) + SidecarConfiguration sidecarConfiguration, + InstancesMetadata instancesMetadata) { this.vertx = vertx; this.executorPools = executorPools; this.sidecarClientProvider = sidecarClientProvider; this.liveMigrationConfiguration = sidecarConfiguration.liveMigrationConfiguration(); + this.instancesMetadata = instancesMetadata; } /** @@ -63,6 +67,6 @@ public LiveMigrationTask create(String id, InstanceMetadata instanceMetadata) { return new LiveMigrationTaskImpl(vertx, executorPools, sidecarClientProvider, liveMigrationConfiguration, - id, request, source, port, instanceMetadata); + instancesMetadata, id, request, source, port, instanceMetadata); } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImpl.java index 723f35de7..9e0988950 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImpl.java @@ -29,6 +29,7 @@ import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest; import org.apache.cassandra.sidecar.common.response.LiveMigrationTaskResponse; @@ -52,6 +53,7 @@ public class LiveMigrationTaskImpl implements LiveMigrationTask private final ExecutorPools executorPools; private final SidecarClientProvider sidecarClientProvider; private final LiveMigrationConfiguration liveMigrationConfiguration; + private final InstancesMetadata instancesMetadata; // Indicates overall status of the operation (succeeded or failed). // Future returned by downloader changes on next iteration. Using a separate future to track overall operation. @@ -65,6 +67,7 @@ public LiveMigrationTaskImpl(Vertx vertx, ExecutorPools executorPools, SidecarClientProvider sidecarClientProvider, LiveMigrationConfiguration liveMigrationConfiguration, + InstancesMetadata instancesMetadata, String id, LiveMigrationDataCopyRequest request, String source, @@ -80,6 +83,7 @@ public LiveMigrationTaskImpl(Vertx vertx, this.instanceMetadata = instanceMetadata; this.source = source; this.port = port; + this.instancesMetadata = instancesMetadata; } /** @@ -125,6 +129,7 @@ Future startInternal(int iteration) .request(request) .iteration(iteration) .statusUpdater(this.statusUpdater(iteration)) + .instancesMetadata(instancesMetadata) .instanceMetadata(instanceMetadata) .liveMigrationConfiguration(liveMigrationConfiguration) .source(source) diff --git a/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java b/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java index e33463d90..8b10e9876 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java @@ -560,6 +560,10 @@ void testSidecarLiveMigrationConfiguration() throws IOException assertThat(liveMigrationConfiguration.migrationMap()).isNotNull() .hasSize(1) .containsEntry("localhost1", "localhost4"); + + assertThat(liveMigrationConfiguration.maxConcurrentDownloads()).isEqualTo(20); + assertThat(liveMigrationConfiguration.gossipFetchBatchSize()).isEqualTo(5); + assertThat(liveMigrationConfiguration.gossipFetchMaxRetries()).isEqualTo(2); } @Test diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationMapSidecarConfigImplTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationMapSidecarConfigImplTest.java index 12bf6a106..de06d0c53 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationMapSidecarConfigImplTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationMapSidecarConfigImplTest.java @@ -48,7 +48,9 @@ void testMigrationMap() new LiveMigrationConfigurationImpl(Collections.emptySet(), Collections.emptySet(), Map.of("localhost1", "localhost4"), - 20); + 20, + 5, + 5); SidecarConfigurationImpl sidecarConfig = SidecarConfigurationImpl.builder() @@ -89,7 +91,9 @@ void testLiveMigrationNotConfigured() new LiveMigrationConfigurationImpl(Collections.emptySet(), Collections.emptySet(), Map.of(), - 20); + 20, + 5, + 5); SidecarConfigurationImpl sidecarConfig = SidecarConfigurationImpl.builder() diff --git a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManagerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManagerTest.java index 996663753..703f2c206 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManagerTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManagerTest.java @@ -33,13 +33,17 @@ import com.google.inject.Injector; import io.vertx.core.Future; import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.cluster.InstancesMetadata; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest; import org.apache.cassandra.sidecar.common.response.LiveMigrationTaskResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration; import org.apache.cassandra.sidecar.config.ServiceConfiguration; import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.config.yaml.WorkerPoolConfigurationImpl; +import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException; import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationDataCopyInProgressException; import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException; import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException; @@ -47,11 +51,13 @@ import org.apache.cassandra.sidecar.handlers.livemigration.LiveMigrationMap; import org.jetbrains.annotations.NotNull; +import static org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.CQL_AND_JMX; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -98,7 +104,7 @@ public void testCreateTaskSuccess() throws InterruptedException { Injector injector = getInjector(); DataCopyTaskManager dataCopyTaskManager = getDataCopyTaskManager(injector); - LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 1.0, 2); + LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 1.0, 2, null, null, null); Future future = dataCopyTaskManager.createTask(request, dest1Name); awaitForFuture(future); @@ -113,7 +119,7 @@ public void testCreateTaskWithMaxConcurrencyExceeded() throws InterruptedExcepti { Injector injector = getInjector(); DataCopyTaskManager dataCopyTaskManager = getDataCopyTaskManager(injector); - LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 1.0, 10); // exceeds max concurrency of 5 + LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 1.0, 10, null, null, null); // exceeds max concurrency of 5 Future future = dataCopyTaskManager.createTask(request, dest1Name); awaitForFuture(future); @@ -133,7 +139,7 @@ public void testCreateTaskWhenAnotherTaskInProgress() throws InterruptedExceptio LiveMigrationTask inProgressTask = getInProgressTask("existing-task"); dataCopyTaskManager.currentTasks.put(dest1Id, inProgressTask); - LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 1.0, 2); + LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 1.0, 2, null, null, null); Future future = dataCopyTaskManager.createTask(request, dest1Name); awaitForFuture(future); @@ -152,7 +158,7 @@ public void testCreateTaskWhenPreviousTaskCompleted() throws InterruptedExceptio LiveMigrationTask completedTask = getSucceededTask("completed-task", source1Name); dataCopyTaskManager.currentTasks.put(dest1Id, completedTask); - LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 1.0, 2); + LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 1.0, 2, null, null, null); Future future = dataCopyTaskManager.createTask(request, dest1Name); awaitForFuture(future); @@ -161,6 +167,48 @@ public void testCreateTaskWhenPreviousTaskCompleted() throws InterruptedExceptio assertThat(future.result().id()).isNotEqualTo("completed-task"); } + @Test + public void testCreateTaskShouldFailWhenCassandraInstanceJMXIsUp() throws InterruptedException + { + Injector injector = getInjector(); + DataCopyTaskManager dataCopyTaskManager = getDataCopyTaskManager(injector); + InstancesMetadata instancesMetadata = injector.getInstance(InstancesMetadata.class); + InstanceMetadata destinationMetadata = instancesMetadata.instanceFromHost(dest1Name); + + // Mocking JMX as up + when(destinationMetadata.delegate().isJmxUp()).thenReturn(true); + + LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 1.0, 2, null, null, null); + Future future = dataCopyTaskManager.createTask(request, dest1Name); + awaitForFuture(future); + + assertThat(future.succeeded()).isFalse(); + assertThat(future.failed()).isTrue(); + assertThat(future.result()).isNull(); + assertThat(future.cause()).isNotNull(); + } + + @Test + public void testCreateTaskShouldSucceedWhenCassandraAdapterIsNotAvailable() throws InterruptedException + { + Injector injector = getInjector(); + DataCopyTaskManager dataCopyTaskManager = getDataCopyTaskManager(injector); + InstancesMetadata instancesMetadata = injector.getInstance(InstancesMetadata.class); + InstanceMetadata destinationMetadata = instancesMetadata.instanceFromHost(dest1Name); + when(destinationMetadata.delegate()) + .thenThrow(new CassandraUnavailableException(CQL_AND_JMX, "CassandraAdapterDelegate is not available")); + + LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 1.0, 2, null, null, null); + Future future = dataCopyTaskManager.createTask(request, dest1Name); + awaitForFuture(future); + + assertThat(future.succeeded()).isTrue(); + assertThat(future.failed()).isFalse(); + assertThat(future.result()).isNotNull(); + assertThat(future.result().id()).isNotNull(); + assertThat(future.cause()).isNull(); + } + @Test public void testGetTaskSuccess() { @@ -263,7 +311,7 @@ public void testConcurrentTaskCreation() throws InterruptedException startLatch.await(); LiveMigrationDataCopyRequest request - = new LiveMigrationDataCopyRequest(1, 1.0, 2); + = new LiveMigrationDataCopyRequest(1, 1.0, 2, null, null, null); Future future = dataCopyTaskManager.createTask(request, dest1Name); results.add(future); @@ -324,7 +372,7 @@ private void awaitForFuture(Future future) throws InterruptedException CountDownLatch latch = new CountDownLatch(1); future.onComplete(res -> latch.countDown()); - latch.await(100, TimeUnit.MILLISECONDS); + latch.await(2, TimeUnit.SECONDS); } private DataCopyTaskManager getDataCopyTaskManager(Injector injector) @@ -333,8 +381,10 @@ private DataCopyTaskManager getDataCopyTaskManager(Injector injector) SidecarConfiguration sidecarConfiguration = injector.getInstance(SidecarConfiguration.class); LiveMigrationMap liveMigrationMap = injector.getInstance(LiveMigrationMap.class); LiveMigrationTaskFactory liveMigrationTaskFactory = injector.getInstance(LiveMigrationTaskFactory.class); + Vertx vertx = injector.getInstance(Vertx.class); + ExecutorPools executorPools = new ExecutorPools(vertx, sidecarConfiguration.serviceConfiguration()); - return new DataCopyTaskManager(instancesMetadata, sidecarConfiguration, liveMigrationMap, + return new DataCopyTaskManager(executorPools, instancesMetadata, sidecarConfiguration, liveMigrationMap, liveMigrationTaskFactory); } @@ -342,7 +392,7 @@ private LiveMigrationTask getInProgressTask(@NotNull String taskId) { List statusList = List.of(new LiveMigrationTaskResponse.Status(0, "PREPARING", 500L, 1, 1, 1, 0, 0, 500L)); - LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 1.0, 1); + LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 1.0, 1, null, null, null); LiveMigrationTaskResponse response = new LiveMigrationTaskResponse(taskId, source1Name, 9043, request, statusList); return new FakeLiveMigrationTask(response); } @@ -351,7 +401,7 @@ private LiveMigrationTask getSucceededTask(@NotNull String taskId, @NotNull Stri { List statusList = List.of(new LiveMigrationTaskResponse.Status(0, "SUCCESS", 1000L, 1, 1, 1, 1, 0, 1000L)); - LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 1.0, 1); + LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 1.0, 1, null, null, null); LiveMigrationTaskResponse response = new LiveMigrationTaskResponse(taskId, sourceHost, 9043, request, statusList); return new FakeLiveMigrationTask(response); } @@ -360,7 +410,7 @@ private static class DataCopyTaskManagerTestModule extends AbstractModule { private final LiveMigrationTaskFactory mockLiveMigrationTaskFactory = mock(LiveMigrationTaskFactory.class); private final SidecarConfiguration mockSidecarConfiguration = mock(SidecarConfiguration.class); - private final ServiceConfiguration mockServiceConfiguration = mock(ServiceConfiguration.class); + private final ServiceConfiguration mockServiceConfiguration = mock(ServiceConfiguration.class, RETURNS_DEEP_STUBS); private final LiveMigrationConfiguration mockLiveMigrationConfiguration = mock(LiveMigrationConfiguration.class); private final LiveMigrationMap mockLiveMigrationmap = mock(LiveMigrationMap.class); private final InstanceMetadata mockDest1InstanceMeta = mock(InstanceMetadata.class); @@ -381,6 +431,8 @@ protected void configure() // Configure SidecarConfiguration mocks when(mockSidecarConfiguration.serviceConfiguration()).thenReturn(mockServiceConfiguration); when(mockServiceConfiguration.port()).thenReturn(9043); + when(mockServiceConfiguration.serverWorkerPoolConfiguration()).thenReturn(new WorkerPoolConfigurationImpl()); + when(mockServiceConfiguration.serverInternalWorkerPoolConfiguration()).thenReturn(new WorkerPoolConfigurationImpl()); when(mockSidecarConfiguration.liveMigrationConfiguration()).thenReturn(mockLiveMigrationConfiguration); when(mockLiveMigrationConfiguration.maxConcurrentDownloads()).thenReturn(5); @@ -388,17 +440,21 @@ protected void configure() when(mockInstancesMetadata.instanceFromHost(dest1Name)).thenReturn(mockDest1InstanceMeta); when(mockDest1InstanceMeta.id()).thenReturn(dest1Id); when(mockDest1InstanceMeta.dataDirs()).thenReturn(List.of("/data1", "/data2")); + when(mockDest1InstanceMeta.delegate()).thenReturn(mock(CassandraAdapterDelegate.class)); when(mockInstancesMetadata.instanceFromHost(dest2Name)).thenReturn(mockDest2InstanceMeta); when(mockDest2InstanceMeta.id()).thenReturn(dest2Id); when(mockDest2InstanceMeta.dataDirs()).thenReturn(List.of("/data1", "/data2")); + when(mockDest2InstanceMeta.delegate()).thenReturn(mock(CassandraAdapterDelegate.class)); when(mockInstancesMetadata.instanceFromHost(dest3Name)).thenReturn(mockDest3InstanceMeta); when(mockDest3InstanceMeta.id()).thenReturn(dest3Id); when(mockDest3InstanceMeta.dataDirs()).thenReturn(List.of("/data1", "/data2")); + when(mockDest3InstanceMeta.delegate()).thenReturn(mock(CassandraAdapterDelegate.class)); when(mockInstancesMetadata.instanceFromHost(source1Name)).thenReturn(mockSourceInstanceMeta); when(mockSourceInstanceMeta.dataDirs()).thenReturn(List.of("/data1")); + when(mockSourceInstanceMeta.delegate()).thenReturn(mock(CassandraAdapterDelegate.class)); // Configure LiveMigrationTaskFactory to return fake tasks when(mockLiveMigrationTaskFactory.create(anyString(), any(LiveMigrationDataCopyRequest.class), anyString(), anyInt(), any(InstanceMetadata.class))).thenAnswer(invocation -> { diff --git a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/GossipInfoFetcherTest.java b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/GossipInfoFetcherTest.java new file mode 100644 index 000000000..0181f3ebc --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/GossipInfoFetcherTest.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +import io.vertx.core.Future; +import org.apache.cassandra.sidecar.client.SidecarClient; +import org.apache.cassandra.sidecar.client.SidecarInstance; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.response.GossipInfoResponse; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@SuppressWarnings("SameParameterValue") +class GossipInfoFetcherTest +{ + @Test + void testSuccessOnFirstBatch() throws InterruptedException + { + SidecarClient mockClient = mock(SidecarClient.class); + InstancesMetadata mockInstancesMetadata = mock(InstancesMetadata.class); + List instances = createMockInstances(2); + when(mockInstancesMetadata.instances()).thenReturn(instances); + + GossipInfoResponse mockResponse = new GossipInfoResponse(); + when(mockClient.gossipInfo(any(SidecarInstance.class))) + .thenReturn(CompletableFuture.completedFuture(mockResponse)); + + GossipInfoFetcher fetcher = new GossipInfoFetcher(mockClient, mockInstancesMetadata, 9043, 5, 5); + Future result = fetcher.fetchGossipInfo(); + + awaitForFuture(result); + + assertThat(result.succeeded()).isTrue(); + assertThat(result.result()).isEqualTo(mockResponse); + } + + @Test + void testSuccessOnSecondBatchAfterFirstFails() throws InterruptedException + { + SidecarClient mockClient = mock(SidecarClient.class); + InstancesMetadata mockInstancesMetadata = mock(InstancesMetadata.class); + + // Create 10 instances - first 5 will fail, 6th will succeed + List instances = createMockInstances(6); + when(mockInstancesMetadata.instances()).thenReturn(instances); + + GossipInfoResponse mockResponse = new GossipInfoResponse(); + + // First 5 instances fail, 6th succeeds + when(mockClient.gossipInfo(any(SidecarInstance.class))) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Connection failed"))) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Connection failed"))) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Connection failed"))) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Connection failed"))) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Connection failed"))) + .thenReturn(CompletableFuture.completedFuture(mockResponse)); + + GossipInfoFetcher fetcher = spy(new GossipInfoFetcher(mockClient, mockInstancesMetadata, 9043, 5, 5)); + Future result = fetcher.fetchGossipInfo(); + + awaitForFuture(result); + + assertThat(result.succeeded()).isTrue(); + assertThat(result.result()).isEqualTo(mockResponse); + verify(fetcher, times(2)).fetchGossipFromBatch(anyList(), anyInt(), anyInt(), anyInt(), anyInt()); + } + + @Test + void testFailureWhenAllInstancesFail() throws InterruptedException + { + SidecarClient mockClient = mock(SidecarClient.class); + InstancesMetadata mockInstancesMetadata = mock(InstancesMetadata.class); + List instances = createMockInstances(2); + when(mockInstancesMetadata.instances()).thenReturn(instances); + when(mockClient.gossipInfo(any(SidecarInstance.class))) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Connection failed"))); + + GossipInfoFetcher fetcher = new GossipInfoFetcher(mockClient, mockInstancesMetadata, 9043, 5, 5); + Future result = fetcher.fetchGossipInfo(); + + awaitForFuture(result); + + assertThat(result.failed()).isTrue(); + assertThat(result.cause().getMessage()).contains("Failed to fetch gossip after trying all 2 instances"); + } + + @Test + void testFailureWhenNoInstancesAvailable() throws InterruptedException + { + SidecarClient mockClient = mock(SidecarClient.class); + InstancesMetadata mockInstancesMetadata = mock(InstancesMetadata.class); + when(mockInstancesMetadata.instances()).thenReturn(List.of()); + + GossipInfoFetcher fetcher = new GossipInfoFetcher(mockClient, mockInstancesMetadata, 9043, 5, 5); + Future result = fetcher.fetchGossipInfo(); + + awaitForFuture(result); + + assertThat(result.failed()).isTrue(); + assertThat(result.cause().getMessage()).isEqualTo("No instances available for gossip fetch"); + } + + @Test + void testPartialBatchSuccess() throws InterruptedException + { + SidecarClient mockClient = mock(SidecarClient.class); + InstancesMetadata mockInstancesMetadata = mock(InstancesMetadata.class); + + List instances = createMockInstances(3); + when(mockInstancesMetadata.instances()).thenReturn(instances); + + GossipInfoResponse mockResponse = new GossipInfoResponse(); + + // First instance fails, second succeeds + when(mockClient.gossipInfo(any(SidecarInstance.class))) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Connection failed"))) + .thenReturn(CompletableFuture.completedFuture(mockResponse)); + + GossipInfoFetcher fetcher = spy(new GossipInfoFetcher(mockClient, mockInstancesMetadata, 9043, 5, 5)); + Future result = fetcher.fetchGossipInfo(); + + awaitForFuture(result); + + assertThat(result.succeeded()).isTrue(); + assertThat(result.result()).isEqualTo(mockResponse); + verify(fetcher, times(1)) + .fetchGossipFromBatch(anyList(), anyInt(), anyInt(), anyInt(), anyInt()); + } + + @Test + void testReturnsFirstSuccessfulResponse() throws InterruptedException + { + SidecarClient mockClient = mock(SidecarClient.class); + InstancesMetadata mockInstancesMetadata = mock(InstancesMetadata.class); + List instances = createMockInstances(2); + when(mockInstancesMetadata.instances()).thenReturn(instances); + + GossipInfoResponse response1 = new GossipInfoResponse(); + GossipInfoResponse response2 = new GossipInfoResponse(); + + when(mockClient.gossipInfo(any(SidecarInstance.class))) + .thenReturn(CompletableFuture.completedFuture(response1)) + .thenReturn(CompletableFuture.completedFuture(response2)); + + GossipInfoFetcher fetcher = new GossipInfoFetcher(mockClient, mockInstancesMetadata, 9043, 5, 5); + Future result = fetcher.fetchGossipInfo(); + + awaitForFuture(result); + + assertThat(result.succeeded()).isTrue(); + // Should return one of the responses (Future.any returns first successful) + assertThat(result.result()).isIn(response1, response2); + } + + @Test + void testMaxRetriesExhausted() throws InterruptedException + { + SidecarClient mockClient = mock(SidecarClient.class); + InstancesMetadata mockInstancesMetadata = mock(InstancesMetadata.class); + + // Create enough instances to exceed max retries (5 batches of 5 = 25 instances) + List instances = createMockInstances(25); + when(mockInstancesMetadata.instances()).thenReturn(instances); + when(mockClient.gossipInfo(any(SidecarInstance.class))) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Connection failed"))); + + GossipInfoFetcher fetcher = new GossipInfoFetcher(mockClient, mockInstancesMetadata, 9043, 5, 5); + Future result = fetcher.fetchGossipInfo(); + + awaitForFuture(result); + + assertThat(result.failed()).isTrue(); + assertThat(result.cause().getMessage()).contains("Failed to fetch gossip after 5 attempts across 25 instances"); + } + + @Test + void testFetchesFromMultipleBatches() throws InterruptedException + { + SidecarClient mockClient = mock(SidecarClient.class); + InstancesMetadata mockInstancesMetadata = mock(InstancesMetadata.class); + + // Create 7 instances - first batch of 5 fails, second batch of 2 succeeds on first instance + List instances = createMockInstances(7); + when(mockInstancesMetadata.instances()).thenReturn(instances); + + GossipInfoResponse mockResponse = new GossipInfoResponse(); + + // First 5 calls fail, 6th succeeds + when(mockClient.gossipInfo(any(SidecarInstance.class))) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Connection failed"))) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Connection failed"))) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Connection failed"))) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Connection failed"))) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Connection failed"))) + .thenReturn(CompletableFuture.completedFuture(mockResponse)); + + GossipInfoFetcher fetcher = spy(new GossipInfoFetcher(mockClient, mockInstancesMetadata, 9043, 5, 5)); + Future result = fetcher.fetchGossipInfo(); + + awaitForFuture(result); + + assertThat(result.succeeded()).isTrue(); + assertThat(result.result()).isEqualTo(mockResponse); + verify(mockClient, times(7)).gossipInfo(any(SidecarInstance.class)); + verify(fetcher, times(2)) + .fetchGossipFromBatch(anyList(), anyInt(), anyInt(), anyInt(), anyInt()); + } + + private InstanceMetadata createMockInstance(String host, int port) + { + InstanceMetadata instance = mock(InstanceMetadata.class); + when(instance.host()).thenReturn(host); + when(instance.port()).thenReturn(port); + return instance; + } + + private List createMockInstances(int count) + { + List instances = new ArrayList<>(count); + for (int i = 1; i <= count; i++) + { + instances.add(createMockInstance("host" + i, 9043)); + } + return instances; + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + private void awaitForFuture(Future future) throws InterruptedException + { + CountDownLatch latch = new CountDownLatch(1); + future.onComplete(res -> latch.countDown()); + latch.await(5, TimeUnit.SECONDS); + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloaderTest.java b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloaderTest.java index 194683db0..f9700fb0e 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloaderTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloaderTest.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import java.net.UnknownHostException; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; @@ -57,13 +58,16 @@ import org.apache.cassandra.sidecar.client.SidecarClient; import org.apache.cassandra.sidecar.client.SidecarInstance; import org.apache.cassandra.sidecar.client.SidecarInstanceImpl; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadataImpl; import org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest; +import org.apache.cassandra.sidecar.common.response.GossipInfoResponse; import org.apache.cassandra.sidecar.common.response.InstanceFileInfo; import org.apache.cassandra.sidecar.common.response.InstanceFileInfo.FileType; import org.apache.cassandra.sidecar.common.response.InstanceFilesListResponse; import org.apache.cassandra.sidecar.common.response.LiveMigrationStatus; +import org.apache.cassandra.sidecar.common.server.dns.DnsResolver; import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration; import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.handlers.livemigration.LiveMigrationDirType; @@ -98,14 +102,20 @@ class LiveMigrationFileDownloaderTest { private static final int MAX_ITERATIONS = 3; private static final String SOURCE = "source"; + private static final String SOURCE_IP_ADDRESS = "127.0.0.1"; + private static final String DESTINATION = "destination"; + private static final String DESTINATION_IP_ADDRESS = "127.0.0.2"; private static final int FILE_DOWNLOAD_MAX_CONCURRENCY = 10; private static final int PORT = 9043; private static final LiveMigrationDataCopyRequest dummyRequest100pThreshold = - new LiveMigrationDataCopyRequest(MAX_ITERATIONS, 1.0, FILE_DOWNLOAD_MAX_CONCURRENCY); + new LiveMigrationDataCopyRequest(MAX_ITERATIONS, 1.0, FILE_DOWNLOAD_MAX_CONCURRENCY, null, null, null); final Vertx vertx = Vertx.vertx(); private final List dataDirsOne = Collections.singletonList("/tmp/data0"); + @TempDir + Path sourceStorageDir; + @Test void testDownloadListingFilesFailed() throws InterruptedException { @@ -377,6 +387,124 @@ void testDownloadFilesWhenSourceHasStatusCompleted(@TempDir Path tmpDir) throws verify(sidecarClient, times(0)).liveMigrationListInstanceFilesAsync(any()); } + @Test + void testDownloadFilesWhenSourceNotFoundInGossip(@TempDir Path tmpDir) throws InterruptedException + { + String storageDir = tmpDir.resolve("testDownloadFilesWhenSourceNotFoundInGossip") + .toAbsolutePath().toString(); + List dataDirs = getDataDirList(storageDir); + final Consumer statusUpdater = mock(Consumer.class); + + Injector injector = getInjector(); + SidecarClient sidecarClient = injector.getInstance(SidecarClient.class); + when(sidecarClient.gossipInfo(any(SidecarInstance.class))) + .thenReturn(CompletableFuture.completedFuture(new GossipInfoResponse())); // Empty gossip - source not available + + LiveMigrationFileDownloader downloaderSpy = + getDownloaderSpy(injector, dummyRequest100pThreshold, 0, statusUpdater, storageDir, dataDirs); + + Future statusFuture = downloaderSpy.downloadFiles(); + awaitForFuture(statusFuture); + + assertThat(statusFuture.isComplete()).isTrue(); + assertThat(statusFuture.result().state()).isEqualTo(OperationStatus.State.FAILED); + + verify(statusUpdater, times(1)).accept(any(OperationStatus.class)); + verify(sidecarClient, times(0)).liveMigrationListInstanceFilesAsync(any()); + } + + @Test + void testDownloadFilesWhenDestinationFoundInGossip(@TempDir Path tmpDir) throws InterruptedException + { + String storageDir = tmpDir.resolve("testDownloadFilesWhenDestinationFoundInGossip") + .toAbsolutePath().toString(); + List dataDirs = getDataDirList(storageDir); + final Consumer statusUpdater = mock(Consumer.class); + + Injector injector = getInjector(); + SidecarClient sidecarClient = injector.getInstance(SidecarClient.class); + + // Create gossipInfo that includes both source and destination + // Test should fail since destination is in gossip + GossipInfoResponse gossipInfoResponse = new GossipInfoResponse(); + gossipInfoResponse.put("/" + SOURCE_IP_ADDRESS + ":7000", new GossipInfoResponse.GossipInfo()); + gossipInfoResponse.put("/" + DESTINATION_IP_ADDRESS + ":7000", new GossipInfoResponse.GossipInfo()); + + when(sidecarClient.gossipInfo(any(SidecarInstance.class))) + .thenReturn(CompletableFuture.completedFuture(gossipInfoResponse)); + + LiveMigrationFileDownloader downloaderSpy = + getDownloaderSpy(injector, dummyRequest100pThreshold, 0, statusUpdater, storageDir, dataDirs); + + Future statusFuture = downloaderSpy.downloadFiles(); + awaitForFuture(statusFuture); + + assertThat(statusFuture.isComplete()).isTrue(); + assertThat(statusFuture.result().state()).isEqualTo(OperationStatus.State.FAILED); + + verify(statusUpdater, times(1)).accept(any(OperationStatus.class)); + verify(sidecarClient, times(0)).liveMigrationListInstanceFilesAsync(any()); + } + + @Test + void testDownloadFilesWithSkipGossipCheckEnabled(@TempDir Path tmpDir) throws InterruptedException, IOException + { + String storageDir = tmpDir.resolve("testDownloadFilesWithSkipGossipCheckEnabled") + .toAbsolutePath().toString(); + List dataDirs = getDataDirList(storageDir); + final Consumer statusUpdater = mock(Consumer.class); + int fileSize = 64; + long lastModifiedTime = System.currentTimeMillis(); + List filesToDownload = List.of( + new TestFile(DATA_FILE_DIR, 0, "ks1/t1/data.db", fileSize, lastModifiedTime), + new TestFile(DATA_FILE_DIR, 0, "ks1/t2/data.db", fileSize, lastModifiedTime) + ); + + Injector injector = getInjector(); + SidecarClient sidecarClient = injector.getInstance(SidecarClient.class); + + // Create gossipInfo that includes both source and destination - normally this would fail + GossipInfoResponse gossipInfoResponse = new GossipInfoResponse(); + gossipInfoResponse.put("/" + SOURCE_IP_ADDRESS + ":7000", new GossipInfoResponse.GossipInfo()); + gossipInfoResponse.put("/" + DESTINATION_IP_ADDRESS + ":7000", new GossipInfoResponse.GossipInfo()); + + when(sidecarClient.gossipInfo(any(SidecarInstance.class))) + .thenReturn(CompletableFuture.completedFuture(gossipInfoResponse)); + + when(sidecarClient.liveMigrationListInstanceFilesAsync(eq(new SidecarInstanceImpl(SOURCE, PORT)))) + .thenReturn(CompletableFuture.completedFuture(getInstanceFilesListResponse(filesToDownload))); + + // Create request with skipGossipCheck set to true + boolean skipGossipCheck = true; + LiveMigrationDataCopyRequest requestWithSkipGossipCheck = + new LiveMigrationDataCopyRequest(MAX_ITERATIONS, 1.0, FILE_DOWNLOAD_MAX_CONCURRENCY, null, null, skipGossipCheck); + + LiveMigrationFileDownloader downloaderSpy = + getDownloaderSpy(injector, requestWithSkipGossipCheck, 0, statusUpdater, storageDir, dataDirs); + + doAnswer(invocation -> invocation.getArguments()[0]) + .when(downloaderSpy).deleteUnnecessaryFilesAndDirectories(any(InstanceFilesListResponse.class)); + + doReturn(Future.succeededFuture(getInstanceFileInfo(filesToDownload))) + .when(downloaderSpy).shortlistDownloadFiles(any(InstanceFilesListResponse.class), anyDouble()); + + doReturn(Future.succeededFuture()) + .when(downloaderSpy).updateFileTimestampAsync(any(Path.class), anyLong()); + + when(sidecarClient.liveMigrationStreamFileAsync(any(SidecarInstance.class), anyString(), anyString())) + .thenReturn(CompletableFuture.completedFuture(null)); + + Future statusFuture = downloaderSpy.downloadFiles(); + awaitForFuture(statusFuture); + + assertThat(statusFuture.isComplete()).isTrue(); + // Should succeed because gossip check was skipped even though destination was found in gossip + assertThat(statusFuture.result().state()).isEqualTo(OperationStatus.State.DOWNLOAD_COMPLETE); + verify(statusUpdater, times(4)).accept(any(OperationStatus.class)); + // Verify that file listing was actually called, proving gossip check was skipped + verify(sidecarClient, times(1)).liveMigrationListInstanceFilesAsync(any()); + } + @Test void testDownloadZeroSizedFiles(@TempDir Path tmpDir) throws InterruptedException { @@ -624,7 +752,7 @@ void testCancelWhenDownloadsAreInProgress(@TempDir Path tempDir) throws Exceptio final Consumer statusUpdater = mock(Consumer.class); Injector injector = getInjector(); LiveMigrationDataCopyRequest maxConcurrency1Request = - new LiveMigrationDataCopyRequest(1, 1.0, 2); + new LiveMigrationDataCopyRequest(1, 1.0, 2, null, null, null); LiveMigrationFileDownloader downloaderSpy = spy(getDownloader(injector, maxConcurrency1Request, 0, statusUpdater, storageDir, dataDirs)); @@ -1354,6 +1482,35 @@ LiveMigrationFileDownloader getDownloader(Injector injector, SidecarClientProvider sidecarClientProvider = injector.getInstance(SidecarClientProvider.class); LiveMigrationConfiguration liveMigrationConfig = injector.getInstance(SidecarConfiguration.class) .liveMigrationConfiguration(); + + DnsResolver mockDnsResolver = injector.getInstance(DnsResolver.class); + + InstancesMetadata instancesMetadata = mock(InstancesMetadata.class); + InstanceMetadata sourceInstanceMetadata = + InstanceMetadataImpl.builder() + .id(2) + .host(SOURCE, mockDnsResolver) + .port(9042) + .storagePort(7000) + .metricRegistry(new MetricRegistry()) + .storageDir(sourceStorageDir.toAbsolutePath().toString()) + .build(); + InstanceMetadata destinationInstanceMetadata = + InstanceMetadataImpl.builder() + .host(DESTINATION, mockDnsResolver) + .port(9042) + .storagePort(7000) + .dataDirs(dataDirs) + .storageDir(storageDir) + .metricRegistry(new MetricRegistry()) + .id(1) + .build(); + when(instancesMetadata.instanceFromHost(eq(SOURCE))) + .thenReturn(sourceInstanceMetadata); + when(instancesMetadata.instanceFromHost(eq(DESTINATION))) + .thenReturn(destinationInstanceMetadata); + when(instancesMetadata.instances()) + .thenReturn(List.of(sourceInstanceMetadata, destinationInstanceMetadata)); return LiveMigrationFileDownloader.builder() .id(UUID.randomUUID().toString()) .vertx(vertx) @@ -1361,14 +1518,9 @@ LiveMigrationFileDownloader getDownloader(Injector injector, .request(request) .iteration(currentIteration) .statusUpdater(mockStatusUpdater) - .instanceMetadata(InstanceMetadataImpl.builder() - .dataDirs(dataDirs) - .storageDir(storageDir) - .metricRegistry(new MetricRegistry()) - .id(1) - .storagePort(7000) - .build()) + .instanceMetadata(destinationInstanceMetadata) .liveMigrationConfiguration(liveMigrationConfig) + .instancesMetadata(instancesMetadata) .source(SOURCE) .port(PORT) .executorPools(ExecutorPoolsHelper.createdSharedTestPool(vertx)) @@ -1451,6 +1603,7 @@ private static class LiveMigrationFileDownloaderTestModule extends AbstractModul SidecarClientProvider sidecarClientProvider = mock(SidecarClientProvider.class); SidecarConfiguration mockSidecarConfiguration = mock(SidecarConfiguration.class); LiveMigrationConfiguration mockLiveMigrationConfig = mock(LiveMigrationConfiguration.class); + DnsResolver mockDnsResolver = mock(DnsResolver.class); @Override protected void configure() @@ -1458,13 +1611,29 @@ protected void configure() bind(SidecarClient.class).toInstance(sidecarClient); bind(SidecarConfiguration.class).toInstance(mockSidecarConfiguration); bind(SidecarClientProvider.class).toInstance(sidecarClientProvider); + bind(DnsResolver.class).toInstance(mockDnsResolver); + try + { + when(mockDnsResolver.resolve(eq(SOURCE))).thenReturn(SOURCE_IP_ADDRESS); + when(mockDnsResolver.resolve(eq(DESTINATION))).thenReturn(DESTINATION_IP_ADDRESS); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } when(sidecarClient.liveMigrationStatus(any(SidecarInstance.class))) .thenReturn(CompletableFuture.completedFuture(new LiveMigrationStatus(NOT_COMPLETED, 1L))); when(sidecarClientProvider.get()).thenReturn(sidecarClient); + GossipInfoResponse mockGossipInfoResponse = new GossipInfoResponse(); + mockGossipInfoResponse.put("/" + SOURCE_IP_ADDRESS + ":7000", new GossipInfoResponse.GossipInfo()); + when(sidecarClient.gossipInfo(any(SidecarInstance.class))) + .thenReturn(CompletableFuture.completedFuture(mockGossipInfoResponse)); when(mockSidecarConfiguration.liveMigrationConfiguration()).thenReturn(mockLiveMigrationConfig); when(mockLiveMigrationConfig.filesToExclude()).thenReturn(Set.of()); when(mockLiveMigrationConfig.directoriesToExclude()).thenReturn(Set.of()); + when(mockLiveMigrationConfig.gossipFetchMaxRetries()).thenReturn(2); + when(mockLiveMigrationConfig.gossipFetchBatchSize()).thenReturn(5); } } } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImplTest.java b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImplTest.java index 4976867c7..20bd44844 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImplTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImplTest.java @@ -26,6 +26,7 @@ import io.vertx.core.Vertx; import org.apache.cassandra.sidecar.ExecutorPoolsHelper; import org.apache.cassandra.sidecar.client.SidecarClient; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; import org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest; import org.apache.cassandra.sidecar.common.response.LiveMigrationTaskResponse; @@ -34,6 +35,7 @@ import org.apache.cassandra.sidecar.utils.SidecarClientProvider; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -46,7 +48,7 @@ class LiveMigrationTaskImplTest private LiveMigrationTaskImpl createTask() { - return createTask("test-task-id", new LiveMigrationDataCopyRequest(5, 0.8, 10)); + return createTask("test-task-id", new LiveMigrationDataCopyRequest(5, 0.8, 10, null, null, null)); } private LiveMigrationTaskImpl createTask(String id, LiveMigrationDataCopyRequest request) @@ -56,13 +58,16 @@ private LiveMigrationTaskImpl createTask(String id, LiveMigrationDataCopyRequest SidecarClient sidecarClient = mock(SidecarClient.class); LiveMigrationConfiguration liveMigrationConfiguration = mock(LiveMigrationConfiguration.class); InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); + InstanceMetadata sourceMetadata = mock(InstanceMetadata.class); + InstancesMetadata instancesMetadata = mock(InstancesMetadata.class); + when(instancesMetadata.instanceFromHost(eq(SOURCE))).thenReturn(sourceMetadata); when(sidecarClientProvider.get()).thenReturn(sidecarClient); ExecutorPools executorPools = ExecutorPoolsHelper.createdSharedTestPool(vertx); return new LiveMigrationTaskImpl(vertx, executorPools, sidecarClientProvider, liveMigrationConfiguration, - id, request, SOURCE, PORT, instanceMetadata); + instancesMetadata, id, request, SOURCE, PORT, instanceMetadata); } @Test diff --git a/server/src/test/resources/config/sidecar_live_migration.yaml b/server/src/test/resources/config/sidecar_live_migration.yaml index 51cacfdc8..d6a19a9ad 100644 --- a/server/src/test/resources/config/sidecar_live_migration.yaml +++ b/server/src/test/resources/config/sidecar_live_migration.yaml @@ -212,3 +212,5 @@ live_migration: migration_map: localhost1: localhost4 max_concurrent_downloads: 20 + gossip_fetch_batch_size: 5 + gossip_fetch_max_retries: 2