Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,9 @@ private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDa
chargePendingTransition(resource, currentStateOutput, throttleController, cache,
preferenceLists, stateModelDef);

boolean recoveryRebalanceForTopStateDownwardTransition =
clusterConfig.isRecoveryBalanceForTopStateDownwardTransitionEnabled();

// Sort partitions so that urgent partitions take the quota first.
List<Partition> partitions = new ArrayList<>(resource.getPartitions());
partitions.sort(new PartitionPriorityComparator(bestPossiblePartitionStateMap.getStateMap(),
Expand All @@ -375,7 +378,10 @@ private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDa
for (Message message : messagesToThrottle) {
RebalanceType rebalanceType =
getRebalanceTypePerMessage(requiredState, message, derivedCurrentStateMap);

if(recoveryRebalanceForTopStateDownwardTransition) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can move the logic to getRebalanceTypePerMessage() method itself - instead of overriding the decision here.

rebalanceType = validateTopStateDownwardTransition(stateModelDef, message) ? RebalanceType.RECOVERY_BALANCE :
rebalanceType;
}
// Number of states required by StateModelDefinition are not satisfied, need recovery
if (rebalanceType.equals(RebalanceType.RECOVERY_BALANCE)) {
message.setSTRebalanceType(Message.STRebalanceType.RECOVERY_REBALANCE);
Expand Down Expand Up @@ -435,6 +441,18 @@ private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDa
return intermediatePartitionStateMap;
}

private boolean validateTopStateDownwardTransition(StateModelDefinition stateModelDef, Message msg) {
String topState = stateModelDef.getTopState();
if (topState == null) {
return false;
}
String secondTopState = stateModelDef.getNextStateForTransition(topState, stateModelDef.getInitialState());
if (secondTopState == null) {
return false;
}
return msg.getFromState() == topState && msg.getToState() == secondTopState;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return msg.getFromState() == topState && msg.getToState() == secondTopState;
return topState.equals(msg.getFromState()) && secondTopState.equals(msg.getToState());

This might not work. We should use .equals for String comparision. IIRC == compares object reference and not the content.

}

/**
* Determine whether the message is a downward message.
* @param message message for load rebalance
Expand Down
12 changes: 12 additions & 0 deletions helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ public enum ClusterConfigProperty {

// Allow disabled partitions to remain OFFLINE instead of being reassigned in WAGED rebalancer
RELAXED_DISABLED_PARTITION_CONSTRAINT,

// If enabled, all downward transitions from TopState (e.g., MASTER→SLAVE or LEADER→STANDBY)
// are classified as RECOVERY_REBALANCE instead of LOAD_BALANCE.
ENABLE_RECOVERY_REBALANCE_FOR_TOPSTATE_DOWNWARD_TRANSITION,
}

public enum GlobalRebalancePreferenceKey {
Expand Down Expand Up @@ -1309,4 +1313,12 @@ public void setParticipantDeregistrationTimeout(long timeout) {
/**
 * Reports whether participant deregistration is enabled.
 *
 * @return {@code true} when the configured participant deregistration timeout is greater
 *         than -1
 */
public boolean isParticipantDeregistrationEnabled() {
  final long deregistrationTimeout = getParticipantDeregistrationTimeout();
  return deregistrationTimeout > -1;
}

public boolean isRecoveryBalanceForTopStateDownwardTransitionEnabled() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public boolean isRecoveryBalanceForTopStateDownwardTransitionEnabled() {
public boolean isRecoveryRebalanceForTopStateDownwardTransitionEnabled() {

Rebalance**

return _record.getBooleanField(ClusterConfigProperty.ENABLE_RECOVERY_REBALANCE_FOR_TOPSTATE_DOWNWARD_TRANSITION.name(), false);
}

public void setRecoveryBalanceForTopStateDownwardTransitionEnabled(boolean enabled) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public void setRecoveryBalanceForTopStateDownwardTransitionEnabled(boolean enabled) {
public void setRecoveryRebalanceForTopStateDownwardTransitionEnabled(boolean enabled) {

Rebalance**

_record.setBooleanField(ClusterConfigProperty.ENABLE_RECOVERY_REBALANCE_FOR_TOPSTATE_DOWNWARD_TRANSITION.name(),enabled);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_record.setBooleanField(ClusterConfigProperty.ENABLE_RECOVERY_REBALANCE_FOR_TOPSTATE_DOWNWARD_TRANSITION.name(),enabled);
_record.setBooleanField(ClusterConfigProperty.ENABLE_RECOVERY_REBALANCE_FOR_TOPSTATE_DOWNWARD_TRANSITION.name(), enabled);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* under the License.
*/

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -649,6 +650,102 @@ public void testMessageAlreadyApplied() {
.equals(expectedResult.getPartitionStateMap(resource).getStateMap()));
}
}
/**
 * Exercises the cluster config flag that classifies a top-state downward transition
 * (MASTER→SLAVE) as recovery rebalance.
 *
 * The cluster is configured with a LOAD_BALANCE throttle of 0 messages per instance, and a
 * single MASTER→SLAVE message for localhost_0 is fed to the intermediate state stage twice:
 * first with the flag at its default, expecting the message to be throttled; then with the
 * flag enabled, expecting the message to pass through tagged as RECOVERY_REBALANCE.
 */
@Test
public void testEnableRecoveryRebalanceForTopStateDownwardStateTransition() {
String[] resources = {"TestResource"};
String resource = resources[0];
int nReplica = 3;
int nPartition = 1;

// ---- Basic setup ----
setupIdealState(4, resources, nPartition, nReplica, IdealState.RebalanceMode.FULL_AUTO,
"MasterSlave", null, null, 2);
setupStateModel();
setupInstances(4);
setupLiveInstances(4);
event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap(resources, nPartition, "MasterSlave"));
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
getResourceMap(resources, nPartition, "MasterSlave"));

BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
MessageOutput messageSelectOutput = new MessageOutput();
IntermediateStateOutput expectedResult = new IntermediateStateOutput();

// Set load balance throttling to allow 0 load balance messages per instance, so that the
// master->slave downward transition can only go through when it is treated as recovery
// rebalance rather than load balance.
_clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
_clusterConfig.setStateTransitionThrottleConfigs(ImmutableList.of(
new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 0)));
setClusterConfig(_clusterConfig);

// ---- Partition and states ----
Partition partition = new Partition(resource + "_0");
Map<String, List<String>> partitionMap = new HashMap<>();
partitionMap.put(partition.getPartitionName(), Arrays.asList("localhost_1", "localhost_2", "localhost_3"));
bestPossibleStateOutput.setPreferenceLists(resource, partitionMap);

// Best possible: new assignment
bestPossibleStateOutput.setState(resource, partition, "localhost_1", "MASTER");
bestPossibleStateOutput.setState(resource, partition, "localhost_2", "SLAVE");
bestPossibleStateOutput.setState(resource, partition, "localhost_3", "SLAVE");

// Current: old assignment
currentStateOutput.setCurrentState(resource, partition, "localhost_0", "MASTER");
currentStateOutput.setCurrentState(resource, partition, "localhost_1", "SLAVE");
currentStateOutput.setCurrentState(resource, partition, "localhost_2", "SLAVE");

// Message to demote localhost_0 from MASTER → SLAVE
messageSelectOutput.addMessage(resource, partition,
generateMessage("MASTER", "SLAVE", "localhost_0"));

// ---- Attach attributes and run ----
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());

