
@Shiyang-Zhao (Contributor)

This PR fixes nondeterministic behavior in the following flaky tests:

  • org.apache.druid.data.input.kafkainput.KafkaInputFormatTest.testWithSchemaDiscovery
  • org.apache.druid.data.input.kafkainput.KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery
  • org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTest.testKillBadPartitionAssignment
  • org.apache.druid.indexing.overlord.TaskQueueTest.testGetActiveTaskRedactsPassword

Description

The KafkaInputFormatTest.testWithSchemaDiscovery and KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery tests failed intermittently due to nondeterministic ordering of discovered schema dimensions.

These tests check that Kafka input parsing discovers all schema fields. However, field names were collected from unordered structures, causing inconsistent dimension order and intermittent assertion failures.

Failure messages:

[ERROR] org.apache.druid.data.input.kafkainput.KafkaInputFormatTest.testWithSchemaDiscovery -- Time elapsed: 0.609 s <<< FAILURE!
java.lang.AssertionError: expected:<[kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, bar, kafka.newheader.kafkapkc, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[jq_omg2, kafka.newheader.kafkapkc, foo, kafka.newts.timestamp, kafka.newkey.key, kafka.newheader.encoding, path_omg2, path_omg, root_baz, root_baz2, kafka.newtopic.topic, bar, o, jq_omg, baz]>
	at org.junit.Assert.fail(Assert.java:89)
	at org.junit.Assert.failNotEquals(Assert.java:835)
	at org.junit.Assert.assertEquals(Assert.java:120)
	at org.junit.Assert.assertEquals(Assert.java:146)
	at org.apache.druid.data.input.kafkainput.KafkaInputFormatTest.testWithSchemaDiscovery(KafkaInputFormatTest.java:645)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)

[ERROR] Failures: 
[ERROR] org.apache.druid.data.input.kafkainput.KafkaInputFormatTest.testWithSchemaDiscovery
[ERROR]   Run 1: KafkaInputFormatTest.testWithSchemaDiscovery:645 expected:<[kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, bar, kafka.newheader.kafkapkc, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[jq_omg2, kafka.newheader.kafkapkc, foo, kafka.newts.timestamp, kafka.newkey.key, kafka.newheader.encoding, path_omg2, path_omg, root_baz, root_baz2, kafka.newtopic.topic, bar, o, jq_omg, baz]>
[ERROR]   Run 2: KafkaInputFormatTest.testWithSchemaDiscovery:645 expected:<[kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, bar, kafka.newheader.kafkapkc, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[baz, kafka.newts.timestamp, jq_omg, bar, o, root_baz2, path_omg2, kafka.newtopic.topic, kafka.newheader.kafkapkc, root_baz, path_omg, foo, jq_omg2, kafka.newkey.key, kafka.newheader.encoding]>
[ERROR]   Run 3: KafkaInputFormatTest.testWithSchemaDiscovery:645 expected:<[kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, bar, kafka.newheader.kafkapkc, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[o, kafka.newheader.encoding, path_omg, bar, jq_omg, root_baz, root_baz2, kafka.newheader.kafkapkc, foo, jq_omg2, kafka.newkey.key, path_omg2, baz, kafka.newts.timestamp, kafka.newtopic.topic]>
[ERROR]   Run 4: KafkaInputFormatTest.testWithSchemaDiscovery:645 expected:<[kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, bar, kafka.newheader.kafkapkc, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[foo, jq_omg, root_baz, jq_omg2, kafka.newkey.key, kafka.newts.timestamp, bar, baz, kafka.newtopic.topic, path_omg, kafka.newheader.encoding, o, kafka.newheader.kafkapkc, path_omg2, root_baz2]>
[ERROR] org.apache.druid.data.input.kafkainput.KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery -- Time elapsed: 0.622 s <<< FAILURE!
java.lang.AssertionError: expected:<[bar, kafka.newheader.kafkapkc, kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[bar, kafka.newheader.kafkapkc, o, kafka.newkey.key, path_omg, kafka.newheader.encoding, kafka.newtopic.topic, baz, path_omg2, jq_omg2, kafka.newts.timestamp, root_baz2, root_baz, jq_omg, foo]>
	at org.junit.Assert.fail(Assert.java:89)
	at org.junit.Assert.failNotEquals(Assert.java:835)
	at org.junit.Assert.assertEquals(Assert.java:120)
	at org.junit.Assert.assertEquals(Assert.java:146)
	at org.apache.druid.data.input.kafkainput.KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery(KafkaInputFormatTest.java:908)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)

[ERROR] Failures: 
[ERROR] org.apache.druid.data.input.kafkainput.KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery
[ERROR]   Run 1: KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery:908 expected:<[bar, kafka.newheader.kafkapkc, kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[bar, kafka.newheader.kafkapkc, o, kafka.newkey.key, path_omg, kafka.newheader.encoding, kafka.newtopic.topic, baz, path_omg2, jq_omg2, kafka.newts.timestamp, root_baz2, root_baz, jq_omg, foo]>
[ERROR]   Run 2: KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery:908 expected:<[bar, kafka.newheader.kafkapkc, kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[bar, kafka.newheader.kafkapkc, o, foo, root_baz, kafka.newts.timestamp, baz, jq_omg2, kafka.newtopic.topic, path_omg2, path_omg, jq_omg, kafka.newheader.encoding, kafka.newkey.key, root_baz2]>
[ERROR]   Run 3: KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery:908 expected:<[bar, kafka.newheader.kafkapkc, kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[bar, kafka.newheader.kafkapkc, kafka.newkey.key, path_omg, kafka.newheader.encoding, root_baz2, jq_omg2, baz, root_baz, kafka.newtopic.topic, path_omg2, o, foo, kafka.newts.timestamp, jq_omg]>
[ERROR]   Run 4: KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery:908 expected:<[bar, kafka.newheader.kafkapkc, kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[bar, kafka.newheader.kafkapkc, baz, jq_omg2, kafka.newheader.encoding, path_omg2, jq_omg, root_baz, kafka.newkey.key, foo, kafka.newts.timestamp, path_omg, root_baz2, kafka.newtopic.topic, o]>

Proposed Changes:

  • Ensured schema validation compares contents, not ordering.
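The change above can be sketched as an order-insensitive comparison of the discovered dimensions. This is a minimal stdlib illustration, not the actual patch; `sameDimensions` and the sample values are hypothetical:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

public class SchemaAssertionSketch
{
  // Compares discovered dimensions as sets (plus a size check to catch
  // duplicates), so iteration order of the underlying collection no
  // longer affects the result.
  public static boolean sameDimensions(List<String> expected, List<String> actual)
  {
    return expected.size() == actual.size()
           && new HashSet<>(expected).equals(new HashSet<>(actual));
  }

  public static void main(String[] args)
  {
    List<String> expected = Arrays.asList("foo", "bar", "kafka.newkey.key");
    List<String> shuffled = Arrays.asList("kafka.newkey.key", "foo", "bar");
    // Order differs but contents match, so the assertion should pass.
    System.out.println(sameDimensions(expected, shuffled)); // prints "true"
  }
}
```

An equivalent approach is to sort both lists before asserting equality, which additionally preserves a readable diff on failure.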

The KinesisSupervisorTest.testKillBadPartitionAssignment test failed intermittently due to nondeterministic ordering of task entries.

The test verifies that the Kinesis supervisor correctly identifies and terminates invalid task assignments. Since task metadata was stored in a HashMap, iteration order varied between runs, causing inconsistent comparisons of task IDs and flaky assertions.

Failure messages:

[ERROR] org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTest.testKillBadPartitionAssignment -- Time elapsed: 0.777 s <<< FAILURE!
java.lang.AssertionError: 

  Unexpected method call TaskQueue.shutdown("id2", "Task[%s] failed to return status, killing task", "id2"):
    TaskQueue.shutdown("id4", "Task[%s] failed to return status, killing task", "id4"): expected: 1, actual: 0
	at org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:46)
	at org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:100)
	at org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.interceptSuperCallable(ClassProxyFactory.java:100)
	at org.apache.druid.indexing.overlord.TaskQueue$$$EasyMock$2.shutdown(Unknown Source)
	at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.killTask(SeekableStreamSupervisor.java:2021)
	at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.discoverTasks(SeekableStreamSupervisor.java:2282)
	at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.runInternal(SeekableStreamSupervisor.java:1728)
	at org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTest.testKillBadPartitionAssignment(KinesisSupervisorTest.java:1107)

[WARNING] Flakes: 
[WARNING] org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTest.testKillBadPartitionAssignment
[ERROR]   Run 1: KinesisSupervisorTest.testKillBadPartitionAssignment:1107 
  Unexpected method call TaskQueue.shutdown("id2", "Task[%s] failed to return status, killing task", "id2"):
    TaskQueue.shutdown("id4", "Task[%s] failed to return status, killing task", "id4"): expected: 1, actual: 0
[ERROR]   Run 2: KinesisSupervisorTest.testKillBadPartitionAssignment:1107 
  Unexpected method call TaskQueue.shutdown("id2", "Task[%s] failed to return status, killing task", "id2"):
    TaskQueue.shutdown("id4", "Task[%s] failed to return status, killing task", "id4"): expected: 1, actual: 0
[INFO]   Run 3: PASS

Proposed Changes:

  • Made task comparison stable and deterministic across runs.
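One common way to make HashMap-backed iteration deterministic is to sort the task IDs before acting on them, so mock expectations fire in a fixed order. A stdlib-only sketch under that assumption (`stableTaskIds` and the sample map are illustrative, not Druid's actual code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeterministicTaskIteration
{
  // Returns task IDs in sorted order, regardless of how the backing
  // HashMap happens to arrange its buckets on a given run.
  public static List<String> stableTaskIds(Map<String, ?> taskMetadata)
  {
    List<String> ids = new ArrayList<>(taskMetadata.keySet());
    Collections.sort(ids);
    return ids;
  }

  public static void main(String[] args)
  {
    Map<String, Object> tasks = new HashMap<>();
    tasks.put("id4", new Object());
    tasks.put("id2", new Object());
    tasks.put("id3", new Object());
    // HashMap iteration order is unspecified; sorting yields [id2, id3, id4]
    System.out.println(stableTaskIds(tasks));
  }
}
```

Alternatively, the EasyMock expectations themselves can be made order-independent, but a stable iteration order keeps the test's failure messages reproducible.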

The TaskQueueTest.testGetActiveTaskRedactsPassword test failed intermittently due to inconsistent JSON key ordering in serialized task payloads.

The test ensures that sensitive information (such as passwords) is redacted when active task metadata is serialized. However, because JSON object key order carries no meaning and is not guaranteed during serialization, direct string comparison produced false failures whenever the same data was serialized with its keys in a different order.

Failure messages:

[ERROR] org.apache.druid.indexing.overlord.TaskQueueTest.testGetActiveTaskRedactsPassword -- Time elapsed: 4.451 s <<< FAILURE!
org.junit.ComparisonFailure: expected:<...s":[]}},"context":{"[forceTimeChunkLock":true,"useLineageBasedSegmentAllocation]":true},"dataSource"...> but was:<...s":[]}},"context":{"[useLineageBasedSegmentAllocation":true,"forceTimeChunkLock]":true},"dataSource"...>
	at org.junit.Assert.assertEquals(Assert.java:117)
	at org.junit.Assert.assertEquals(Assert.java:146)
	at org.apache.druid.indexing.overlord.TaskQueueTest.testGetActiveTaskRedactsPassword(TaskQueueTest.java:625)

[WARNING] Flakes: 
[WARNING] org.apache.druid.indexing.overlord.TaskQueueTest.testGetActiveTaskRedactsPassword
[ERROR]   Run 1: TaskQueueTest.testGetActiveTaskRedactsPassword:625 expected:<...s":[]}},"context":{"[forceTimeChunkLock":true,"useLineageBasedSegmentAllocation]":true},"dataSource"...> but was:<...s":[]}},"context":{"[useLineageBasedSegmentAllocation":true,"forceTimeChunkLock]":true},"dataSource"...>
[ERROR]   Run 2: TaskQueueTest.testGetActiveTaskRedactsPassword:625 expected:<...s":[]}},"context":{"[forceTimeChunkLock":true,"useLineageBasedSegmentAllocation]":true},"dataSource"...> but was:<...s":[]}},"context":{"[useLineageBasedSegmentAllocation":true,"forceTimeChunkLock]":true},"dataSource"...>
[ERROR]   Run 3: TaskQueueTest.testGetActiveTaskRedactsPassword:625 expected:<...s":[]}},"context":{"[forceTimeChunkLock":true,"useLineageBasedSegmentAllocation]":true},"dataSource"...> but was:<...s":[]}},"context":{"[useLineageBasedSegmentAllocation":true,"forceTimeChunkLock]":true},"dataSource"...>
[INFO]   Run 4: PASS

Proposed Changes:

  • Modified assertions to perform JSON object-level comparison rather than raw string equality.
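In practice this kind of fix typically parses both payloads (e.g. with the project's Jackson ObjectMapper) and compares the resulting trees. The principle can be shown with plain Maps standing in for parsed JSON objects, since Map equality is content-based while the serialized string is order-sensitive; the class and method names here are illustrative:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JsonObjectComparison
{
  // Object-level comparison: contents matter, key order does not.
  // This mirrors how a parsed-tree comparison behaves, unlike raw
  // string equality on the serialized form.
  public static boolean jsonEquivalent(Map<String, ?> expected, Map<String, ?> actual)
  {
    return expected.equals(actual);
  }

  public static void main(String[] args)
  {
    // Same "context" entries, inserted in different order.
    Map<String, Boolean> expected = new LinkedHashMap<>();
    expected.put("forceTimeChunkLock", true);
    expected.put("useLineageBasedSegmentAllocation", true);

    Map<String, Boolean> actual = new LinkedHashMap<>();
    actual.put("useLineageBasedSegmentAllocation", true);
    actual.put("forceTimeChunkLock", true);

    System.out.println(expected.toString().equals(actual.toString())); // prints "false": string forms differ
    System.out.println(jsonEquivalent(expected, actual));              // prints "true": contents match
  }
}
```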

This PR has:

  • been self-reviewed.
  • made no production logic changes beyond test stabilization.
