Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
package org.opensearch.cluster.routing;

import org.apache.lucene.util.CollectionUtil;
import org.opensearch.Version;
import org.opensearch.cluster.AbstractDiffable;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.RecoverySource.EmptyStoreRecoverySource;
Expand All @@ -43,6 +45,7 @@
import org.opensearch.cluster.routing.RecoverySource.PeerRecoverySource;
import org.opensearch.cluster.routing.RecoverySource.RemoteStoreRecoverySource;
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.opensearch.cluster.routing.allocation.IndexMetadataUpdater;
import org.opensearch.common.Randomness;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput;
Expand All @@ -51,16 +54,20 @@
import org.opensearch.core.common.io.stream.VerifiableWriteable;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.seqno.SequenceNumbers;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

/**
Expand Down Expand Up @@ -95,7 +102,20 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable>

private final List<ShardRouting> allActiveShards;

private final long[] primaryTerms;

private final Map<Integer, Set<String>> inSyncAllocationIds;

IndexRoutingTable(Index index, final Map<Integer, IndexShardRoutingTable> shards) {
this(index, shards, null, null);
}

IndexRoutingTable(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is a Public API, we need to maintain the constructor compatibility.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I have reverted the original constructor and added a new constructor for additional fields.

Index index,
final Map<Integer, IndexShardRoutingTable> shards,
final long[] primaryTerms,
final Map<Integer, Set<String>> inSyncAllocationIds
) {
this.index = index;
this.shuffler = new RotationShardShuffler(Randomness.get().nextInt());
this.shards = Collections.unmodifiableMap(shards);
Expand All @@ -108,6 +128,8 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable>
}
}
this.allActiveShards = Collections.unmodifiableList(allActiveShards);
this.primaryTerms = primaryTerms;
this.inSyncAllocationIds = inSyncAllocationIds != null ? Collections.unmodifiableMap(inSyncAllocationIds) : Collections.emptyMap();
}

/**
Expand Down Expand Up @@ -141,6 +163,30 @@ boolean validate(Metadata metadata) {
throw new IllegalStateException("Wrong number of shards in routing table, missing: " + expected);
}

// validate primaryTerms - only if populated
if (primaryTerms != null && primaryTerms.length > 0 && primaryTerms.length != indexMetadata.getNumberOfShards()) {
throw new IllegalStateException(
"Primary terms length ["
+ primaryTerms.length
+ "] does not match number of shards ["
+ indexMetadata.getNumberOfShards()
+ "]"
);
}

// validate inSyncAllocationIds - only if populated
if (inSyncAllocationIds != null
&& !inSyncAllocationIds.isEmpty()
&& inSyncAllocationIds.size() != indexMetadata.getNumberOfShards()) {
throw new IllegalStateException(
"InSyncAllocationIds size ["
+ inSyncAllocationIds.size()
+ "] does not match number of shards ["
+ indexMetadata.getNumberOfShards()
+ "]"
);
}

boolean isSearchOnlyClusterBlockEnabled = indexMetadata.getSettings()
.getAsBoolean(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false);

Expand Down Expand Up @@ -260,6 +306,61 @@ public IndexShardRoutingTable shard(int shardId) {
return shards.get(shardId);
}

/**
* The term of the current selected primary. This is a non-negative number incremented when
* a primary shard is assigned after a full cluster restart or a replica shard is promoted to a primary.
* <p>
* Note: since we increment the term every time a shard is assigned, the term for any operational shard (i.e., a shard
* that can be indexed into) is larger than 0. See {@link IndexMetadataUpdater#applyChanges}.
**/
public long getPrimaryTerm(int shardId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we maintain parity with the existing getter in IndexMetadata?

    public long primaryTerm(int shardId) {
        return this.primaryTerms[shardId];
    }

Also, since we are allowing null values in validate, can we have null checks?

Copy link
Contributor Author

@SwethaGuptha SwethaGuptha Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a specific reason to maintain this parity. Getters of other IndexRoutingTable class variables are prefixed with get, hence kept the getter naming convention within the class.

Also, since we are allowing null values in validate, can we have null checks?

In the core the values will be null only for Remote downloads which is not addressed in scope of this CR. Added check for NPE.

if (primaryTerms == null) {
throw new IllegalStateException("Primary term not populated for index {} " + index);
}
if (primaryTerms.length < shardId) {
throw new IllegalArgumentException(
"Primary term not found for shard ["
+ index
+ "]["
+ shardId
+ "]. Primary term tracked for "
+ primaryTerms.length
+ "shards."
);
}

return this.primaryTerms[shardId];
}

