Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.3.0
-----
* Adding gossip based safety check to Live Migration data copy task endpoint (CASSSIDECAR-409)
* CDC: Add end-to-end CDC integration tests (CASSSIDECAR-308)
* SchemaStorePublisherFactory should be Injectable in CachingSchemaStore (CASSSIDECAR-408)
* Fix StorageClientTest Docker API compatibility and improve CI test reporting (CASSSIDECAR-410)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,40 @@ public class LiveMigrationDataCopyRequest
public final int maxConcurrency;

/**
* Creates a new request with auto-generated ID.
* Batch size for fetching gossip information from cluster instances.
* During live migration safety checks, gossip info is fetched from instances in parallel batches.
* This controls how many instances are contacted simultaneously per batch.
* It is an optional parameter. Server will apply default if not specified.
*/
public final Integer gossipFetchBatchSize;

/**
* Maximum number of batch retry attempts when fetching gossip information.
* If all instances in a batch fail to return gossip info, the next batch is tried.
* This limits the total number of batch attempts across all instances.
* It is an optional parameter. Server will apply default if not specified.
*/
public final Integer gossipFetchMaxRetries;

/**
* Flag to skip gossip-based safety checks before data copy.
* When true, the safety validation that ensures destination is not present in cluster gossip
* will be skipped. Use with caution - skipping this check may lead to data loss if the
* destination node has been started.
* Optional parameter - defaults to false (performs safety checks) if not specified.
*/
public final Boolean skipGossipCheck;

/**
* Creates a new live migration data copy request.
*/
@JsonCreator
public LiveMigrationDataCopyRequest(@JsonProperty("maxIterations") int maxIterations,
@JsonProperty("successThreshold") double successThreshold,
@JsonProperty("maxConcurrency") int maxConcurrency)
@JsonProperty("maxConcurrency") int maxConcurrency,
@JsonProperty("gossipFetchBatchSize") Integer gossipFetchBatchSize,
@JsonProperty("gossipFetchMaxRetries") Integer gossipFetchMaxRetries,
@JsonProperty("skipGossipCheck") Boolean skipGossipCheck)
{

if (maxIterations <= 0)
Expand All @@ -71,8 +99,24 @@ public LiveMigrationDataCopyRequest(@JsonProperty("maxIterations") int maxIterat
+ ". It cannot be less than or equal to zero.");
}

// Validate optional gossip fetch parameters if specified
if (gossipFetchBatchSize != null && gossipFetchBatchSize <= 0)
{
throw new IllegalArgumentException("Invalid gossipFetchBatchSize " + gossipFetchBatchSize
+ ". It must be greater than zero when specified.");
}

if (gossipFetchMaxRetries != null && gossipFetchMaxRetries <= 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also have some guardrails around an upperbound here? I'm thinking from a point of view of a malicious payload that can make the service retry for a long time?

{
throw new IllegalArgumentException("Invalid gossipFetchMaxRetries " + gossipFetchMaxRetries
+ ". It must be greater than zero when specified.");
}

this.maxIterations = maxIterations;
this.successThreshold = successThreshold;
this.maxConcurrency = maxConcurrency;
this.gossipFetchBatchSize = gossipFetchBatchSize;
this.gossipFetchMaxRetries = gossipFetchMaxRetries;
this.skipGossipCheck = skipGossipCheck;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class LiveMigrationDataCopyRequestTest
@Test
void testSerializationDeserializationRoundTrip() throws Exception
{
LiveMigrationDataCopyRequest original = new LiveMigrationDataCopyRequest(5, 0.95, 10);
LiveMigrationDataCopyRequest original = new LiveMigrationDataCopyRequest(5, 0.95, 10, null, null, null);

String json = objectMapper.writeValueAsString(original);
LiveMigrationDataCopyRequest deserialized = objectMapper.readValue(json, LiveMigrationDataCopyRequest.class);
Expand All @@ -42,58 +42,74 @@ void testSerializationDeserializationRoundTrip() throws Exception
assertThat(deserialized.maxConcurrency).isEqualTo(original.maxConcurrency);
}

@Test
void testSerializationDeserializationRoundTripWithOptionalValues() throws Exception
{
LiveMigrationDataCopyRequest original = new LiveMigrationDataCopyRequest(5, 0.95, 10, 5, 5, true);

String json = objectMapper.writeValueAsString(original);
LiveMigrationDataCopyRequest deserialized = objectMapper.readValue(json, LiveMigrationDataCopyRequest.class);

assertThat(deserialized.maxIterations).isEqualTo(original.maxIterations);
assertThat(deserialized.successThreshold).isEqualTo(original.successThreshold);
assertThat(deserialized.maxConcurrency).isEqualTo(original.maxConcurrency);
assertThat(deserialized.gossipFetchBatchSize).isEqualTo(original.gossipFetchBatchSize);
assertThat(deserialized.gossipFetchMaxRetries).isEqualTo(original.gossipFetchMaxRetries);
assertThat(deserialized.skipGossipCheck).isEqualTo(original.skipGossipCheck);
}

@Test
void testConstructorWithInvalidMaxIterationsThrowsException()
{
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(0, 0.95, 10))
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(0, 0.95, 10, null, null, null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid maxIterations 0. It cannot be less than or equal to zero.");
}

@Test
void testConstructorWithNegativeMaxIterationsThrowsException()
{
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(-5, 0.95, 10))
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(-5, 0.95, 10, null, null, null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid maxIterations -5. It cannot be less than or equal to zero.");
}

@Test
void testConstructorWithInvalidSuccessThresholdBelowZeroThrowsException()
{
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, -0.1, 10))
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, -0.1, 10, null, null, null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid successThreshold -0.1. It cannot be less than zero or greater than one.");
}

