Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class BulletinDTO {
private String message;
private Date timestamp;
private String sourceType;
private String stackTrace;

/**
* @return id of this message
Expand Down Expand Up @@ -168,4 +169,13 @@ public String getSourceType() {
public void setSourceType(String sourceType) {
this.sourceType = sourceType;
}

@Schema(description = "The stack trace associated with the bulletin, if any.")
public String getStackTrace() {
return stackTrace;
}

public void setStackTrace(String stackTrace) {
this.stackTrace = stackTrace;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public void onLogMessage(final LogMessage message) {
// Map LogLevel.WARN to Severity.WARNING so that we are consistent with the Severity enumeration. Else, just use whatever
// the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are).
final String bulletinLevel = (message.getLogLevel() == LogLevel.WARN) ? Severity.WARNING.name() : message.getLogLevel().toString();
bulletinRepository.addBulletin(BulletinFactory.createBulletin(connectable, CATEGORY, bulletinLevel, message.getMessage(), message.getFlowFileUuid()));
bulletinRepository.addBulletin(
BulletinFactory.createBulletin(connectable, CATEGORY, bulletinLevel, message.getMessage(), message.getFlowFileUuid(), message.getThrowable())
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void onLogMessage(final LogMessage message) {
final String groupName = pg == null ? null : pg.getName();

final Bulletin bulletin = BulletinFactory.createBulletin(groupId, groupName, serviceNode.getIdentifier(), ComponentType.CONTROLLER_SERVICE,
serviceNode.getName(), "Log Message", bulletinLevel, message.getMessage());
serviceNode.getName(), "Log Message", bulletinLevel, message.getMessage(), message.getThrowable());
bulletinRepository.addBulletin(bulletin);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void onLogMessage(final LogMessage message) {
final String bulletinLevel = message.getLogLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLogLevel().toString();

final Bulletin bulletin = BulletinFactory.createBulletin(null, clientNode.getIdentifier(), ComponentType.FLOW_REGISTRY_CLIENT,
clientNode.getName(), "Log Message", bulletinLevel, message.getMessage());
clientNode.getName(), "Log Message", bulletinLevel, message.getMessage(), message.getThrowable());
bulletinRepository.addBulletin(bulletin);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public void onLogMessage(final LogMessage message) {
// Map LogLevel.WARN to Severity.WARNING so that we are consistent with the Severity enumeration. Else, just use whatever
// the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are).
final String bulletinLevel = (message.getLogLevel() == LogLevel.WARN) ? Severity.WARNING.name() : message.getLogLevel().toString();
bulletinRepository.addBulletin(BulletinFactory.createBulletin(processorNode, CATEGORY, bulletinLevel, message.getMessage(), message.getFlowFileUuid()));
bulletinRepository.addBulletin(
BulletinFactory.createBulletin(processorNode, CATEGORY, bulletinLevel, message.getMessage(), message.getFlowFileUuid(), message.getThrowable())
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void onLogMessage(final LogMessage message) {
final String bulletinLevel = message.getLogLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLogLevel().toString();

final Bulletin bulletin = BulletinFactory.createBulletin(null, taskNode.getIdentifier(), ComponentType.REPORTING_TASK,
taskNode.getName(), "Log Message", bulletinLevel, message.getMessage());
taskNode.getName(), "Log Message", bulletinLevel, message.getMessage(), message.getThrowable());
bulletinRepository.addBulletin(bulletin);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.ComponentType;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.atomic.AtomicLong;

public final class BulletinFactory {
Expand Down Expand Up @@ -50,6 +52,22 @@ public static Bulletin createBulletin(final Connectable connectable, final Strin
return createBulletin(groupId, groupName, connectable.getIdentifier(), type, connectable.getName(), category, severity, message, groupPath, flowFileUUID);
}

public static Bulletin createBulletin(final Connectable connectable, final String category, final String severity, final String message, final String flowFileUUID, final Throwable t) {
final Bulletin bulletin = createBulletin(connectable, category, severity, message, flowFileUUID);
if (t != null) {
bulletin.setStackTrace(formatStackTrace(t));
}
return bulletin;
}

public static Bulletin createBulletin(final Connectable connectable, final String category, final String severity, final String message, final Throwable t) {
final Bulletin bulletin = createBulletin(connectable, category, severity, message);
if (t != null) {
bulletin.setStackTrace(formatStackTrace(t));
}
return bulletin;
}

private static String buildGroupPath(ProcessGroup group) {
if (group == null) {
return null;
Expand Down Expand Up @@ -78,6 +96,15 @@ public static Bulletin createBulletin(final String groupId, final String sourceI
return bulletin;
}

public static Bulletin createBulletin(final String groupId, final String sourceId, final ComponentType sourceType, final String sourceName,
final String category, final String severity, final String message, final Throwable t) {
final Bulletin bulletin = createBulletin(groupId, sourceId, sourceType, sourceName, category, severity, message);
if (t != null) {
bulletin.setStackTrace(formatStackTrace(t));
}
return bulletin;
}

public static Bulletin createBulletin(final String groupId, final String groupName, final String sourceId, final ComponentType sourceType,
final String sourceName, final String category, final String severity, final String message) {
final Bulletin bulletin = new ComponentBulletin(currentId.getAndIncrement());
Expand All @@ -92,6 +119,15 @@ public static Bulletin createBulletin(final String groupId, final String groupNa
return bulletin;
}

public static Bulletin createBulletin(final String groupId, final String groupName, final String sourceId, final ComponentType sourceType,
final String sourceName, final String category, final String severity, final String message, final Throwable t) {
final Bulletin bulletin = createBulletin(groupId, groupName, sourceId, sourceType, sourceName, category, severity, message);
if (t != null) {
bulletin.setStackTrace(formatStackTrace(t));
}
return bulletin;
}

public static Bulletin createBulletin(final String groupId, final String groupName, final String sourceId, final ComponentType sourceType,
final String sourceName, final String category, final String severity, final String message, final String groupPath, final String flowFileUUID) {
final Bulletin bulletin = new ComponentBulletin(currentId.getAndIncrement());
Expand All @@ -108,6 +144,15 @@ public static Bulletin createBulletin(final String groupId, final String groupNa
return bulletin;
}

public static Bulletin createBulletin(final String groupId, final String groupName, final String sourceId, final ComponentType sourceType,
final String sourceName, final String category, final String severity, final String message, final String groupPath, final String flowFileUUID, final Throwable t) {
final Bulletin bulletin = createBulletin(groupId, groupName, sourceId, sourceType, sourceName, category, severity, message, groupPath, flowFileUUID);
if (t != null) {
bulletin.setStackTrace(formatStackTrace(t));
}
return bulletin;
}

public static Bulletin createBulletin(final String category, final String severity, final String message) {
final Bulletin bulletin = new SystemBulletin(currentId.getAndIncrement());
bulletin.setCategory(category);
Expand All @@ -117,6 +162,14 @@ public static Bulletin createBulletin(final String category, final String severi
return bulletin;
}

public static Bulletin createBulletin(final String category, final String severity, final String message, final Throwable t) {
final Bulletin bulletin = createBulletin(category, severity, message);
if (t != null) {
bulletin.setStackTrace(formatStackTrace(t));
}
return bulletin;
}

private static ComponentType getComponentType(final Connectable connectable) {
return switch (connectable.getConnectableType()) {
case REMOTE_INPUT_PORT, REMOTE_OUTPUT_PORT -> ComponentType.REMOTE_PROCESS_GROUP;
Expand All @@ -126,4 +179,15 @@ private static ComponentType getComponentType(final Connectable connectable) {
default -> ComponentType.PROCESSOR;
};
}

private static String formatStackTrace(final Throwable t) {
try (final StringWriter sw = new StringWriter(); final PrintWriter pw = new PrintWriter(sw)) {
t.printStackTrace(pw);
pw.flush();
return sw.toString();
} catch (final Exception e) {
// Fallback to Throwable#toString if printing fails for any reason
return t.toString();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.events;

import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.ComponentType;
import org.junit.jupiter.api.Test;

import java.io.PrintWriter;
import java.io.StringWriter;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

class BulletinFactoryStackTraceTest {

private static String toStackTrace(final Throwable t) {
final StringWriter sw = new StringWriter();
try (PrintWriter pw = new PrintWriter(sw)) {
t.printStackTrace(pw);
}
return sw.toString();
}

@Test
void testCreateBulletinWithThrowableIncludesPrintableStackTrace() {
final Exception cause = new IllegalStateException("inner");
final RuntimeException ex = new RuntimeException("outer", cause);

final Bulletin bulletin = BulletinFactory.createBulletin(
"pg1", "Process Group 1", "proc1", ComponentType.PROCESSOR, "MyProcessor",
"Log Message", "ERROR", "Something failed", "/root / Process Group 1", null, ex);

assertNotNull(bulletin);
final String stackTrace = bulletin.getStackTrace();
assertNotNull(stackTrace, "Stack trace should be set on bulletin");

final String expected = toStackTrace(ex);
assertEquals(expected, stackTrace, "Stack trace should match Throwable.printStackTrace output exactly");
assertTrue(stackTrace.contains(cause.getClass().getSimpleName()));
assertTrue(stackTrace.contains(ex.getClass().getSimpleName()));
assertTrue(stackTrace.contains("\n"), "Stack trace should contain newlines for multiline formatting");
assertTrue(stackTrace.contains("\tat ") || stackTrace.contains("\n\tat "), "Stack trace should include frame lines");
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class AdaptedBulletin {
private String level;
private String category;
private String message;
private String stackTrace;

private String groupId;
private String groupName;
Expand Down Expand Up @@ -85,6 +86,14 @@ public void setMessage(String message) {
this.message = message;
}

public String getStackTrace() {
return stackTrace;
}

public void setStackTrace(String stackTrace) {
this.stackTrace = stackTrace;
}

public String getSourceId() {
return sourceId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ public Bulletin unmarshal(final AdaptedBulletin b) throws Exception {
return null;
}
// TODO - timestamp is overridden here with a new timestamp... address?
final Bulletin bulletin;
if (b.getSourceId() == null) {
return BulletinFactory.createBulletin(b.getCategory(), b.getLevel(), b.getMessage());
bulletin = BulletinFactory.createBulletin(b.getCategory(), b.getLevel(), b.getMessage());
} else {
return BulletinFactory.createBulletin(b.getGroupId(), b.getGroupName(), b.getSourceId(), b.getSourceType(),
bulletin = BulletinFactory.createBulletin(b.getGroupId(), b.getGroupName(), b.getSourceId(), b.getSourceType(),
b.getSourceName(), b.getCategory(), b.getLevel(), b.getMessage());
}
bulletin.setStackTrace(b.getStackTrace());
return bulletin;
}

@Override
Expand All @@ -55,6 +58,7 @@ public AdaptedBulletin marshal(final Bulletin b) throws Exception {
aBulletin.setCategory(b.getCategory());
aBulletin.setLevel(b.getLevel());
aBulletin.setMessage(b.getMessage());
aBulletin.setStackTrace(b.getStackTrace());
return aBulletin;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void onLogMessage(final LogMessage message) {
final String bulletinLevel = message.getLogLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLogLevel().toString();

final Bulletin bulletin = BulletinFactory.createBulletin(null, flowAnalysisRuleNode.getIdentifier(), ComponentType.FLOW_ANALYSIS_RULE,
flowAnalysisRuleNode.getName(), "Log Message", bulletinLevel, message.getMessage());
flowAnalysisRuleNode.getName(), "Log Message", bulletinLevel, message.getMessage(), message.getThrowable());
bulletinRepository.addBulletin(bulletin);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void onLogMessage(final LogMessage message) {
final String bulletinLevel = message.getLogLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLogLevel().toString();

final Bulletin bulletin = BulletinFactory.createBulletin(null, parameterProviderNode.getIdentifier(), ComponentType.PARAMETER_PROVIDER,
parameterProviderNode.getName(), "Log Message", bulletinLevel, message.getMessage());
parameterProviderNode.getName(), "Log Message", bulletinLevel, message.getMessage(), message.getThrowable());
bulletinRepository.addBulletin(bulletin);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jaxb;

import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.ComponentType;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

class BulletinAdapterStackTraceTest {

@Test
void testMarshalUnmarshalCarriesStackTrace() throws Exception {
final Throwable t = new NullPointerException("npe");
final Bulletin original = BulletinFactory.createBulletin(
"g", "G", "id", ComponentType.PROCESSOR, "Name",
"Category", "ERROR", "msg", "/G", null, t);

final BulletinAdapter adapter = new BulletinAdapter();
final AdaptedBulletin adapted = adapter.marshal(original);
assertNotNull(adapted);
assertEquals(original.getStackTrace(), adapted.getStackTrace(), "AdaptedBulletin must copy stackTrace");

final Bulletin roundTrip = adapter.unmarshal(adapted);
assertNotNull(roundTrip);
assertEquals(original.getStackTrace(), roundTrip.getStackTrace(), "Unmarshalled Bulletin must preserve stackTrace");
}
}

Loading
Loading