public long[] getPrimaryTerms() {
if (primaryTerms == null) {
throw new IllegalStateException("Primary term not tracked for index {} " + index);
}
return this.primaryTerms.clone();
}

public Set<String> getInSyncAllocationIds(int shardId) {
if (inSyncAllocationIds.size() < shardId) {
throw new IllegalArgumentException(
"InSyncAllocationIds not found for shard ["
+ index
+ "]["
+ shardId
+ "]. InSyncAllocationIds tracked for "
+ inSyncAllocationIds.size()
+ "shards."
);
}
return inSyncAllocationIds.get(shardId);
}

public Map<Integer, Set<String>> getInSyncAllocationIds() {
if (inSyncAllocationIds == null || inSyncAllocationIds.isEmpty()) {
throw new IllegalStateException("InSyncAllocationIds not tracked for index {} " + index);
}
return inSyncAllocationIds;
}

/**
* Returns <code>true</code> if all shards are primary and active. Otherwise <code>false</code>.
*/
Expand Down Expand Up @@ -356,20 +457,33 @@ public boolean equals(Object o) {

if (!index.equals(that.index)) return false;
if (!shards.equals(that.shards)) return false;

if (!Arrays.equals(primaryTerms, that.primaryTerms)) return false;
if (!Objects.equals(inSyncAllocationIds, that.inSyncAllocationIds)) return false;
return true;
}

@Override
public int hashCode() {
int result = index.hashCode();
result = 31 * result + shards.hashCode();
result = 31 * result + Arrays.hashCode(primaryTerms);
result = 31 * result + Objects.hashCode(inSyncAllocationIds);
return result;
}

@Override
public String toString() {
return "IndexRoutingTable{" + "shards=" + shards + ", index=" + index + '}';
return new StringBuilder().append("IndexRoutingTable{")
.append("shards=")
.append(shards)
.append(", index=")
.append(index)
.append(", primaryTerms=")
.append(Arrays.toString(primaryTerms))
.append(", inSyncAllocationIds=")
.append(inSyncAllocationIds)
.append('}')
.toString();
}

public static IndexRoutingTable readFrom(StreamInput in) throws IOException {
Expand All @@ -381,6 +495,17 @@ public static IndexRoutingTable readFrom(StreamInput in) throws IOException {
builder.addIndexShard(IndexShardRoutingTable.Builder.readFromThin(in, index));
}

if (in.getVersion().onOrAfter(Version.V_3_4_0)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe branch for 3.4 has already been cut. Do we need to change checks to 3.5?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's okay to leave it at 3.4 for now. We will not merge this PR now until the remaining PRs are published. All the PRs will be merged together, we can update the version at the time of merge to main.

builder.setPrimaryTerms(in.readLongArray());
int inSyncAllocationIdsSize = in.readVInt();
for (int i = 0; i < inSyncAllocationIdsSize; i++) {
int shardId = in.readVInt();
Set<String> allocationIds = DiffableUtils.StringSetValueSerializer.getInstance().read(in, shardId);
builder.setInSyncAllocationIds(shardId, allocationIds);
}
builder.setNumberOfShards(inSyncAllocationIdsSize);
}

return builder.build();
}

Expand All @@ -395,11 +520,29 @@ public void writeTo(StreamOutput out) throws IOException {
for (IndexShardRoutingTable indexShard : this) {
IndexShardRoutingTable.Builder.writeToThin(indexShard, out);
}
if (out.getVersion().onOrAfter(Version.V_3_4_0)) {
if (primaryTerms != null && inSyncAllocationIds != null) {
out.writeVLongArray(primaryTerms);
out.writeVInt(inSyncAllocationIds.size());
for (final Map.Entry<Integer, Set<String>> cursor : inSyncAllocationIds.entrySet()) {
out.writeVInt(cursor.getKey());
DiffableUtils.StringSetValueSerializer.getInstance().write(cursor.getValue(), out);
}
}
}
}

