Skip to content

Commit b6d9fc7

Browse files
committed
NIFI-15180 Log initialization errors in ConsumeKinesis
1 parent 40b8e75 commit b6d9fc7

File tree

2 files changed

+108
-1
lines changed

2 files changed

+108
-1
lines changed

nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.nifi.components.PropertyDescriptor;
3232
import org.apache.nifi.controller.NodeTypeProvider;
3333
import org.apache.nifi.flowfile.FlowFile;
34+
import org.apache.nifi.logging.ComponentLog;
3435
import org.apache.nifi.processor.AbstractProcessor;
3536
import org.apache.nifi.processor.DataUnit;
3637
import org.apache.nifi.processor.ProcessContext;
@@ -66,6 +67,7 @@
6667
import software.amazon.kinesis.common.InitialPositionInStream;
6768
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
6869
import software.amazon.kinesis.coordinator.Scheduler;
70+
import software.amazon.kinesis.coordinator.WorkerStateChangeListener;
6971
import software.amazon.kinesis.lifecycle.events.InitializationInput;
7072
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
7173
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
@@ -320,6 +322,7 @@ Ensure that the credentials provided have access to Kinesis, DynamoDB and (optio
320322
private volatile RecordBuffer.ForProcessor<Lease> recordBuffer;
321323

322324
private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
325+
private volatile SchedulerChangeListener errorListener;
323326

324327
// An instance filed, so that it can be read in getRelationships.
325328
private volatile ProcessingStrategy processingStrategy = ProcessingStrategy.from(
@@ -387,6 +390,8 @@ public void setup(final ProcessContext context) {
387390
recordBuffer = memoryBoundRecordBuffer;
388391
final ShardRecordProcessorFactory recordProcessorFactory = () -> new ConsumeKinesisRecordProcessor(memoryBoundRecordBuffer);
389392

393+
errorListener = new SchedulerChangeListener(getLogger());
394+
390395
final String applicationName = context.getProperty(APPLICATION_NAME).getValue();
391396
final String workerId = generateWorkerId();
392397
final ConfigsBuilder configsBuilder = new ConfigsBuilder(streamTracker, applicationName, kinesisClient, dynamoDbClient, cloudWatchClient, workerId, recordProcessorFactory);
@@ -395,7 +400,7 @@ public void setup(final ProcessContext context) {
395400

396401
kinesisScheduler = new Scheduler(
397402
configsBuilder.checkpointConfig(),
398-
configsBuilder.coordinatorConfig(),
403+
configsBuilder.coordinatorConfig().workerStateChangeListener(errorListener),
399404
configsBuilder.leaseManagementConfig(),
400405
configsBuilder.lifecycleConfig(),
401406
configsBuilder.metricsConfig().metricsFactory(metricsFactory),
@@ -528,6 +533,7 @@ public void onStopped() {
528533
}
529534

530535
recordBuffer = null;
536+
errorListener = null;
531537
readerRecordProcessor = null;
532538
}
533539

@@ -560,6 +566,12 @@ private void shutdownScheduler() {
560566

561567
@Override
562568
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
569+
if (errorListener.holdsInitializationFailure()) {
570+
getLogger().error("Failed to initialize the processor. Correct the error and restart the processor.", errorListener.failure());
571+
context.yield();
572+
return;
573+
}
574+
563575
final Optional<Lease> leaseAcquired = recordBuffer.acquireBufferLease();
564576

565577
leaseAcquired.ifPresentOrElse(
@@ -684,6 +696,35 @@ public void shutdownRequested(final ShutdownRequestedInput shutdownRequestedInpu
684696
}
685697
}
686698

699+
private static final class SchedulerChangeListener implements WorkerStateChangeListener {
700+
701+
private final ComponentLog logger;
702+
703+
private volatile @Nullable Throwable initializationFailure;
704+
705+
SchedulerChangeListener(final ComponentLog logger) {
706+
this.logger = logger;
707+
}
708+
709+
@Override
710+
public void onWorkerStateChange(final WorkerState newState) {
711+
logger.debug("Kinesis Scheduler changed state to: {}", newState);
712+
}
713+
714+
@Override
715+
public void onAllInitializationAttemptsFailed(final Throwable e) {
716+
initializationFailure = e;
717+
}
718+
719+
boolean holdsInitializationFailure() {
720+
return initializationFailure != null;
721+
}
722+
723+
@Nullable Throwable failure() {
724+
return initializationFailure;
725+
}
726+
}
727+
687728
enum ProcessingStrategy implements DescribedValue {
688729
FLOW_FILE("Write one FlowFile for each consumed Kinesis Record"),
689730
RECORD("Write one FlowFile containing multiple consumed Kinesis Records processed with Record Reader and Record Writer");

nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisTest.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,28 @@
1717
package org.apache.nifi.processors.aws.kinesis;
1818

1919
import org.apache.nifi.processor.Relationship;
20+
import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
21+
import org.apache.nifi.processors.aws.region.RegionUtil;
22+
import org.apache.nifi.reporting.InitializationException;
23+
import org.apache.nifi.util.LogMessage;
2024
import org.apache.nifi.util.TestRunner;
2125
import org.apache.nifi.util.TestRunners;
2226
import org.junit.jupiter.api.BeforeEach;
2327
import org.junit.jupiter.api.Test;
28+
import org.junit.jupiter.api.Timeout;
2429

30+
import java.util.List;
2531
import java.util.Set;
2632

33+
import static java.util.concurrent.TimeUnit.MINUTES;
2734
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.PROCESSING_STRATEGY;
2835
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.ProcessingStrategy.FLOW_FILE;
2936
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.ProcessingStrategy.RECORD;
3037
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.REL_PARSE_FAILURE;
3138
import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesis.REL_SUCCESS;
3239
import static org.junit.jupiter.api.Assertions.assertEquals;
40+
import static org.junit.jupiter.api.Assertions.assertTrue;
41+
import static org.junit.jupiter.api.Timeout.ThreadMode.SEPARATE_THREAD;
3342

3443
class ConsumeKinesisTest {
3544

@@ -57,4 +66,61 @@ void getRelationshipsForRecordProcessingStrategy() {
5766

5867
assertEquals(Set.of(REL_SUCCESS, REL_PARSE_FAILURE), relationships);
5968
}
69+
70+
@Test
71+
@Timeout(value = 2, unit = MINUTES, threadMode = SEPARATE_THREAD)
72+
void logInitializationErrors() {
73+
// With dummy values KCL Scheduler initialization will fail.
74+
setDummyValues(testRunner);
75+
76+
awaitErrorMessages(testRunner);
77+
78+
final List<LogMessage> errorMessages = testRunner.getLogger().getErrorMessages();
79+
80+
final boolean hasInitializationError = errorMessages.stream()
81+
.map(LogMessage::getMsg)
82+
.anyMatch(msg -> msg.contains("Failed to initialize the processor. Correct the error and restart the processor."));
83+
84+
assertTrue(hasInitializationError, "Logs should contain initialization error message");
85+
}
86+
87+
private static void awaitErrorMessages(final TestRunner runner) {
88+
runner.run(1, false, true);
89+
90+
while (runner.getLogger().getErrorMessages().isEmpty()) {
91+
runner.run(1, false, false);
92+
93+
try {
94+
Thread.sleep(100);
95+
} catch (final InterruptedException e) {
96+
Thread.currentThread().interrupt();
97+
throw new RuntimeException("Interrupted while waiting for error logs", e);
98+
}
99+
}
100+
101+
runner.run(1, true, false);
102+
}
103+
104+
private static void setDummyValues(final TestRunner runner) {
105+
final AWSCredentialsProviderControllerService credentialsService = new AWSCredentialsProviderControllerService();
106+
try {
107+
runner.addControllerService("credentials", credentialsService);
108+
} catch (final InitializationException e) {
109+
throw new RuntimeException(e);
110+
}
111+
runner.setProperty(credentialsService, AWSCredentialsProviderControllerService.ACCESS_KEY_ID, "123");
112+
runner.setProperty(credentialsService, AWSCredentialsProviderControllerService.SECRET_KEY, "123");
113+
runner.enableControllerService(credentialsService);
114+
115+
runner.setProperty(ConsumeKinesis.AWS_CREDENTIALS_PROVIDER_SERVICE, "credentials");
116+
runner.setProperty(ConsumeKinesis.STREAM_NAME, "stream");
117+
runner.setProperty(ConsumeKinesis.APPLICATION_NAME, "application");
118+
runner.setProperty(RegionUtil.REGION, "us-west-2");
119+
runner.setProperty(ConsumeKinesis.INITIAL_STREAM_POSITION, ConsumeKinesis.InitialPosition.TRIM_HORIZON);
120+
runner.setProperty(ConsumeKinesis.PROCESSING_STRATEGY, ConsumeKinesis.ProcessingStrategy.FLOW_FILE);
121+
122+
runner.setProperty(ConsumeKinesis.METRICS_PUBLISHING, ConsumeKinesis.MetricsPublishing.CLOUDWATCH);
123+
124+
runner.setProperty(ConsumeKinesis.MAX_BYTES_TO_BUFFER, "10 MB");
125+
}
60126
}

0 commit comments

Comments
 (0)