-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Introduce primaryTerm and inSyncAllocationIds in IndexRoutingTable #20194
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,8 +33,10 @@ | |
| package org.opensearch.cluster.routing; | ||
|
|
||
| import org.apache.lucene.util.CollectionUtil; | ||
| import org.opensearch.Version; | ||
| import org.opensearch.cluster.AbstractDiffable; | ||
| import org.opensearch.cluster.Diff; | ||
| import org.opensearch.cluster.DiffableUtils; | ||
| import org.opensearch.cluster.metadata.IndexMetadata; | ||
| import org.opensearch.cluster.metadata.Metadata; | ||
| import org.opensearch.cluster.routing.RecoverySource.EmptyStoreRecoverySource; | ||
|
|
@@ -43,6 +45,7 @@ | |
| import org.opensearch.cluster.routing.RecoverySource.PeerRecoverySource; | ||
| import org.opensearch.cluster.routing.RecoverySource.RemoteStoreRecoverySource; | ||
| import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; | ||
| import org.opensearch.cluster.routing.allocation.IndexMetadataUpdater; | ||
| import org.opensearch.common.Randomness; | ||
| import org.opensearch.common.annotation.PublicApi; | ||
| import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput; | ||
|
|
@@ -51,16 +54,20 @@ | |
| import org.opensearch.core.common.io.stream.VerifiableWriteable; | ||
| import org.opensearch.core.index.Index; | ||
| import org.opensearch.core.index.shard.ShardId; | ||
| import org.opensearch.index.seqno.SequenceNumbers; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.Iterator; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import java.util.TreeSet; | ||
| import java.util.function.Predicate; | ||
|
|
||
| /** | ||
|
|
@@ -95,7 +102,20 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> | |
|
|
||
| private final List<ShardRouting> allActiveShards; | ||
|
|
||
| private final long[] primaryTerms; | ||
|
|
||
| private final Map<Integer, Set<String>> inSyncAllocationIds; | ||
|
|
||
/**
 * Creates a routing table for {@code index} without primary terms or in-sync allocation ids.
 * Delegates to the four-argument constructor, passing {@code null} for both optional values.
 *
 * @param index  the index this routing table describes
 * @param shards shard routing tables keyed by shard number
 */