public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException {
index.writeTo(out);
out.writeMapValues(shards, (stream, value) -> IndexShardRoutingTable.Builder.writeVerifiableTo(value, stream));
if (out.getVersion().onOrAfter(Version.V_3_4_0)) {
out.writeVLongArray(primaryTerms);
out.writeMap(
inSyncAllocationIds,
StreamOutput::writeVInt,
(stream, val) -> DiffableUtils.StringSetValueSerializer.getInstance().write(new TreeSet<>(val), stream)
);
}
}

public static Builder builder(Index index) {
Expand All @@ -416,9 +559,61 @@ public static class Builder {

private final Index index;
private final Map<Integer, IndexShardRoutingTable> shards = new HashMap<>();
private long[] primaryTerms = null;
private final Map<Integer, Set<String>> inSyncAllocationIds;
private int numberOfShards;

public Builder(Index index) {
this.index = index;
this.inSyncAllocationIds = new HashMap<>();
}

public Builder setPrimaryTerms(final long[] primaryTerms) {
this.primaryTerms = primaryTerms.clone();
return this;
}

public Builder setPrimaryTerm(int shardId, long primaryTerm) {
if (primaryTerms == null) {
initializePrimaryTerms();
}
this.primaryTerms[shardId] = primaryTerm;
return this;
}

public long getPrimaryTerm(int shardId) {
if (primaryTerms == null) {
initializePrimaryTerms();
}
return this.primaryTerms[shardId];
}

private void initializePrimaryTerms() {
assert primaryTerms == null;
if (numberOfShards < 0) {
throw new IllegalStateException("you must set the number of shards before setting/reading primary terms");
}
primaryTerms = new long[numberOfShards];
Arrays.fill(primaryTerms, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}

public Builder setInSyncAllocationIds(int shardId, Set<String> allocationIds) {
inSyncAllocationIds.put(shardId, new HashSet<>(allocationIds));
return this;
}

public Builder setInSyncAllocationIds(Map<Integer, Set<String>> inSyncAllocationIds) {
this.inSyncAllocationIds.putAll(inSyncAllocationIds);
return this;
}

public Set<String> getInSyncAllocationIds(int shardId) {
return inSyncAllocationIds.get(shardId);
}

public Builder setNumberOfShards(int numberOfShards) {
this.numberOfShards = numberOfShards;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we even require this member variable? Its easy to miss out on setting this in ser/de. Besides, this anyway doesn't get set from IndexMetadata, so it seems like we will end up asserting the same value which we set using array's length.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be set from IndexMetadata. Variable numberOfShards is required to configure the primaryTerms and in-sync allocation Ids size. I have handled the ser/de bug.

return this;
}
Comment on lines 560 to 617
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Builder numberOfShards / initializePrimaryTerms contract is easy to misuse

The builder changes are generally good, but there’s a subtle footgun:

  • numberOfShards defaults to 0.
  • initializePrimaryTerms() only throws when numberOfShards < 0, then allocates new long[numberOfShards].
  • setPrimaryTerm() and getPrimaryTerm() call initializePrimaryTerms() when primaryTerms == null.

If a caller forgets to call setNumberOfShards() before using these methods (or before build() when primaryTerms is still null), initializePrimaryTerms() will happily create a long[0], and subsequent access like this.primaryTerms[shardId] will fail with ArrayIndexOutOfBoundsException.

Given this is a public builder API, I’d recommend tightening the contract:

  • Either infer numberOfShards from shards.size() when it hasn’t been set, or
  • Treat “unset” explicitly and fail fast.

For example:

-        private int numberOfShards;
+        private int numberOfShards; // 0 means "not explicitly set"
@@
         private void initializePrimaryTerms() {
             assert primaryTerms == null;
-            if (numberOfShards < 0) {
-                throw new IllegalStateException("you must set the number of shards before setting/reading primary terms");
-            }
-            primaryTerms = new long[numberOfShards];
+            int shardsCount = numberOfShards > 0 ? numberOfShards : shards.size();
+            if (shardsCount <= 0) {
+                throw new IllegalStateException("you must set the number of shards before setting/reading primary terms");
+            }
+            numberOfShards = shardsCount;
+            primaryTerms = new long[numberOfShards];
             Arrays.fill(primaryTerms, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
         }
@@
         public IndexRoutingTable build() {
-            if (primaryTerms == null) {
-                initializePrimaryTerms();
-            } else if (primaryTerms.length != numberOfShards) {
+            if (primaryTerms == null) {
+                initializePrimaryTerms();
+            } else if (numberOfShards > 0 && primaryTerms.length != numberOfShards) {
                 throw new IllegalStateException(
                     "primaryTerms length is [" + primaryTerms.length + "] but should be equal to number of shards [" + numberOfShards + "]"
                 );
             }

This keeps existing builder usages working (deriving from shards.size() when explicit information is missing) while still enforcing a consistent array length once numberOfShards is known.

Also applies to: 591-598, 600-617, 1009-1026

🤖 Prompt for AI Agents
In server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java
around lines 560-617, the Builder currently defaults numberOfShards to 0 so
initializePrimaryTerms() can silently create a zero-length primaryTerms array
causing ArrayIndexOutOfBounds later; change initializePrimaryTerms() to treat
"unset" explicitly by inferring numberOfShards from shards.size() when
numberOfShards is not set (or <= 0), and if that still yields 0 throw an
IllegalStateException requiring numberOfShards to be set; also validate
setNumberOfShards to require a positive value (or refuse changes after
primaryTerms is initialized) so primaryTerms length is always consistent with
the expected shard count.


/**
Expand Down Expand Up @@ -811,13 +1006,41 @@ public Builder addShard(ShardRouting shard) {
}

public IndexRoutingTable build() {
return new IndexRoutingTable(index, shards);
if (primaryTerms == null) {
initializePrimaryTerms();
} else if (primaryTerms.length != numberOfShards) {
throw new IllegalStateException(
"primaryTerms length is [" + primaryTerms.length + "] but should be equal to number of shards [" + numberOfShards + "]"
);
}

// fill missing slots in inSyncAllocationIds with empty set if needed and make all entries immutable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When can this case arise? Should we log an error here?

Copy link
Contributor Author

@SwethaGuptha SwethaGuptha Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One valid case is initializing the in-sync when creating a new index. There might be some valid cases during snapshot/remote restore, dangling indices restore. Will add a error log later if required.

final Map<Integer, Set<String>> filledInSyncAllocationIds = new HashMap<>();
for (int i = 0; i < numberOfShards; i++) {
if (inSyncAllocationIds.containsKey(i)) {
filledInSyncAllocationIds.put(i, Collections.unmodifiableSet(new HashSet<>(inSyncAllocationIds.get(i))));
} else {
filledInSyncAllocationIds.put(i, Collections.emptySet());
}
}
return new IndexRoutingTable(index, shards, primaryTerms, filledInSyncAllocationIds);
}
}

public String prettyPrint() {
StringBuilder sb = new StringBuilder("-- index [" + index + "]\n");

if (primaryTerms != null) {
sb.append("-- primary terms: ").append(Arrays.toString(primaryTerms)).append("\n");
}

if (inSyncAllocationIds != null && !inSyncAllocationIds.isEmpty()) {
sb.append("-- in-sync allocation ids:\n");
for (Map.Entry<Integer, Set<String>> entry : inSyncAllocationIds.entrySet()) {
sb.append("---- shard ").append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
}
}

List<IndexShardRoutingTable> ordered = new ArrayList<>();
for (IndexShardRoutingTable indexShard : this) {
ordered.add(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public static class IndexRoutingTableIncrementalDiff implements Diff<IndexRoutin

private final Index index;

// TODO: Add primary term and in-sync allocation ids to remote diff
public IndexRoutingTableIncrementalDiff(Index index, IndexRoutingTable before, IndexRoutingTable after) {
this.index = index;
this.indexShardRoutingTables = DiffableUtils.diff(before.getShards(), after.getShards(), DiffableUtils.getIntKeySerializer());
Expand All @@ -114,6 +115,7 @@ public IndexRoutingTableIncrementalDiff(StreamInput in) throws IOException {

@Override
public IndexRoutingTable apply(IndexRoutingTable part) {
// TODO: fix this for remote publication
return new IndexRoutingTable(index, indexShardRoutingTables.apply(part.getShards()));
}

Expand Down
Loading