@Test
void testConstructorWithInvalidSuccessThresholdAboveOneThrowsException()
{
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 1.5, 10))
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 1.5, 10, null, null, null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid successThreshold 1.5. It cannot be less than zero or greater than one.");
}

@Test
void testConstructorWithInvalidMaxConcurrencyThrowsException()
{
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, 0))
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, 0, null, null, null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid maxConcurrency 0. It cannot be less than or equal to zero.");
}

@Test
void testConstructorWithNegativeMaxConcurrencyThrowsException()
{
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, -3))
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, -3, null, null, null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid maxConcurrency -3. It cannot be less than or equal to zero.");
}

@Test
void testValidBoundaryValues()
{
LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 0.0, 1);
LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(1, 0.0, 1, null, null, null);

assertThat(request.maxIterations).isEqualTo(1);
assertThat(request.successThreshold).isEqualTo(0.0);
Expand All @@ -105,10 +121,29 @@ void testValidBoundaryValuesUpperBound()
{
LiveMigrationDataCopyRequest request = new LiveMigrationDataCopyRequest(Integer.MAX_VALUE,
1.0,
Integer.MAX_VALUE);
Integer.MAX_VALUE,
null,
null,
null);

assertThat(request.maxIterations).isEqualTo(Integer.MAX_VALUE);
assertThat(request.successThreshold).isEqualTo(1.0);
assertThat(request.maxConcurrency).isEqualTo(Integer.MAX_VALUE);
}

@Test
void testConstructorWithInvalidGossipFetchBatchSizeThrowsException()
{
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, 10, 0, null, null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid gossipFetchBatchSize 0. It must be greater than zero when specified.");
}