runStage(event, new ReadClusterDataStage());
runStage(event, new IntermediateStateCalcStage());

IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());

// Validate message got throttled because it was load balance type.
// NOTE(review): the assertion reads messageSelectOutput, the same object passed in as
// MESSAGES_SELECTED — presumably the stage removes throttled messages from it in place; confirm.
Assert.assertEquals(messageSelectOutput.getMessages(resource, partition).size(), 0);
// intermediate output matches current state output because ST message got throttled
Assert.assertTrue(
output.getPartitionStateMap(resource).getPartitionMap(partition).
equals(currentStateOutput.getCurrentStateMap(resource, partition)));

//-------- Test Success scenario--------------

// Set config to treat topState downward transition as recovery rebalance
_clusterConfig.setRecoveryBalanceForTopStateDownwardTransitionEnabled(true);
setClusterConfig(_clusterConfig);
// Use a fresh data provider for the second run.
event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
// Re-add the selected message that demotes localhost_0 from MASTER → SLAVE, since the
// first run left the selected-message list for this partition empty (asserted above).
messageSelectOutput.addMessage(resource, partition,
generateMessage("MASTER", "SLAVE", "localhost_0"));
event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
runStage(event, new ReadClusterDataStage());
runStage(event, new IntermediateStateCalcStage());

output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());

// Validate message didn't get throttled because it was classified as recovery rebalance type
Assert.assertEquals(messageSelectOutput.getMessages(resource, partition).size(), 1);
Assert.assertEquals(messageSelectOutput.getMessages(resource, partition).get(0).getSTRebalanceType(),
Message.STRebalanceType.RECOVERY_REBALANCE);
// intermediate output state processed the ST message
Map<String, String> stateMap =
output.getPartitionStateMap(resource).getPartitionMap(partition);
Assert.assertTrue(
stateMap.values().stream().allMatch(state -> state.equals("SLAVE")),
"All hosts should be in SLAVE state");
}

private void preSetup(String[] resources, int numOfLiveInstances, int numOfReplicas) {
setupIdealState(numOfLiveInstances, resources, numOfLiveInstances, numOfReplicas,
Expand Down