Skip to content

Commit aec5c4a

Browse files
authored
Fix duplicate actions in auto-scaler history (#18715)
Changes: - Do not clear `pendingCompletionTaskGroups` in `clearAllocationInfo` - Add unit test
1 parent 2a09371 commit aec5c4a

File tree

2 files changed

+224
-2
lines changed

2 files changed

+224
-2
lines changed

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
7272
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
7373
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
74+
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
7475
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
7576
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
7677
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
@@ -122,6 +123,7 @@
122123

123124
import javax.annotation.Nullable;
124125
import java.io.IOException;
126+
import java.lang.reflect.Method;
125127
import java.time.Instant;
126128
import java.time.temporal.ChronoUnit;
127129
import java.util.ArrayList;
@@ -134,6 +136,7 @@
134136
import java.util.Map;
135137
import java.util.Properties;
136138
import java.util.TreeMap;
139+
import java.util.concurrent.CopyOnWriteArrayList;
137140
import java.util.concurrent.Executor;
138141
import java.util.concurrent.ScheduledExecutorService;
139142
import java.util.function.Function;
@@ -5047,6 +5050,201 @@ public void test_doesTaskMatchSupervisor()
50475050
EasyMock.replay(differentTaskType);
50485051
}
50495052

5053+
@Test
5054+
public void test_autoScaler_doesNotRepeatScaleDownActions_ifTasksAreStillPublishing() throws Exception
5055+
{
5056+
final TaskLocation location1 = TaskLocation.create("testHost1", 1234, -1);
5057+
final TaskLocation location2 = TaskLocation.create("testHost2", 1235, -1);
5058+
final TaskLocation location3 = TaskLocation.create("testHost3", 1236, -1);
5059+
final DateTime startTime = DateTimes.nowUtc();
5060+
5061+
// Create supervisor with 3 task groups
5062+
supervisor = getTestableSupervisor(1, 3, true, "PT1H", null, null);
5063+
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
5064+
addSomeEvents(100);
5065+
5066+
// Manually create 3 tasks (one for each partition/task group)
5067+
Task task1 = createKafkaIndexTask(
5068+
"task1",
5069+
DATASOURCE,
5070+
0,
5071+
new SeekableStreamStartSequenceNumbers<>(
5072+
topic,
5073+
singlePartitionMap(topic, 0, 0L),
5074+
ImmutableSet.of()
5075+
),
5076+
new SeekableStreamEndSequenceNumbers<>(
5077+
topic,
5078+
singlePartitionMap(topic, 0, Long.MAX_VALUE)
5079+
),
5080+
null,
5081+
null,
5082+
tuningConfig
5083+
);
5084+
5085+
Task task2 = createKafkaIndexTask(
5086+
"task2",
5087+
DATASOURCE,
5088+
1,
5089+
new SeekableStreamStartSequenceNumbers<>(
5090+
topic,
5091+
singlePartitionMap(topic, 1, 0L),
5092+
ImmutableSet.of()
5093+
),
5094+
new SeekableStreamEndSequenceNumbers<>(
5095+
topic,
5096+
singlePartitionMap(topic, 1, Long.MAX_VALUE)
5097+
),
5098+
null,
5099+
null,
5100+
tuningConfig
5101+
);
5102+
5103+
Task task3 = createKafkaIndexTask(
5104+
"task3",
5105+
DATASOURCE,
5106+
2,
5107+
new SeekableStreamStartSequenceNumbers<>(
5108+
topic,
5109+
singlePartitionMap(topic, 2, 0L),
5110+
ImmutableSet.of()
5111+
),
5112+
new SeekableStreamEndSequenceNumbers<>(
5113+
topic,
5114+
singlePartitionMap(topic, 2, Long.MAX_VALUE)
5115+
),
5116+
null,
5117+
null,
5118+
tuningConfig
5119+
);
5120+
5121+
Collection workItems = new ArrayList<>();
5122+
workItems.add(new TestTaskRunnerWorkItem(task1, null, location1));
5123+
workItems.add(new TestTaskRunnerWorkItem(task2, null, location2));
5124+
workItems.add(new TestTaskRunnerWorkItem(task3, null, location3));
5125+
5126+
// Setup mocks for initial discovery of tasks
5127+
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
5128+
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
5129+
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
5130+
EasyMock.expect(taskRunner.getTaskLocation(task1.getId())).andReturn(location1).anyTimes();
5131+
EasyMock.expect(taskRunner.getTaskLocation(task2.getId())).andReturn(location2).anyTimes();
5132+
EasyMock.expect(taskRunner.getTaskLocation(task3.getId())).andReturn(location3).anyTimes();
5133+
EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(toMap(task1, task2, task3)).anyTimes();
5134+
EasyMock.expect(taskStorage.getStatus(task1.getId())).andReturn(Optional.of(TaskStatus.running(task1.getId()))).anyTimes();
5135+
EasyMock.expect(taskStorage.getStatus(task2.getId())).andReturn(Optional.of(TaskStatus.running(task2.getId()))).anyTimes();
5136+
EasyMock.expect(taskStorage.getStatus(task3.getId())).andReturn(Optional.of(TaskStatus.running(task3.getId()))).anyTimes();
5137+
EasyMock.expect(taskStorage.getTask(task1.getId())).andReturn(Optional.of(task1)).anyTimes();
5138+
EasyMock.expect(taskStorage.getTask(task2.getId())).andReturn(Optional.of(task2)).anyTimes();
5139+
EasyMock.expect(taskStorage.getTask(task3.getId())).andReturn(Optional.of(task3)).anyTimes();
5140+
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)).anyTimes();
5141+
EasyMock.expect(taskClient.getStatusAsync(task1.getId())).andReturn(Futures.immediateFuture(Status.READING));
5142+
EasyMock.expect(taskClient.getStatusAsync(task2.getId())).andReturn(Futures.immediateFuture(Status.READING));
5143+
EasyMock.expect(taskClient.getStatusAsync(task3.getId())).andReturn(Futures.immediateFuture(Status.READING));
5144+
EasyMock.expect(taskClient.getStartTimeAsync(task1.getId())).andReturn(Futures.immediateFuture(startTime));
5145+
EasyMock.expect(taskClient.getStartTimeAsync(task2.getId())).andReturn(Futures.immediateFuture(startTime));
5146+
EasyMock.expect(taskClient.getStartTimeAsync(task3.getId())).andReturn(Futures.immediateFuture(startTime));
5147+
5148+
TreeMap<Integer, Map<KafkaTopicPartition, Long>> checkpoints1 = new TreeMap<>();
5149+
checkpoints1.put(0, singlePartitionMap(topic, 0, 0L));
5150+
TreeMap<Integer, Map<KafkaTopicPartition, Long>> checkpoints2 = new TreeMap<>();
5151+
checkpoints2.put(0, singlePartitionMap(topic, 1, 0L));
5152+
TreeMap<Integer, Map<KafkaTopicPartition, Long>> checkpoints3 = new TreeMap<>();
5153+
checkpoints3.put(0, singlePartitionMap(topic, 2, 0L));
5154+
5155+
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task1.getId()), EasyMock.anyBoolean()))
5156+
.andReturn(Futures.immediateFuture(checkpoints1))
5157+
.times(1);
5158+
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task2.getId()), EasyMock.anyBoolean()))
5159+
.andReturn(Futures.immediateFuture(checkpoints2))
5160+
.times(1);
5161+
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task3.getId()), EasyMock.anyBoolean()))
5162+
.andReturn(Futures.immediateFuture(checkpoints3))
5163+
.times(1);
5164+
5165+
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
5166+
replayAll();
5167+
5168+
// Start supervisor and discover the 3 existing tasks
5169+
supervisor.start();
5170+
supervisor.runInternal();
5171+
verifyAll();
5172+
5173+
// Verify we have 3 actively reading task groups after discovery
5174+
Assert.assertEquals(
5175+
"Should have 3 actively reading task groups after discovery",
5176+
3,
5177+
supervisor.getActivelyReadingTaskGroupsCount()
5178+
);
5179+
5180+
// Reset and setup mocks for gracefulShutdownInternal
5181+
EasyMock.reset(taskRunner, taskClient, taskQueue);
5182+
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
5183+
EasyMock.expect(taskRunner.getTaskLocation(task1.getId())).andReturn(location1).anyTimes();
5184+
EasyMock.expect(taskRunner.getTaskLocation(task2.getId())).andReturn(location2).anyTimes();
5185+
EasyMock.expect(taskRunner.getTaskLocation(task3.getId())).andReturn(location3).anyTimes();
5186+
EasyMock.expect(taskClient.pauseAsync(task1.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic, 0, 100L)));
5187+
EasyMock.expect(taskClient.pauseAsync(task2.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic, 1, 100L)));
5188+
EasyMock.expect(taskClient.pauseAsync(task3.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic, 2, 100L)));
5189+
EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task1.getId()), EasyMock.anyObject(), EasyMock.eq(true))).andReturn(Futures.immediateFuture(true));
5190+
EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task2.getId()), EasyMock.anyObject(), EasyMock.eq(true))).andReturn(Futures.immediateFuture(true));
5191+
EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task3.getId()), EasyMock.anyObject(), EasyMock.eq(true))).andReturn(Futures.immediateFuture(true));
5192+
5193+
EasyMock.replay(taskRunner, taskClient, taskQueue);
5194+
5195+
// Simulate autoscaler scale-down by calling gracefulShutdownInternal()
5196+
// This should move tasks from activelyReadingTaskGroups to pendingCompletionTaskGroups
5197+
supervisor.gracefulShutdownInternal();
5198+
5199+
verifyAll();
5200+
5201+
// After gracefulShutdownInternal, tasks should be moved to pendingCompletionTaskGroups
5202+
Assert.assertEquals(
5203+
"activelyReadingTaskGroups should be empty after gracefulShutdownInternal",
5204+
0,
5205+
supervisor.getActivelyReadingTaskGroupsCount()
5206+
);
5207+
5208+
// Verify pendingCompletionTaskGroups is NOT empty (tasks were moved there)
5209+
boolean hasPendingTasks = false;
5210+
for (int groupId = 0; groupId < 3; groupId++) {
5211+
if (supervisor.getPendingCompletionTaskGroupsCount(groupId) > 0) {
5212+
hasPendingTasks = true;
5213+
break;
5214+
}
5215+
}
5216+
Assert.assertTrue(
5217+
"pendingCompletionTaskGroups should contain task groups after gracefulShutdownInternal",
5218+
hasPendingTasks
5219+
);
5220+
5221+
// Now call clearAllocationInfo() - this is where the bug was
5222+
// The bug was that this method cleared pendingCompletionTaskGroups
5223+
supervisor.testClearAllocationInfo();
5224+
5225+
// THE KEY ASSERTION: Verify pendingCompletionTaskGroups is still NOT empty after clearAllocationInfo
5226+
// This is the fix - clearAllocationInfo should preserve pendingCompletionTaskGroups
5227+
boolean stillHasPendingTasks = false;
5228+
for (int groupId = 0; groupId < 3; groupId++) {
5229+
if (supervisor.getPendingCompletionTaskGroupsCount(groupId) > 0) {
5230+
stillHasPendingTasks = true;
5231+
break;
5232+
}
5233+
}
5234+
Assert.assertTrue(
5235+
"pendingCompletionTaskGroups should be preserved after clearAllocationInfo() " +
5236+
"to prevent autoscaler from creating duplicate history entries",
5237+
stillHasPendingTasks
5238+
);
5239+
5240+
// Verify activelyReadingTaskGroups is still empty
5241+
Assert.assertEquals(
5242+
"activelyReadingTaskGroups should remain empty after clearAllocationInfo",
5243+
0,
5244+
supervisor.getActivelyReadingTaskGroupsCount()
5245+
);
5246+
}
5247+
50505248
private void addSomeEvents(int numEventsPerPartition) throws Exception
50515249
{
50525250
// create topic manually
@@ -5753,6 +5951,24 @@ private SeekableStreamSupervisorStateManager getStateManager()
57535951
{
57545952
return stateManager;
57555953
}
5954+
5955+
public int getActivelyReadingTaskGroupsCount()
5956+
{
5957+
return getActiveTaskGroupsCount();
5958+
}
5959+
5960+
public int getPendingCompletionTaskGroupsCount(int groupId)
5961+
{
5962+
CopyOnWriteArrayList<?> groups = getPendingCompletionTaskGroups(groupId);
5963+
return groups != null ? groups.size() : 0;
5964+
}
5965+
5966+
public void testClearAllocationInfo() throws Exception
5967+
{
5968+
Method method = SeekableStreamSupervisor.class.getDeclaredMethod("clearAllocationInfo");
5969+
method.setAccessible(true);
5970+
method.invoke(this);
5971+
}
57565972
}
57575973

57585974
private static class TestableKafkaSupervisorWithCustomIsTaskCurrent extends TestableKafkaSupervisor

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -608,13 +608,19 @@ private void changeTaskCountInIOConfig(int desiredActiveTaskCount)
608608
}
609609
}
610610

611+
/**
612+
* Clears allocation information including active task groups, partition groups, partition offsets, and partition IDs.
613+
* <p>
614+
* Note: Does not clear {@link #pendingCompletionTaskGroups} so that the supervisor remembers that these
615+
* tasks are publishing and auto-scaler does not repeatedly attempt a scale down until these tasks
616+
* complete. If this is cleared, the next {@link #discoverTasks()} might add these tasks to
617+
* {@link #activelyReadingTaskGroups}.
618+
*/
611619
private void clearAllocationInfo()
612620
{
613621
activelyReadingTaskGroups.clear();
614622
partitionGroups.clear();
615623
partitionOffsets.clear();
616-
617-
pendingCompletionTaskGroups.clear();
618624
partitionIds.clear();
619625
}
620626

0 commit comments

Comments
 (0)