From b5fc89acba80540f830b7df8a177c20b04749e09 Mon Sep 17 00:00:00 2001 From: Swetha Guptha Date: Tue, 9 Dec 2025 09:53:09 +0530 Subject: [PATCH] Introduce primaryTerm and inSynAllocationIdsK in IndexRoutingTable Signed-off-by: Swetha Guptha --- .../cluster/routing/IndexRoutingTable.java | 229 +++++++++++++++++- .../routing/RoutingTableIncrementalDiff.java | 2 + 2 files changed, 228 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java index 971c8ef95282e..d43061c0c5367 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java @@ -33,8 +33,10 @@ package org.opensearch.cluster.routing; import org.apache.lucene.util.CollectionUtil; +import org.opensearch.Version; import org.opensearch.cluster.AbstractDiffable; import org.opensearch.cluster.Diff; +import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.RecoverySource.EmptyStoreRecoverySource; @@ -43,6 +45,7 @@ import org.opensearch.cluster.routing.RecoverySource.PeerRecoverySource; import org.opensearch.cluster.routing.RecoverySource.RemoteStoreRecoverySource; import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; +import org.opensearch.cluster.routing.allocation.IndexMetadataUpdater; import org.opensearch.common.Randomness; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput; @@ -51,16 +54,20 @@ import org.opensearch.core.common.io.stream.VerifiableWriteable; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.seqno.SequenceNumbers; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.TreeSet; import java.util.function.Predicate; /** @@ -95,7 +102,20 @@ public class IndexRoutingTable extends AbstractDiffable private final List allActiveShards; + private final long[] primaryTerms; + + private final Map> inSyncAllocationIds; + IndexRoutingTable(Index index, final Map shards) { + this(index, shards, null, null); + } + + IndexRoutingTable( + Index index, + final Map shards, + final long[] primaryTerms, + final Map> inSyncAllocationIds + ) { this.index = index; this.shuffler = new RotationShardShuffler(Randomness.get().nextInt()); this.shards = Collections.unmodifiableMap(shards); @@ -108,6 +128,8 @@ public class IndexRoutingTable extends AbstractDiffable } } this.allActiveShards = Collections.unmodifiableList(allActiveShards); + this.primaryTerms = primaryTerms; + this.inSyncAllocationIds = inSyncAllocationIds != null ? Collections.unmodifiableMap(inSyncAllocationIds) : Collections.emptyMap(); } /** @@ -141,6 +163,30 @@ boolean validate(Metadata metadata) { throw new IllegalStateException("Wrong number of shards in routing table, missing: " + expected); } + // validate primaryTerms - only if populated + if (primaryTerms != null && primaryTerms.length > 0 && primaryTerms.length != indexMetadata.getNumberOfShards()) { + throw new IllegalStateException( + "Primary terms length [" + + primaryTerms.length + + "] does not match number of shards [" + + indexMetadata.getNumberOfShards() + + "]" + ); + } + + // validate inSyncAllocationIds - only if populated + if (inSyncAllocationIds != null + && !inSyncAllocationIds.isEmpty() + && inSyncAllocationIds.size() != indexMetadata.getNumberOfShards()) { + throw new IllegalStateException( + "InSyncAllocationIds size [" + + inSyncAllocationIds.size() + + "] does not match number of shards [" + + indexMetadata.getNumberOfShards() + + "]" + ); + } + boolean isSearchOnlyClusterBlockEnabled = indexMetadata.getSettings() .getAsBoolean(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false); @@ -260,6 +306,61 @@ public IndexShardRoutingTable shard(int shardId) { return shards.get(shardId); } + /** + * The term of the current selected primary. This is a non-negative number incremented when + * a primary shard is assigned after a full cluster restart or a replica shard is promoted to a primary. + *

+ * Note: since we increment the term every time a shard is assigned, the term for any operational shard (i.e., a shard + * that can be indexed into) is larger than 0. See {@link IndexMetadataUpdater#applyChanges}. + **/ + public long getPrimaryTerm(int shardId) { + if (primaryTerms == null) { + throw new IllegalStateException("Primary term not populated for index {} " + index); + } + if (primaryTerms.length < shardId) { + throw new IllegalArgumentException( + "Primary term not found for shard [" + + index + + "][" + + shardId + + "]. Primary term tracked for " + + primaryTerms.length + + "shards." + ); + } + + return this.primaryTerms[shardId]; + } + + public long[] getPrimaryTerms() { + if (primaryTerms == null) { + throw new IllegalStateException("Primary term not tracked for index {} " + index); + } + return this.primaryTerms.clone(); + } + + public Set getInSyncAllocationIds(int shardId) { + if (inSyncAllocationIds.size() < shardId) { + throw new IllegalArgumentException( + "InSyncAllocationIds not found for shard [" + + index + + "][" + + shardId + + "]. InSyncAllocationIds tracked for " + + inSyncAllocationIds.size() + + "shards." + ); + } + return inSyncAllocationIds.get(shardId); + } + + public Map> getInSyncAllocationIds() { + if (inSyncAllocationIds == null || inSyncAllocationIds.isEmpty()) { + throw new IllegalStateException("InSyncAllocationIds not tracked for index {} " + index); + } + return inSyncAllocationIds; + } + /** * Returns true if all shards are primary and active. Otherwise false. */ @@ -356,7 +457,8 @@ public boolean equals(Object o) { if (!index.equals(that.index)) return false; if (!shards.equals(that.shards)) return false; - + if (!Arrays.equals(primaryTerms, that.primaryTerms)) return false; + if (!Objects.equals(inSyncAllocationIds, that.inSyncAllocationIds)) return false; return true; } @@ -364,12 +466,24 @@ public boolean equals(Object o) { public int hashCode() { int result = index.hashCode(); result = 31 * result + shards.hashCode(); + result = 31 * result + Arrays.hashCode(primaryTerms); + result = 31 * result + Objects.hashCode(inSyncAllocationIds); return result; } @Override public String toString() { - return "IndexRoutingTable{" + "shards=" + shards + ", index=" + index + '}'; + return new StringBuilder().append("IndexRoutingTable{") + .append("shards=") + .append(shards) + .append(", index=") + .append(index) + .append(", primaryTerms=") + .append(Arrays.toString(primaryTerms)) + .append(", inSyncAllocationIds=") + .append(inSyncAllocationIds) + .append('}') + .toString(); } public static IndexRoutingTable readFrom(StreamInput in) throws IOException { @@ -381,6 +495,17 @@ public static IndexRoutingTable readFrom(StreamInput in) throws IOException { builder.addIndexShard(IndexShardRoutingTable.Builder.readFromThin(in, index)); } + if (in.getVersion().onOrAfter(Version.V_3_4_0)) { + builder.setPrimaryTerms(in.readLongArray()); + int inSyncAllocationIdsSize = in.readVInt(); + for (int i = 0; i < inSyncAllocationIdsSize; i++) { + int shardId = in.readVInt(); + Set allocationIds = DiffableUtils.StringSetValueSerializer.getInstance().read(in, shardId); + builder.setInSyncAllocationIds(shardId, allocationIds); + } + builder.setNumberOfShards(inSyncAllocationIdsSize); + } + return builder.build(); } @@ -395,11 +520,29 @@ public void writeTo(StreamOutput out) throws IOException { for (IndexShardRoutingTable indexShard : this) { IndexShardRoutingTable.Builder.writeToThin(indexShard, out); } + if (out.getVersion().onOrAfter(Version.V_3_4_0)) { + if (primaryTerms != null && inSyncAllocationIds != null) { + out.writeVLongArray(primaryTerms); + out.writeVInt(inSyncAllocationIds.size()); + for (final Map.Entry> cursor : inSyncAllocationIds.entrySet()) { + out.writeVInt(cursor.getKey()); + DiffableUtils.StringSetValueSerializer.getInstance().write(cursor.getValue(), out); + } + } + } } public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException { index.writeTo(out); out.writeMapValues(shards, (stream, value) -> IndexShardRoutingTable.Builder.writeVerifiableTo(value, stream)); + if (out.getVersion().onOrAfter(Version.V_3_4_0)) { + out.writeVLongArray(primaryTerms); + out.writeMap( + inSyncAllocationIds, + StreamOutput::writeVInt, + (stream, val) -> DiffableUtils.StringSetValueSerializer.getInstance().write(new TreeSet<>(val), stream) + ); + } } public static Builder builder(Index index) { @@ -416,9 +559,61 @@ public static class Builder { private final Index index; private final Map shards = new HashMap<>(); + private long[] primaryTerms = null; + private final Map> inSyncAllocationIds; + private int numberOfShards; public Builder(Index index) { this.index = index; + this.inSyncAllocationIds = new HashMap<>(); + } + + public Builder setPrimaryTerms(final long[] primaryTerms) { + this.primaryTerms = primaryTerms.clone(); + return this; + } + + public Builder setPrimaryTerm(int shardId, long primaryTerm) { + if (primaryTerms == null) { + initializePrimaryTerms(); + } + this.primaryTerms[shardId] = primaryTerm; + return this; + } + + public long getPrimaryTerm(int shardId) { + if (primaryTerms == null) { + initializePrimaryTerms(); + } + return this.primaryTerms[shardId]; + } + + private void initializePrimaryTerms() { + assert primaryTerms == null; + if (numberOfShards < 0) { + throw new IllegalStateException("you must set the number of shards before setting/reading primary terms"); + } + primaryTerms = new long[numberOfShards]; + Arrays.fill(primaryTerms, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); + } + + public Builder setInSyncAllocationIds(int shardId, Set allocationIds) { + inSyncAllocationIds.put(shardId, new HashSet<>(allocationIds)); + return this; + } + + public Builder setInSyncAllocationIds(Map> inSyncAllocationIds) { + this.inSyncAllocationIds.putAll(inSyncAllocationIds); + return this; + } + + public Set getInSyncAllocationIds(int shardId) { + return inSyncAllocationIds.get(shardId); + } + + public Builder setNumberOfShards(int numberOfShards) { + this.numberOfShards = numberOfShards; + return this; } /** @@ -811,13 +1006,41 @@ public Builder addShard(ShardRouting shard) { } public IndexRoutingTable build() { - return new IndexRoutingTable(index, shards); + if (primaryTerms == null) { + initializePrimaryTerms(); + } else if (primaryTerms.length != numberOfShards) { + throw new IllegalStateException( + "primaryTerms length is [" + primaryTerms.length + "] but should be equal to number of shards [" + numberOfShards + "]" + ); + } + + // fill missing slots in inSyncAllocationIds with empty set if needed and make all entries immutable + final Map> filledInSyncAllocationIds = new HashMap<>(); + for (int i = 0; i < numberOfShards; i++) { + if (inSyncAllocationIds.containsKey(i)) { + filledInSyncAllocationIds.put(i, Collections.unmodifiableSet(new HashSet<>(inSyncAllocationIds.get(i)))); + } else { + filledInSyncAllocationIds.put(i, Collections.emptySet()); + } + } + return new IndexRoutingTable(index, shards, primaryTerms, filledInSyncAllocationIds); } } public String prettyPrint() { StringBuilder sb = new StringBuilder("-- index [" + index + "]\n"); + if (primaryTerms != null) { + sb.append("-- primary terms: ").append(Arrays.toString(primaryTerms)).append("\n"); + } + + if (inSyncAllocationIds != null && !inSyncAllocationIds.isEmpty()) { + sb.append("-- in-sync allocation ids:\n"); + for (Map.Entry> entry : inSyncAllocationIds.entrySet()) { + sb.append("---- shard ").append(entry.getKey()).append(": ").append(entry.getValue()).append("\n"); + } + } + List ordered = new ArrayList<>(); for (IndexShardRoutingTable indexShard : this) { ordered.add(indexShard); diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java index 13501a431d9f9..d6729aeeb88a1 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java @@ -99,6 +99,7 @@ public static class IndexRoutingTableIncrementalDiff implements Diff