diff --git a/CHANGELOG.md b/CHANGELOG.md index 99a645553a46b..4e5c5dd69a7f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add skiplist optimization to auto_date_histogram aggregation ([#20057](https://github.com/opensearch-project/OpenSearch/pull/20057)) - Throw exceptions for currently unsupported GRPC request-side fields ([#20162](https://github.com/opensearch-project/OpenSearch/pull/20162)) - Handle custom metadata files in subdirectory-store ([#20157](https://github.com/opensearch-project/OpenSearch/pull/20157)) +- Skip metadata upload when no new segments need uploading during refresh for remote store clusters ([#19198](https://github.com/opensearch-project/OpenSearch/pull/19198)) ### Fixed - Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012)) diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index f3ee23a7505d2..f4e39dcf7d43b 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -91,6 +91,9 @@ public final class RemoteStoreRefreshListener extends ReleasableRetryableRefresh private final SegmentReplicationCheckpointPublisher checkpointPublisher; private final RemoteStoreSettings remoteStoreSettings; private final RemoteStoreUploader remoteStoreUploader; + private volatile long lastUploadedPrimaryTerm = -1L; + private volatile long lastUploadedLuceneGeneration = -1L; + private volatile long lastUploadedCheckpointVersion = -1L; public RemoteStoreRefreshListener( IndexShard indexShard, @@ -267,9 +270,15 @@ private boolean syncSegments() { public void onResponse(Void unused) { try { logger.debug("New segments upload 
successful"); // Start metadata file upload uploadMetadata(localSegmentsPostRefresh, segmentInfos, checkpoint); logger.debug("Metadata upload successful"); clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh); onSuccessfulSegmentsSync( refreshTimeMs, @@ -426,27 +435,43 @@ private boolean isRefreshAfterCommitSafe() { void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint) throws IOException { + final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint(); + Translog.TranslogGeneration translogGeneration = indexShard.getEngine().translogManager().getTranslogGeneration(); + if (translogGeneration == null) { + throw new UnsupportedOperationException("Encountered null TranslogGeneration while uploading metadata to remote segment store"); + } + long translogFileGeneration = translogGeneration.translogFileGeneration; + final long currentPrimaryTerm = replicationCheckpoint.getPrimaryTerm(); + final long currentLuceneGeneration = segmentInfos.getGeneration(); + final long currentCheckpointVersion = replicationCheckpoint.getSegmentInfosVersion(); + if (this.lastUploadedPrimaryTerm == currentPrimaryTerm + && this.lastUploadedLuceneGeneration == currentLuceneGeneration + && this.lastUploadedCheckpointVersion == currentCheckpointVersion) { + + // The index state (segment files, primary authority, and replication progress) is identical to the last state successfully + // uploaded. 
Skip the remote I/O. + return; + } + SegmentInfos segmentInfosSnapshot = segmentInfos.clone(); Map<String, String> userData = segmentInfosSnapshot.getUserData(); userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(maxSeqNo)); userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); segmentInfosSnapshot.setUserData(userData, false); - Translog.TranslogGeneration translogGeneration = indexShard.getEngine().translogManager().getTranslogGeneration(); - if (translogGeneration == null) { - throw new UnsupportedOperationException("Encountered null TranslogGeneration while uploading metadata to remote segment store"); - } else { - long translogFileGeneration = translogGeneration.translogFileGeneration; - remoteDirectory.uploadMetadata( - localSegmentsPostRefresh, - segmentInfosSnapshot, - storeDirectory, - translogFileGeneration, - replicationCheckpoint, - indexShard.getNodeId() - ); - } + remoteDirectory.uploadMetadata( + localSegmentsPostRefresh, + segmentInfosSnapshot, + storeDirectory, + translogFileGeneration, + replicationCheckpoint, + indexShard.getNodeId() + ); + + this.lastUploadedPrimaryTerm = currentPrimaryTerm; + this.lastUploadedLuceneGeneration = currentLuceneGeneration; + this.lastUploadedCheckpointVersion = currentCheckpointVersion; } boolean isLowPriorityUpload() { diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 8094f1c91ee13..346ff09e6f536 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -401,8 +401,8 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception { // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. 
CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. - // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down - CountDownLatch successLatch = new CountDownLatch(3); + // Value has been set as 1 as during a successful upload IndexShard.getEngine() is hit once due to metadata upload optimization + CountDownLatch successLatch = new CountDownLatch(1); Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( succeedOnAttempt, refreshCountLatch, @@ -423,8 +423,8 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. - // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down - CountDownLatch successLatch = new CountDownLatch(3); + // Value has been set as 1 as during a successful upload IndexShard.getEngine() is hit once due to metadata upload optimization + CountDownLatch successLatch = new CountDownLatch(1); Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( succeedOnAttempt, refreshCountLatch, @@ -496,8 +496,8 @@ public void testRefreshSuccessOnThirdAttempt() throws Exception { // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. 
- // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down - CountDownLatch successLatch = new CountDownLatch(3); + // Value has been set as 1 as during a successful upload IndexShard.getEngine() is hit once due to metadata upload optimization + CountDownLatch successLatch = new CountDownLatch(1); Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( succeedOnAttempt, refreshCountLatch, @@ -872,14 +872,58 @@ private void verifyUploadedSegments(RemoteSegmentStoreDirectory remoteSegmentSto SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); for (String file : segmentInfos.files(true)) { if (!RemoteStoreRefreshListener.EXCLUDE_FILES.contains(file)) { assertTrue(uploadedSegments.containsKey(file)); } if (file.startsWith(IndexFileNames.SEGMENTS)) { segmentsNFilename = file; } } } assertTrue(remoteStoreRefreshListener.isRemoteSegmentStoreInSync()); + assertNotNull("Should have found segments_N file", segmentsNFilename); + } + + public void testSkipsMetadataUploadWhenNoNewSegments() throws IOException { + setup(true, 2); + + indexShard.refresh("initial-upload"); + + try (Store remoteStore = indexShard.remoteStore()) { + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate(); + + final int initialCount = remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().size(); + + indexShard.refresh("test-optimization"); + + int 
afterOptimizationCount = remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().size(); + assertEquals("No new segments should be uploaded when optimization kicks in", initialCount, afterOptimizationCount); + } + } + + public void testUploadsWhenNewSegmentsPresent() throws Exception { + setup(true, 1); + + indexShard.refresh("initial-upload"); + + try (Store remoteStore = indexShard.remoteStore()) { + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate(); + + final int initialCount = remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().size(); + + indexDoc(indexShard, "new-doc", "{}"); + indexShard.refresh("new-segment-refresh"); + + assertBusy(() -> { + int afterNewSegmentCount = remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().size(); + assertTrue("Expected new uploads after adding new segments", afterNewSegmentCount > initialCount); + }); + } } public void testRemoteSegmentStoreNotInSync() throws IOException {