IndexRoutingTable(Index index, final Map<Integer, IndexShardRoutingTable> shards) {
    this(index, shards, (long[]) null, (Map<Integer, Set<String>>) null);
}
|
|
||
| IndexRoutingTable( | ||
| Index index, | ||
| final Map<Integer, IndexShardRoutingTable> shards, | ||
| final long[] primaryTerms, | ||
| final Map<Integer, Set<String>> inSyncAllocationIds | ||
| ) { | ||
| this.index = index; | ||
| this.shuffler = new RotationShardShuffler(Randomness.get().nextInt()); | ||
| this.shards = Collections.unmodifiableMap(shards); | ||
|
|
@@ -108,6 +128,8 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> | |
| } | ||
| } | ||
| this.allActiveShards = Collections.unmodifiableList(allActiveShards); | ||
| this.primaryTerms = primaryTerms; | ||
| this.inSyncAllocationIds = inSyncAllocationIds != null ? Collections.unmodifiableMap(inSyncAllocationIds) : Collections.emptyMap(); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -141,6 +163,30 @@ boolean validate(Metadata metadata) { | |
| throw new IllegalStateException("Wrong number of shards in routing table, missing: " + expected); | ||
| } | ||
|
|
||
| // validate primaryTerms - only if populated | ||
| if (primaryTerms != null && primaryTerms.length > 0 && primaryTerms.length != indexMetadata.getNumberOfShards()) { | ||
| throw new IllegalStateException( | ||
| "Primary terms length [" | ||
| + primaryTerms.length | ||
| + "] does not match number of shards [" | ||
| + indexMetadata.getNumberOfShards() | ||
| + "]" | ||
| ); | ||
| } | ||
|
|
||
| // validate inSyncAllocationIds - only if populated | ||
| if (inSyncAllocationIds != null | ||
| && !inSyncAllocationIds.isEmpty() | ||
| && inSyncAllocationIds.size() != indexMetadata.getNumberOfShards()) { | ||
| throw new IllegalStateException( | ||
| "InSyncAllocationIds size [" | ||
| + inSyncAllocationIds.size() | ||
| + "] does not match number of shards [" | ||
| + indexMetadata.getNumberOfShards() | ||
| + "]" | ||
| ); | ||
| } | ||
|
|
||
| boolean isSearchOnlyClusterBlockEnabled = indexMetadata.getSettings() | ||
| .getAsBoolean(IndexMetadata.INDEX_BLOCKS_SEARCH_ONLY_SETTING.getKey(), false); | ||
|
|
||
|
|
@@ -260,6 +306,61 @@ public IndexShardRoutingTable shard(int shardId) { | |
| return shards.get(shardId); | ||
| } | ||
|
|
||
| /** | ||
| * The term of the current selected primary. This is a non-negative number incremented when | ||
| * a primary shard is assigned after a full cluster restart or a replica shard is promoted to a primary. | ||
| * <p> | ||
| * Note: since we increment the term every time a shard is assigned, the term for any operational shard (i.e., a shard | ||
| * that can be indexed into) is larger than 0. See {@link IndexMetadataUpdater#applyChanges}. | ||
| **/ | ||
| public long getPrimaryTerm(int shardId) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we maintain parity with the existing getter in IndexMetadata? Also, since we are allowing null values in validate, can we have null checks?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see a specific reason to maintain this parity. Getters of other IndexRoutingTable class variables are prefixed with get, hence kept the getter naming convention within the class.
In the core the values will be null only for Remote downloads which is not addressed in scope of this CR. Added check for NPE. |
||
| if (primaryTerms == null) { | ||
| throw new IllegalStateException("Primary term not populated for index {} " + index); | ||
| } | ||
| if (primaryTerms.length < shardId) { | ||
| throw new IllegalArgumentException( | ||
| "Primary term not found for shard [" | ||
| + index | ||
| + "][" | ||
| + shardId | ||
| + "]. Primary term tracked for " | ||
| + primaryTerms.length | ||
| + "shards." | ||
| ); | ||
| } | ||
|
|
||
| return this.primaryTerms[shardId]; | ||
| } | ||
|
|
||
| public long[] getPrimaryTerms() { | ||
| if (primaryTerms == null) { | ||
| throw new IllegalStateException("Primary term not tracked for index {} " + index); | ||
| } | ||
| return this.primaryTerms.clone(); | ||
| } | ||
|
|
||
| public Set<String> getInSyncAllocationIds(int shardId) { | ||
| if (inSyncAllocationIds.size() < shardId) { | ||
| throw new IllegalArgumentException( | ||
| "InSyncAllocationIds not found for shard [" | ||
| + index | ||
| + "][" | ||
| + shardId | ||
| + "]. InSyncAllocationIds tracked for " | ||
| + inSyncAllocationIds.size() | ||
| + "shards." | ||
| ); | ||
| } | ||
| return inSyncAllocationIds.get(shardId); | ||
SwethaGuptha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| public Map<Integer, Set<String>> getInSyncAllocationIds() { | ||
| if (inSyncAllocationIds == null || inSyncAllocationIds.isEmpty()) { | ||
| throw new IllegalStateException("InSyncAllocationIds not tracked for index {} " + index); | ||
| } | ||
| return inSyncAllocationIds; | ||
| } | ||
SwethaGuptha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| /** | ||
| * Returns <code>true</code> if all shards are primary and active. Otherwise <code>false</code>. | ||
| */ | ||
|
|
@@ -356,20 +457,33 @@ public boolean equals(Object o) { | |
|
|
||
| if (!index.equals(that.index)) return false; | ||
| if (!shards.equals(that.shards)) return false; | ||
|
|
||
| if (!Arrays.equals(primaryTerms, that.primaryTerms)) return false; | ||
| if (!Objects.equals(inSyncAllocationIds, that.inSyncAllocationIds)) return false; | ||
| return true; | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| int result = index.hashCode(); | ||
| result = 31 * result + shards.hashCode(); | ||
| result = 31 * result + Arrays.hashCode(primaryTerms); | ||
| result = 31 * result + Objects.hashCode(inSyncAllocationIds); | ||
| return result; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "IndexRoutingTable{" + "shards=" + shards + ", index=" + index + '}'; | ||
| return new StringBuilder().append("IndexRoutingTable{") | ||
| .append("shards=") | ||
| .append(shards) | ||
| .append(", index=") | ||
| .append(index) | ||
| .append(", primaryTerms=") | ||
| .append(Arrays.toString(primaryTerms)) | ||
| .append(", inSyncAllocationIds=") | ||
| .append(inSyncAllocationIds) | ||
| .append('}') | ||
| .toString(); | ||
| } | ||
|
|
||
| public static IndexRoutingTable readFrom(StreamInput in) throws IOException { | ||
|
|
@@ -381,6 +495,17 @@ public static IndexRoutingTable readFrom(StreamInput in) throws IOException { | |
| builder.addIndexShard(IndexShardRoutingTable.Builder.readFromThin(in, index)); | ||
| } | ||
|
|
||
| if (in.getVersion().onOrAfter(Version.V_3_4_0)) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I believe the branch for 3.4 has already been cut. Do we need to change the checks to 3.5?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's okay to leave it at 3.4 for now. We will not merge this PR until the remaining PRs are published. All the PRs will be merged together, so we can update the version at the time of the merge to main. |
||
| builder.setPrimaryTerms(in.readLongArray()); | ||
| int inSyncAllocationIdsSize = in.readVInt(); | ||
| for (int i = 0; i < inSyncAllocationIdsSize; i++) { | ||
| int shardId = in.readVInt(); | ||
| Set<String> allocationIds = DiffableUtils.StringSetValueSerializer.getInstance().read(in, shardId); | ||
| builder.setInSyncAllocationIds(shardId, allocationIds); | ||
| } | ||
| builder.setNumberOfShards(inSyncAllocationIdsSize); | ||
| } | ||
|
|
||
| return builder.build(); | ||
| } | ||
|
|
||
|
|
@@ -395,11 +520,29 @@ public void writeTo(StreamOutput out) throws IOException { | |
| for (IndexShardRoutingTable indexShard : this) { | ||
| IndexShardRoutingTable.Builder.writeToThin(indexShard, out); | ||
| } | ||
| if (out.getVersion().onOrAfter(Version.V_3_4_0)) { | ||
| if (primaryTerms != null && inSyncAllocationIds != null) { | ||
| out.writeVLongArray(primaryTerms); | ||
| out.writeVInt(inSyncAllocationIds.size()); | ||
| for (final Map.Entry<Integer, Set<String>> cursor : inSyncAllocationIds.entrySet()) { | ||
| out.writeVInt(cursor.getKey()); | ||
| DiffableUtils.StringSetValueSerializer.getInstance().write(cursor.getValue(), out); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException { | ||
| index.writeTo(out); | ||
| out.writeMapValues(shards, (stream, value) -> IndexShardRoutingTable.Builder.writeVerifiableTo(value, stream)); | ||
| if (out.getVersion().onOrAfter(Version.V_3_4_0)) { | ||
SwethaGuptha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| out.writeVLongArray(primaryTerms); | ||
| out.writeMap( | ||
| inSyncAllocationIds, | ||
| StreamOutput::writeVInt, | ||
| (stream, val) -> DiffableUtils.StringSetValueSerializer.getInstance().write(new TreeSet<>(val), stream) | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| public static Builder builder(Index index) { | ||
|
|
@@ -416,9 +559,61 @@ public static class Builder { | |
|
|
||
| private final Index index; | ||
| private final Map<Integer, IndexShardRoutingTable> shards = new HashMap<>(); | ||
| private long[] primaryTerms = null; | ||
| private final Map<Integer, Set<String>> inSyncAllocationIds; | ||
| private int numberOfShards; | ||
|
|
||
/**
 * Creates a builder for the routing table of {@code index}, starting with no in-sync allocation ids.
 *
 * @param index the index the built routing table will describe
 */
public Builder(Index index) {
    this.inSyncAllocationIds = new HashMap<>();
    this.index = index;
}
|
|
||
| public Builder setPrimaryTerms(final long[] primaryTerms) { | ||
| this.primaryTerms = primaryTerms.clone(); | ||
| return this; | ||
| } | ||
|
|
||
| public Builder setPrimaryTerm(int shardId, long primaryTerm) { | ||
| if (primaryTerms == null) { | ||
| initializePrimaryTerms(); | ||
| } | ||
| this.primaryTerms[shardId] = primaryTerm; | ||
| return this; | ||
| } | ||
|
|
||
| public long getPrimaryTerm(int shardId) { | ||
| if (primaryTerms == null) { | ||
| initializePrimaryTerms(); | ||
| } | ||
| return this.primaryTerms[shardId]; | ||
| } | ||
|
|
||
| private void initializePrimaryTerms() { | ||
| assert primaryTerms == null; | ||
| if (numberOfShards < 0) { | ||
| throw new IllegalStateException("you must set the number of shards before setting/reading primary terms"); | ||
| } | ||
| primaryTerms = new long[numberOfShards]; | ||
| Arrays.fill(primaryTerms, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); | ||
| } | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| public Builder setInSyncAllocationIds(int shardId, Set<String> allocationIds) { | ||
| inSyncAllocationIds.put(shardId, new HashSet<>(allocationIds)); | ||
| return this; | ||
| } | ||
|
|
||
| public Builder setInSyncAllocationIds(Map<Integer, Set<String>> inSyncAllocationIds) { | ||
| this.inSyncAllocationIds.putAll(inSyncAllocationIds); | ||
| return this; | ||
| } | ||
|
|
||
| public Set<String> getInSyncAllocationIds(int shardId) { | ||
| return inSyncAllocationIds.get(shardId); | ||
| } | ||
|
|
||
| public Builder setNumberOfShards(int numberOfShards) { | ||
| this.numberOfShards = numberOfShards; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we even require this member variable? Its easy to miss out on setting this in ser/de. Besides, this anyway doesn't get set from IndexMetadata, so it seems like we will end up asserting the same value which we set using array's length.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will be set from IndexMetadata. Variable numberOfShards is required to configure the primaryTerms and in-sync allocation Ids size. I have handled the ser/de bug. |
||
| return this; | ||
| } | ||
|
Comment on lines
560
to
617
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion | 🟠 Major Builder The builder changes are generally good, but there’s a subtle footgun:
If a caller forgets to call Given this is a public builder API, I’d recommend tightening the contract:
For example: - private int numberOfShards;
+ private int numberOfShards; // 0 means "not explicitly set"
@@
private void initializePrimaryTerms() {
assert primaryTerms == null;
- if (numberOfShards < 0) {
- throw new IllegalStateException("you must set the number of shards before setting/reading primary terms");
- }
- primaryTerms = new long[numberOfShards];
+ int shardsCount = numberOfShards > 0 ? numberOfShards : shards.size();
+ if (shardsCount <= 0) {
+ throw new IllegalStateException("you must set the number of shards before setting/reading primary terms");
+ }
+ numberOfShards = shardsCount;
+ primaryTerms = new long[numberOfShards];
Arrays.fill(primaryTerms, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}
@@
public IndexRoutingTable build() {
- if (primaryTerms == null) {
- initializePrimaryTerms();
- } else if (primaryTerms.length != numberOfShards) {
+ if (primaryTerms == null) {
+ initializePrimaryTerms();
+ } else if (numberOfShards > 0 && primaryTerms.length != numberOfShards) {
throw new IllegalStateException(
"primaryTerms length is [" + primaryTerms.length + "] but should be equal to number of shards [" + numberOfShards + "]"
);
}This keeps existing builder usages working (deriving from Also applies to: 591-598, 600-617, 1009-1026 🤖 Prompt for AI Agents |
||
|
|
||
| /** | ||
|
|
@@ -811,13 +1006,41 @@ public Builder addShard(ShardRouting shard) { | |
| } | ||
|
|
||
| public IndexRoutingTable build() { | ||
| return new IndexRoutingTable(index, shards); | ||
| if (primaryTerms == null) { | ||
| initializePrimaryTerms(); | ||
| } else if (primaryTerms.length != numberOfShards) { | ||
| throw new IllegalStateException( | ||
| "primaryTerms length is [" + primaryTerms.length + "] but should be equal to number of shards [" + numberOfShards + "]" | ||
| ); | ||
| } | ||
|
|
||
| // fill missing slots in inSyncAllocationIds with empty set if needed and make all entries immutable | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When can this case arise? Should we log an error here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One valid case is initializing the in-sync when creating a new index. There might be some valid cases during snapshot/remote restore, dangling indices restore. Will add a error log later if required. |
||
| final Map<Integer, Set<String>> filledInSyncAllocationIds = new HashMap<>(); | ||
| for (int i = 0; i < numberOfShards; i++) { | ||
| if (inSyncAllocationIds.containsKey(i)) { | ||
| filledInSyncAllocationIds.put(i, Collections.unmodifiableSet(new HashSet<>(inSyncAllocationIds.get(i)))); | ||
| } else { | ||
| filledInSyncAllocationIds.put(i, Collections.emptySet()); | ||
| } | ||
| } | ||
| return new IndexRoutingTable(index, shards, primaryTerms, filledInSyncAllocationIds); | ||
| } | ||
| } | ||
|
|
||
| public String prettyPrint() { | ||
| StringBuilder sb = new StringBuilder("-- index [" + index + "]\n"); | ||
|
|
||
| if (primaryTerms != null) { | ||
| sb.append("-- primary terms: ").append(Arrays.toString(primaryTerms)).append("\n"); | ||
| } | ||
|
|
||
| if (inSyncAllocationIds != null && !inSyncAllocationIds.isEmpty()) { | ||
| sb.append("-- in-sync allocation ids:\n"); | ||
| for (Map.Entry<Integer, Set<String>> entry : inSyncAllocationIds.entrySet()) { | ||
| sb.append("---- shard ").append(entry.getKey()).append(": ").append(entry.getValue()).append("\n"); | ||
| } | ||
| } | ||
|
|
||
| List<IndexShardRoutingTable> ordered = new ArrayList<>(); | ||
| for (IndexShardRoutingTable indexShard : this) { | ||
| ordered.add(indexShard); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class is a Public API, we need to maintain the constructor compatibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I have reverted the original constructor and added a new constructor for additional fields.