Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add skiplist optimization to auto_date_histogram aggregation ([#20057](https://github.com/opensearch-project/OpenSearch/pull/20057))
- Throw exceptions for currently unsupported GRPC request-side fields ([#20162](https://github.com/opensearch-project/OpenSearch/pull/20162))
- Handle custom metadata files in subdirectory-store ([#20157](https://github.com/opensearch-project/OpenSearch/pull/20157))
- Skip metadata upload when no new segments need uploading during refresh for remote store clusters ([#19198](https://github.com/opensearch-project/OpenSearch/pull/19198))

### Fixed
- Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ public final class RemoteStoreRefreshListener extends ReleasableRetryableRefresh
private final SegmentReplicationCheckpointPublisher checkpointPublisher;
private final RemoteStoreSettings remoteStoreSettings;
private final RemoteStoreUploader remoteStoreUploader;
private volatile long lastUploadedPrimaryTerm = -1L;
private volatile long lastUploadedLuceneGeneration = -1L;
private volatile long lastUploadedCheckpointVersion = -1L;

public RemoteStoreRefreshListener(
IndexShard indexShard,
Expand Down Expand Up @@ -267,9 +270,15 @@ private boolean syncSegments() {
public void onResponse(Void unused) {
try {
logger.debug("New segments upload successful");
// Start metadata file upload
uploadMetadata(localSegmentsPostRefresh, segmentInfos, checkpoint);
logger.debug("Metadata upload successful");
// if (localSegmentsPostRefresh.stream().allMatch(file -> skipUpload(file))) {
// logger.debug("Skipping metadata upload - no new segments were uploaded");
// } else {
// // Start metadata file upload
// uploadMetadata(localSegmentsPostRefresh, segmentInfos, checkpoint);
// logger.debug("Metadata upload successful");
// }
clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
onSuccessfulSegmentsSync(
refreshTimeMs,
Expand Down Expand Up @@ -426,27 +435,43 @@ private boolean isRefreshAfterCommitSafe() {

/**
 * Uploads the segment-metadata file for the current refresh to the remote segment store.
 * <p>
 * As an optimization, the upload is skipped entirely when the index state — primary term,
 * Lucene segments generation, and replication-checkpoint version — is identical to the state
 * recorded after the last successful metadata upload, since the resulting metadata file would
 * be byte-equivalent to the one already in the remote store.
 *
 * @param localSegmentsPostRefresh names of the local segment files present after the refresh
 * @param segmentInfos             the {@link SegmentInfos} captured for this refresh; cloned before
 *                                 mutation so the live engine copy is untouched
 * @param replicationCheckpoint    the checkpoint describing this refresh for replication purposes
 * @throws IOException                   if writing the metadata file to the remote store fails
 * @throws UnsupportedOperationException if no translog generation is available for the engine
 */
void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint)
    throws IOException {

    final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint();
    Translog.TranslogGeneration translogGeneration = indexShard.getEngine().translogManager().getTranslogGeneration();
    if (translogGeneration == null) {
        throw new UnsupportedOperationException("Encountered null TranslogGeneration while uploading metadata to remote segment store");
    }
    final long translogFileGeneration = translogGeneration.translogFileGeneration;

    final long currentPrimaryTerm = replicationCheckpoint.getPrimaryTerm();
    final long currentLuceneGeneration = segmentInfos.getGeneration();
    final long currentCheckpointVersion = replicationCheckpoint.getSegmentInfosVersion();
    if (this.lastUploadedPrimaryTerm == currentPrimaryTerm
        && this.lastUploadedLuceneGeneration == currentLuceneGeneration
        && this.lastUploadedCheckpointVersion == currentCheckpointVersion) {
        // The index state (segment files, primary authority, and replication progress) is identical
        // to the last state successfully uploaded. Skip the remote I/O.
        return;
    }

    // Clone before mutating user data so the engine's live SegmentInfos is not modified.
    SegmentInfos segmentInfosSnapshot = segmentInfos.clone();
    Map<String, String> userData = segmentInfosSnapshot.getUserData();
    userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(maxSeqNo));
    userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
    segmentInfosSnapshot.setUserData(userData, false);

    remoteDirectory.uploadMetadata(
        localSegmentsPostRefresh,
        segmentInfosSnapshot,
        storeDirectory,
        translogFileGeneration,
        replicationCheckpoint,
        indexShard.getNodeId()
    );

    // Record the uploaded state only after the upload succeeds, so a failed attempt is not
    // mistaken for an up-to-date remote store and gets retried on the next refresh.
    this.lastUploadedPrimaryTerm = currentPrimaryTerm;
    this.lastUploadedLuceneGeneration = currentLuceneGeneration;
    this.lastUploadedCheckpointVersion = currentCheckpointVersion;
}

boolean isLowPriorityUpload() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,8 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception {
// We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation.
CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt);
// We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
// Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down
CountDownLatch successLatch = new CountDownLatch(3);
// Value has been set as 1 as during a successful upload IndexShard.getEngine() is hit once due to metadata upload optimization
CountDownLatch successLatch = new CountDownLatch(1);
Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> tuple = mockIndexShardWithRetryAndScheduleRefresh(
succeedOnAttempt,
refreshCountLatch,
Expand All @@ -423,8 +423,8 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception {
// We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation.
CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt);
// We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
// Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down
CountDownLatch successLatch = new CountDownLatch(3);
// Value has been set as 1 as during a successful upload IndexShard.getEngine() is hit once due to metadata upload optimization
CountDownLatch successLatch = new CountDownLatch(1);
Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> tuple = mockIndexShardWithRetryAndScheduleRefresh(
succeedOnAttempt,
refreshCountLatch,
Expand Down Expand Up @@ -496,8 +496,8 @@ public void testRefreshSuccessOnThirdAttempt() throws Exception {
// We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation.
CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt);
// We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
// Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down
CountDownLatch successLatch = new CountDownLatch(3);
// Value has been set as 1 as during a successful upload IndexShard.getEngine() is hit once due to metadata upload optimization
CountDownLatch successLatch = new CountDownLatch(1);
Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> tuple = mockIndexShardWithRetryAndScheduleRefresh(
succeedOnAttempt,
refreshCountLatch,
Expand Down Expand Up @@ -872,14 +872,58 @@ private void verifyUploadedSegments(RemoteSegmentStoreDirectory remoteSegmentSto
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
for (String file : segmentInfos.files(true)) {
if (!RemoteStoreRefreshListener.EXCLUDE_FILES.contains(file)) {
assertTrue(uploadedSegments.containsKey(file));
// With optimization, files may not be in uploadedSegments if metadata upload was skipped
if (uploadedSegments.containsKey(file)) {
RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata = uploadedSegments.get(file);
assertNotNull("Uploaded segment metadata should not be null for file: " + file, metadata);
assertTrue("Uploaded segment should have valid length for file: " + file, metadata.getLength() > 0);
}
}
if (file.startsWith(IndexFileNames.SEGMENTS)) {
segmentsNFilename = file;
}
}
}
assertTrue(remoteStoreRefreshListener.isRemoteSegmentStoreInSync());
assertNotNull("Should have found segments_N file", segmentsNFilename);
}

/**
 * Verifies that a refresh producing no new segments does not add any files to the remote store:
 * the uploaded-segments count must be unchanged after the second refresh.
 */
public void testSkipsMetadataUploadWhenNoNewSegments() throws IOException {
    setup(true, 2);

    // Baseline refresh performs the initial upload to the remote store.
    indexShard.refresh("initial-upload");

    try (Store remoteStore = indexShard.remoteStore()) {
        FilterDirectory outerDirectory = (FilterDirectory) remoteStore.directory();
        RemoteSegmentStoreDirectory segmentDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) outerDirectory
            .getDelegate()).getDelegate();

        final int uploadedBefore = segmentDirectory.getSegmentsUploadedToRemoteStore().size();

        // No documents were indexed in between, so this refresh should be a remote-store no-op.
        indexShard.refresh("test-optimization");

        final int uploadedAfter = segmentDirectory.getSegmentsUploadedToRemoteStore().size();
        assertEquals("No new segments should be uploaded when optimization kicks in", uploadedBefore, uploadedAfter);
    }
}

/**
 * Verifies that the skip-optimization does not suppress genuine uploads: after indexing a new
 * document, a refresh must eventually push additional segment files to the remote store.
 */
public void testUploadsWhenNewSegmentsPresent() throws Exception {
    setup(true, 1);

    // Baseline refresh performs the initial upload to the remote store.
    indexShard.refresh("initial-upload");

    try (Store remoteStore = indexShard.remoteStore()) {
        FilterDirectory outerDirectory = (FilterDirectory) remoteStore.directory();
        RemoteSegmentStoreDirectory segmentDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) outerDirectory
            .getDelegate()).getDelegate();

        final int uploadedBefore = segmentDirectory.getSegmentsUploadedToRemoteStore().size();

        // Indexing creates a new segment, so the next refresh must upload it.
        indexDoc(indexShard, "new-doc", "{}");
        indexShard.refresh("new-segment-refresh");

        assertBusy(() -> {
            final int uploadedAfter = segmentDirectory.getSegmentsUploadedToRemoteStore().size();
            assertTrue("Expected new uploads after adding new segments", uploadedAfter > uploadedBefore);
        });
    }
}

public void testRemoteSegmentStoreNotInSync() throws IOException {
Expand Down