@Test
void testConstructorWithInvalidGossipFetchMaxRetriesThrowsException()
{
assertThatThrownBy(() -> new LiveMigrationDataCopyRequest(5, 0.95, 10, null, -1, null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid gossipFetchMaxRetries -1. It must be greater than zero when specified.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,19 @@ public CompletableFuture<GossipInfoResponse> gossipInfo()
return executor.executeRequestAsync(requestBuilder().gossipInfoRequest().build());
}

/**
* Executes the gossip info request using the default retry policy and provided {@code instance}
*
* @param instance the instance where the request will be executed
* @return a completable future of the gossip info
*/
public CompletableFuture<GossipInfoResponse> gossipInfo(SidecarInstance instance)
{
return executor.executeRequestAsync(requestBuilder()
.singleInstanceSelectionPolicy(instance)
.gossipInfoRequest()
.build());
}

/**
* Executes the GET gossip health request using the default retry policy and configured selection policy
Expand Down
2 changes: 2 additions & 0 deletions conf/sidecar.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,8 @@ live_migration:
migration_map: # Map of source and destination Cassandra instances
# localhost1: localhost4 # This entry says that localhost1 will be migrated to localhost4
max_concurrent_downloads: 20 # Maximum number of concurrent downloads allowed
gossip_fetch_batch_size: 3 # Batch size for fetching gossip information from cluster instances during safety checks
gossip_fetch_max_retries: 3 # Maximum number of batch retry attempts when fetching gossip information

# Configuration to allow sidecar start and stop Cassandra instances via the lifecycle API (disabled by default)
lifecycle:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,20 @@ public interface LiveMigrationConfiguration
* Maximum number of concurrent downloads allowed.
*/
int maxConcurrentDownloads();

/**
* Batch size for fetching gossip information from cluster instances during safety checks.
* Controls how many instances are contacted simultaneously per batch.
*
* @return the batch size for gossip fetching
*/
int gossipFetchBatchSize();

/**
* Maximum number of batch retry attempts when fetching gossip information.
* If all instances in a batch fail to return gossip info, the next batch is tried.
*
* @return the maximum number of retry attempts
*/
int gossipFetchMaxRetries();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,29 @@ public class LiveMigrationConfigurationImpl implements LiveMigrationConfiguratio
{

public static final int DEFAULT_MAX_CONCURRENT_DOWNLOADS = 20;
public static final int DEFAULT_GOSSIP_FETCH_BATCH_SIZE = 3;
public static final int DEFAULT_GOSSIP_FETCH_MAX_RETRIES = 3;

private final Set<String> filesToExclude;
private final Set<String> directoriesToExclude;
private final Map<String, String> migrationMap;
private final int maxConcurrentDownloads;
private final int gossipFetchBatchSize;
private final int gossipFetchMaxRetries;

public LiveMigrationConfigurationImpl()
{
this(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(), DEFAULT_MAX_CONCURRENT_DOWNLOADS);
this(Collections.emptySet(), Collections.emptySet(), Collections.emptyMap(),
DEFAULT_MAX_CONCURRENT_DOWNLOADS, DEFAULT_GOSSIP_FETCH_BATCH_SIZE, DEFAULT_GOSSIP_FETCH_MAX_RETRIES);
}

@JsonCreator
public LiveMigrationConfigurationImpl(@JsonProperty("files_to_exclude") Set<String> filesToExclude,
@JsonProperty("dirs_to_exclude") Set<String> directoriesToExclude,
@JsonProperty("migration_map") Map<String, String> migrationMap,
@JsonProperty("max_concurrent_downloads") int maxConcurrentDownloads)
@JsonProperty("max_concurrent_downloads") int maxConcurrentDownloads,
@JsonProperty("gossip_fetch_batch_size") Integer gossipFetchBatchSize,
@JsonProperty("gossip_fetch_max_retries") Integer gossipFetchMaxRetries)
{
this.filesToExclude = filesToExclude;
this.directoriesToExclude = directoriesToExclude;
Expand All @@ -60,6 +67,24 @@ public LiveMigrationConfigurationImpl(@JsonProperty("files_to_exclude") Set<Stri
". It must be >= 1");
}
this.maxConcurrentDownloads = maxConcurrentDownloads;

if (gossipFetchBatchSize != null && gossipFetchBatchSize < 1)
{
throw new IllegalArgumentException("Invalid gossip_fetch_batch_size " + gossipFetchBatchSize +
". It must be >= 1");
}
this.gossipFetchBatchSize = gossipFetchBatchSize == null
? DEFAULT_GOSSIP_FETCH_BATCH_SIZE
: gossipFetchBatchSize;

if (gossipFetchMaxRetries != null && gossipFetchMaxRetries < 1)
{
throw new IllegalArgumentException("Invalid gossip_fetch_max_retries " + gossipFetchMaxRetries +
". It must be >= 1");
}
this.gossipFetchMaxRetries = gossipFetchMaxRetries == null
? DEFAULT_GOSSIP_FETCH_MAX_RETRIES
: gossipFetchMaxRetries;
}

@Override
Expand Down Expand Up @@ -89,4 +114,18 @@ public int maxConcurrentDownloads()
{
return maxConcurrentDownloads;
}

@Override
@JsonProperty("gossip_fetch_batch_size")
public int gossipFetchBatchSize()
{
return gossipFetchBatchSize;
}

@Override
@JsonProperty("gossip_fetch_max_retries")
public int gossipFetchMaxRetries()
{
return gossipFetchMaxRetries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ protected void handleInternal(RoutingContext context,
.onFailure(throwable -> {
if (throwable instanceof LiveMigrationInvalidRequestException)
{
LOGGER.error("Input payload is not valid.", throwable);
LOGGER.error("Invalid live migration request.", throwable);
context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, throwable.getMessage(), throwable));
}
else if (throwable instanceof LiveMigrationDataCopyInProgressException)
Expand Down
Loading
Loading