diff --git a/bin/parser.java b/bin/parser.java new file mode 100755 index 000000000..1ae562e94 --- /dev/null +++ b/bin/parser.java @@ -0,0 +1,240 @@ +///usr/bin/env jbang "$0" "$@" ; exit $? +//DEPS info.picocli:picocli:4.6.3 + + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.stream.Collectors; + +import jdk.jfr.consumer.RecordedEvent; +import jdk.jfr.consumer.RecordedFrame; +import jdk.jfr.consumer.RecordedMethod; +import jdk.jfr.consumer.RecordedStackTrace; +import jdk.jfr.consumer.RecordingFile; +import picocli.CommandLine; +import picocli.CommandLine.Command; +import picocli.CommandLine.Parameters; + +/** + * A helper class to parse Java Flight Recorder (JFR) files + * and extract insights from custom Infinispan ProtoStream events. + * + * It specifically looks for 'org.infinispan.protostream.ResizeEvent' + * and 'org.infinispan.protostream.AllocateEvent' to generate a + * summary about buffer allocation and resizing patterns. + */ +@Command(name = "parser", mixinStandardHelpOptions = true, version = "parser 0.1", + description = "parser made with jbang") +class parser implements Callable { + + @Parameters(index = "0", description = "Path to JFR file") + private File file; + + public static void main(String... args) { + int exitCode = new CommandLine(new parser()).execute(args); + System.exit(exitCode); + } + + @Override + public Integer call() throws Exception { + Path p = file.toPath(); + try (PrintWriter writer = new PrintWriter(System.out)) { + parseAndSummarize(p, writer); + } + return 0; + } + + private static final String RESIZE_EVENT_NAME = "org.infinispan.protostream.ResizeEvent"; + private static final String ALLOCATE_EVENT_NAME = "org.infinispan.protostream.AllocateEvent"; + + /** + * Parses a JFR file to analyze ProtoStream buffer events and prints a summary. + * + * @param jfrFilePath The path to the JFR file. + * @param writer The PrintWriter to which the summary report will be written. + * @throws IOException If an error occurs while reading the JFR file. + */ + public void parseAndSummarize(Path jfrFilePath, PrintWriter writer) throws IOException { + if (jfrFilePath == null || writer == null) { + throw new IllegalArgumentException("JFR file path and PrintWriter cannot be null."); + } + + // --- Data Collectors --- + // Metrics for Resize Events + long resizeEventCount = 0; + long totalBytesResized = 0; + int maxResizeFrom = 0; + int maxResizeTo = 0; + Map resizeHotspots = new HashMap<>(); + + // Metrics for Allocate Events + long allocateEventCount = 0; + long totalBytesAllocated = 0; + int maxAllocationSize = 0; + Map allocationHotspots = new HashMap<>(); + + try (RecordingFile recordingFile = new RecordingFile(jfrFilePath)) { + while (recordingFile.hasMoreEvents()) { + RecordedEvent event = recordingFile.readEvent(); + + switch (event.getEventType().getName()) { + case RESIZE_EVENT_NAME: + resizeEventCount++; + int fromSize = event.getValue("before"); + int toSize = event.getValue("after"); + totalBytesResized += (long) toSize - fromSize; + + if (toSize > maxResizeTo) { + maxResizeTo = toSize; + maxResizeFrom = fromSize; + } + + // Use the first frame of the stack trace as the hotspot identifier + Optional.ofNullable(event.getStackTrace()).ifPresent(stackTrace -> { + if (!stackTrace.getFrames().isEmpty()) { + String topFrame = stackTrace.getFrames().get(0).toString(); + resizeHotspots.computeIfAbsent(topFrame, k -> new HotSpot(stackTrace)).inc(); + } + }); + break; + + case ALLOCATE_EVENT_NAME: + allocateEventCount++; + int newSize = event.getValue("size"); + totalBytesAllocated += newSize; + + if (newSize > maxAllocationSize) { + maxAllocationSize = newSize; + } + + Optional.ofNullable(event.getStackTrace()).ifPresent(stackTrace -> { + if (!stackTrace.getFrames().isEmpty()) { + String topFrame = stackTrace.getFrames().get(0).toString(); + allocationHotspots.computeIfAbsent(topFrame, k -> new HotSpot(stackTrace)).inc(); + } + }); + break; + } + } + } + + // --- Generate Summary Report --- + generateReport( + writer, + resizeEventCount, totalBytesResized, maxResizeFrom, maxResizeTo, resizeHotspots, + allocateEventCount, totalBytesAllocated, maxAllocationSize, allocationHotspots + ); + } + + private void generateReport(PrintWriter writer, + long resizeEventCount, long totalBytesResized, int maxResizeFrom, int maxResizeTo, Map resizeHotspots, + long allocateEventCount, long totalBytesAllocated, int maxAllocationSize, Map allocationHotspots) { + + writer.println("========================================================="); + writer.println(" Infinispan ProtoStream Buffer Events JFR Summary "); + writer.println("========================================================="); + writer.println(); + + // --- Resize Events Section --- + writer.println("--- Buffer Resize Events (" + RESIZE_EVENT_NAME + ") ---"); + if (resizeEventCount > 0) { + writer.printf("Total Resize Events: %,d%n", resizeEventCount); + writer.printf("Total Bytes Added by Resizing: %,d bytes%n", totalBytesResized); + writer.printf("Average Resize Increase: %,.2f bytes%n", (double) totalBytesResized / resizeEventCount); + writer.printf("Largest Single Resize: from %,d to %,d bytes (an increase of %,d bytes)%n", maxResizeFrom, maxResizeTo, maxResizeTo - maxResizeFrom); + writer.println(); + writer.println("Top 5 Most Common Resize Locations (Stack Trace):"); + printTopHotspots(writer, resizeHotspots, 5); + } else { + writer.println("No resize events found in this recording."); + } + writer.println(); + + // --- Allocate Events Section --- + writer.println("--- Buffer Allocate Events (" + ALLOCATE_EVENT_NAME + ") ---"); + if (allocateEventCount > 0) { + writer.printf("Total Allocation Events: %,d%n", allocateEventCount); + writer.printf("Total Bytes Allocated: %,d bytes%n", totalBytesAllocated); + writer.printf("Average Allocation Size: %,.2f bytes%n", (double) totalBytesAllocated / allocateEventCount); + writer.printf("Largest Single Allocation: %,d bytes%n", maxAllocationSize); + writer.println(); + writer.println("Top 5 Most Common Allocation Locations (Stack Trace):"); + printTopHotspots(writer, allocationHotspots, 5); + } else { + writer.println("No allocation events found in this recording."); + } + writer.println(); + writer.println("========================================================="); + writer.flush(); + } + + /** + * Helper to sort and print the most frequent call sites. + */ + private void printTopHotspots(PrintWriter writer, Map hotspots, int limit) { + if (hotspots.isEmpty()) { + writer.println(" (No stack trace information available)"); + return; + } + + // Sort the map by value (count) in descending order + LinkedHashMap sortedHotspots = hotspots.entrySet() + .stream() + .sorted(Map.Entry.comparingByValue((a, b) -> Integer.compare(b.times(), a.times()))) + .collect(Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (e1, e2) -> e1, + LinkedHashMap::new + )); + + int count = 0; + for (Map.Entry entry : sortedHotspots.entrySet()) { + if (count++ >= limit) { + break; + } + HotSpot hs = entry.getValue(); + StringBuilder sb = new StringBuilder(); + boolean first = true; + for (RecordedFrame frame : hs.stackTrace.getFrames()) { + if (first) { + sb.append(frameToString(frame)); + first = false; + } else { + sb.append('\t').append("at ").append(frameToString(frame)); + } + sb.append(System.lineSeparator()); + } + writer.printf(" - [%,d times]:%n%s%n", hs.times(), sb); + } + } + + private String frameToString(RecordedFrame frame) { + RecordedMethod method = frame.getMethod(); + return String.format("%s.%s:%d [%s]", method.getType().getName(), method.getName(), frame.getLineNumber(), frame.getType()); + } + + private static class HotSpot { + private final RecordedStackTrace stackTrace; + private int times; + + private HotSpot(RecordedStackTrace stackTrace) { + this.stackTrace = stackTrace; + } + + public void inc() { + times++; + } + + public int times() { + return times; + } + } +} diff --git a/core/src/main/java/module-info.java b/core/src/main/java/module-info.java index c6b121a68..30b11e51a 100644 --- a/core/src/main/java/module-info.java +++ b/core/src/main/java/module-info.java @@ -6,6 +6,7 @@ requires static jcip.annotations; requires org.jboss.logging; requires static org.jboss.logging.annotations; + requires jdk.jfr; opens org.infinispan.protostream; exports org.infinispan.protostream; exports org.infinispan.protostream.annotations; diff --git a/core/src/main/java/org/infinispan/protostream/impl/ProtoStreamReaderImpl.java b/core/src/main/java/org/infinispan/protostream/impl/ProtoStreamReaderImpl.java index c153afb59..5fcb1587f 100644 --- a/core/src/main/java/org/infinispan/protostream/impl/ProtoStreamReaderImpl.java +++ b/core/src/main/java/org/infinispan/protostream/impl/ProtoStreamReaderImpl.java @@ -27,6 +27,7 @@ import org.infinispan.protostream.descriptors.MapDescriptor; import org.infinispan.protostream.descriptors.Type; import org.infinispan.protostream.descriptors.WireType; +import org.infinispan.protostream.impl.jfr.JfrEventPublisher; import org.jboss.logging.Logger; /** @@ -187,6 +188,7 @@ public Integer readInt(String fieldName) throws IOException { public int[] readInts(String fieldName) throws IOException { List values = readCollection(fieldName, new ArrayList<>(), Integer.class); int[] result = new int[values.size()]; + JfrEventPublisher.intBufferAllocateEvent(result.length); for (int i = 0; i < values.size(); i++) { result[i] = values.get(i); } @@ -202,6 +204,7 @@ public Long readLong(String fieldName) throws IOException { public long[] readLongs(String fieldName) throws IOException { List values = readCollection(fieldName, new ArrayList<>(), Long.class); long[] result = new long[values.size()]; + JfrEventPublisher.longBufferAllocateEvent(result.length); for (int i = 0; i < values.size(); i++) { result[i] = values.get(i); } @@ -229,6 +232,7 @@ public Float readFloat(String fieldName) throws IOException { public float[] readFloats(String fieldName) throws IOException { List values = readCollection(fieldName, new ArrayList<>(), Float.class); float[] result = new float[values.size()]; + JfrEventPublisher.floatBufferAllocateEvent(result.length); for (int i = 0; i < values.size(); i++) { result[i] = values.get(i); } @@ -244,6 +248,7 @@ public Double readDouble(String fieldName) throws IOException { public double[] readDoubles(String fieldName) throws IOException { List values = readCollection(fieldName, new ArrayList<>(), Double.class); double[] result = new double[values.size()]; + JfrEventPublisher.doubleBufferAllocateEvent(result.length); for (int i = 0; i < values.size(); i++) { result[i] = values.get(i); } @@ -259,6 +264,7 @@ public Boolean readBoolean(String fieldName) throws IOException { public boolean[] readBooleans(String fieldName) throws IOException { List values = readCollection(fieldName, new ArrayList<>(), Boolean.class); boolean[] result = new boolean[values.size()]; + JfrEventPublisher.bufferAllocateEvent(result.length); for (int i = 0; i < values.size(); i++) { result[i] = values.get(i); } diff --git a/core/src/main/java/org/infinispan/protostream/impl/ProtoStreamWriterImpl.java b/core/src/main/java/org/infinispan/protostream/impl/ProtoStreamWriterImpl.java index 3361becd3..65a344260 100644 --- a/core/src/main/java/org/infinispan/protostream/impl/ProtoStreamWriterImpl.java +++ b/core/src/main/java/org/infinispan/protostream/impl/ProtoStreamWriterImpl.java @@ -20,6 +20,7 @@ import org.infinispan.protostream.descriptors.MapDescriptor; import org.infinispan.protostream.descriptors.Type; import org.infinispan.protostream.descriptors.WireType; +import org.infinispan.protostream.impl.jfr.JfrEventPublisher; import org.jboss.logging.Logger; /** @@ -452,10 +453,12 @@ public void writeBytes(String fieldName, InputStream input) throws IOException { int len = 0; List chunks = new LinkedList<>(); int bufLen; + JfrEventPublisher.bufferAllocateEvent(CHUNK_SIZE); byte[] buffer = new byte[CHUNK_SIZE]; while ((bufLen = input.read(buffer)) != -1) { chunks.add(buffer); len += bufLen; + JfrEventPublisher.bufferAllocateEvent(CHUNK_SIZE); buffer = new byte[CHUNK_SIZE]; } input.close(); diff --git a/core/src/main/java/org/infinispan/protostream/impl/RandomAccessOutputStreamImpl.java b/core/src/main/java/org/infinispan/protostream/impl/RandomAccessOutputStreamImpl.java index f60ffdb19..ac7c48ab9 100644 --- a/core/src/main/java/org/infinispan/protostream/impl/RandomAccessOutputStreamImpl.java +++ b/core/src/main/java/org/infinispan/protostream/impl/RandomAccessOutputStreamImpl.java @@ -9,6 +9,7 @@ import org.infinispan.protostream.RandomAccessOutputStream; import net.jcip.annotations.NotThreadSafe; +import org.infinispan.protostream.impl.jfr.JfrEventPublisher; @NotThreadSafe public class RandomAccessOutputStreamImpl extends OutputStream implements RandomAccessOutputStream { @@ -27,6 +28,7 @@ public RandomAccessOutputStreamImpl(int capacity) { if (capacity < 0) throw new IllegalArgumentException("Negative initial capacity: " + capacity); this.buf = new byte[capacity]; + JfrEventPublisher.bufferAllocateEvent(capacity); } @Override @@ -73,11 +75,16 @@ public void move(int startPos, int length, int newPos) { @Override public void ensureCapacity(int capacity) { if (buf == null) { - buf = new byte[Math.max(MIN_SIZE, capacity)]; + int cap = Math.max(MIN_SIZE, capacity); + buf = new byte[cap]; + JfrEventPublisher.bufferAllocateEvent(cap); } else if (capacity > buf.length) { - byte[] newbuf = new byte[getNewBufferSize(buf.length, capacity)]; + int before = buf.length; + int cap = getNewBufferSize(buf.length, capacity); + byte[] newbuf = new byte[cap]; System.arraycopy(buf, 0, newbuf, 0, pos); buf = newbuf; + JfrEventPublisher.bufferResizeEvent(before, cap); } } @@ -100,6 +107,7 @@ public void setPosition(int position) { @Override public byte[] toByteArray() { + JfrEventPublisher.bufferAllocateEvent(pos); return Arrays.copyOf(buf, pos); } diff --git a/core/src/main/java/org/infinispan/protostream/impl/StringUtil.java b/core/src/main/java/org/infinispan/protostream/impl/StringUtil.java index b4fbf82cb..e11409d18 100644 --- a/core/src/main/java/org/infinispan/protostream/impl/StringUtil.java +++ b/core/src/main/java/org/infinispan/protostream/impl/StringUtil.java @@ -13,6 +13,7 @@ import java.util.function.Function; import java.util.function.Predicate; +import org.infinispan.protostream.impl.jfr.JfrEventPublisher; import sun.misc.Unsafe; class StringUtil { @@ -52,7 +53,9 @@ public boolean providesLatin1Bytes() { @Override public byte[] getBytes(String s) { - return s.getBytes(StandardCharsets.UTF_8); + byte[] b = s.getBytes(StandardCharsets.UTF_8); + JfrEventPublisher.bufferAllocateEvent(b.length); + return b; } }; } diff --git a/core/src/main/java/org/infinispan/protostream/impl/TagReaderImpl.java b/core/src/main/java/org/infinispan/protostream/impl/TagReaderImpl.java index 5d89d8751..6742e8254 100644 --- a/core/src/main/java/org/infinispan/protostream/impl/TagReaderImpl.java +++ b/core/src/main/java/org/infinispan/protostream/impl/TagReaderImpl.java @@ -21,6 +21,7 @@ import org.infinispan.protostream.ProtobufTagMarshaller; import org.infinispan.protostream.TagReader; import org.infinispan.protostream.descriptors.WireType; +import org.infinispan.protostream.impl.jfr.JfrEventPublisher; /** * @author anistor@redhat.com @@ -520,6 +521,7 @@ byte[] getBufferArray(int offset) { if (offset == 0) { return array; } + JfrEventPublisher.bufferAllocateEvent(array.length - offset); return Arrays.copyOfRange(array, offset, array.length); } @@ -532,6 +534,7 @@ boolean isAtEnd() { String readString() throws IOException { int length = readVarint32(); if (length > 0 && length <= end - pos) { + JfrEventPublisher.bufferAllocateEvent(length); String value = new String(array, pos, length, UTF8); pos += length; return value; @@ -638,6 +641,7 @@ byte[] readRawByteArray(int length) throws IOException { if (length > 0 && length <= end - pos) { int from = pos; pos += length; + JfrEventPublisher.bufferAllocateEvent(pos - from); return Arrays.copyOfRange(array, from, pos); } if (length == 0) { @@ -728,6 +732,7 @@ byte[] getBufferArray(int offset) { if (offset == 0) { return array; } + JfrEventPublisher.bufferAllocateEvent(array.length - offset); return Arrays.copyOfRange(array, offset, array.length); } @@ -741,6 +746,7 @@ boolean isAtEnd() { String readString() throws IOException { int length = readVarint32(); if (length > 0 && length <= end - buf.position()) { + JfrEventPublisher.bufferAllocateEvent(length); byte[] bytes = new byte[length]; buf.get(bytes); return new String(bytes, 0, length, UTF8); @@ -842,6 +848,7 @@ byte readRawByte() throws IOException { @Override byte[] readRawByteArray(int length) throws IOException { if (length > 0 && length <= end - buf.position()) { + JfrEventPublisher.bufferAllocateEvent(length); byte[] bytes = new byte[length]; buf.get(bytes); return bytes; @@ -1085,6 +1092,7 @@ byte[] readRawByteArray(int length) throws IOException { } int readTotal = 0; int readAmount; + JfrEventPublisher.bufferAllocateEvent(length); byte[] array = new byte[length]; while ((readAmount = in.read(array, readTotal, length - readTotal)) != -1) { readTotal += readAmount; diff --git a/core/src/main/java/org/infinispan/protostream/impl/TagWriterImpl.java b/core/src/main/java/org/infinispan/protostream/impl/TagWriterImpl.java index 814b9dd16..e71c07b88 100644 --- a/core/src/main/java/org/infinispan/protostream/impl/TagWriterImpl.java +++ b/core/src/main/java/org/infinispan/protostream/impl/TagWriterImpl.java @@ -20,6 +20,7 @@ import org.infinispan.protostream.RandomAccessOutputStream; import org.infinispan.protostream.TagWriter; import org.infinispan.protostream.descriptors.WireType; +import org.infinispan.protostream.impl.jfr.JfrEventPublisher; /** * @author anistor@redhat.com @@ -584,6 +585,7 @@ void writeUTF8Field(int fieldNumber, String s) { if (c > 127) { // TODO: do this without allocating the byte[] count = s.getBytes(StandardCharsets.UTF_8).length; + JfrEventPublisher.bufferAllocateEvent(count); break; } } @@ -607,13 +609,16 @@ void resize(int maxDepth) { if (nestedPositions == null) { // We are guessing most objects won't have larger than 4 sub elements nestedPositions = new int[10]; + JfrEventPublisher.intBufferAllocateEvent(nestedPositions.length); } else { if (head == nestedPositions.length) { + int before = nestedPositions.length; int newLength = Math.min(nestedPositions.length + 10, maxDepth); if (newLength == maxDepth) { throw log.maxNestedMessageDepth(maxDepth, null); } nestedPositions = Arrays.copyOf(nestedPositions, newLength); + JfrEventPublisher.intBufferResizeEvent(before, newLength); } } } @@ -1007,6 +1012,7 @@ void writeBytes(ByteBuffer value) throws IOException { if (value.hasArray()) { out.write(value.array(), value.arrayOffset(), value.remaining()); } else { + JfrEventPublisher.bufferAllocateEvent(value.remaining()); byte[] buffer = new byte[value.remaining()]; value.get(buffer, value.position(), value.remaining()); out.write(buffer); @@ -1035,6 +1041,7 @@ private static final class OutputStreamEncoder extends Encoder { // least their length varint should fit. bufferSize = Math.max(bufferSize, MAX_VARINT_SIZE * 2); buffer = new ByteArrayEncoder(new byte[bufferSize], 0, bufferSize); + JfrEventPublisher.bufferAllocateEvent(bufferSize); this.out = out; } @@ -1348,6 +1355,7 @@ void writeUTF8FieldWithRewind(int number, String s) throws IOException { } byte[] utf8buffer = s.getBytes(StandardCharsets.UTF_8); + JfrEventPublisher.bufferAllocateEvent(utf8buffer.length); out.ensureCapacity(startPos + MAX_INT_VARINT_SIZE + utf8buffer.length); startPos = writeVarInt32Direct(startPos, utf8buffer.length); @@ -1430,12 +1438,14 @@ Encoder subEncoder(int number, int maxDepth) throws IOException { out.setPosition(pos + 1); if (positions == null) { positions = new int[10]; + JfrEventPublisher.intBufferAllocateEvent(positions.length); } else if (head == positions.length) { int newSize = Math.min(head + 10, maxDepth); if (newSize == maxDepth) { throw log.maxNestedMessageDepth(maxDepth, null); } positions = Arrays.copyOf(positions, newSize); + JfrEventPublisher.intBufferResizeEvent(head, newSize); } positions[head++] = pos + 1; return this; diff --git a/core/src/main/java/org/infinispan/protostream/impl/UnknownFieldSetImpl.java b/core/src/main/java/org/infinispan/protostream/impl/UnknownFieldSetImpl.java index 15ea13936..38bbdb851 100644 --- a/core/src/main/java/org/infinispan/protostream/impl/UnknownFieldSetImpl.java +++ b/core/src/main/java/org/infinispan/protostream/impl/UnknownFieldSetImpl.java @@ -16,6 +16,7 @@ import org.infinispan.protostream.TagWriter; import org.infinispan.protostream.UnknownFieldSet; import org.infinispan.protostream.descriptors.WireType; +import org.infinispan.protostream.impl.jfr.JfrEventPublisher; /** * {@link UnknownFieldSet} implementation. This is not thread-safe. This class should never be directly instantiated by @@ -208,6 +209,7 @@ public void writeExternal(ObjectOutput out) throws IOException { @Override public void readExternal(ObjectInput in) throws IOException { int len = in.readInt(); + JfrEventPublisher.bufferAllocateEvent(len); byte[] bytes = new byte[len]; in.readFully(bytes); readAllFields(TagReaderImpl.newInstance(null, bytes)); diff --git a/core/src/main/java/org/infinispan/protostream/impl/jfr/AbstractAllocatorEvent.java b/core/src/main/java/org/infinispan/protostream/impl/jfr/AbstractAllocatorEvent.java new file mode 100644 index 000000000..cbfc44e36 --- /dev/null +++ b/core/src/main/java/org/infinispan/protostream/impl/jfr/AbstractAllocatorEvent.java @@ -0,0 +1,20 @@ +package org.infinispan.protostream.impl.jfr; + +import jdk.jfr.Category; +import jdk.jfr.DataAmount; +import jdk.jfr.Description; +import jdk.jfr.Enabled; +import jdk.jfr.Event; + +@Enabled(value = false) +@Category("ProtoStream") +class AbstractAllocatorEvent extends Event { + + @DataAmount + @Description("Allocation size") + int size; + + AbstractAllocatorEvent(int size) { + this.size = size; + } +} diff --git a/core/src/main/java/org/infinispan/protostream/impl/jfr/BufferAllocateEvent.java b/core/src/main/java/org/infinispan/protostream/impl/jfr/BufferAllocateEvent.java new file mode 100644 index 000000000..029f09f52 --- /dev/null +++ b/core/src/main/java/org/infinispan/protostream/impl/jfr/BufferAllocateEvent.java @@ -0,0 +1,22 @@ +package org.infinispan.protostream.impl.jfr; + +import jdk.jfr.Description; +import jdk.jfr.Label; +import jdk.jfr.Name; + +@Name(BufferAllocateEvent.NAME) +@Label("Buffer Allocation") +@Description("Triggered when a new buffer is allocated") +final class BufferAllocateEvent extends AbstractAllocatorEvent { + static final String NAME = "org.infinispan.protostream.AllocateEvent"; + + private static final BufferAllocateEvent INSTANCE = new BufferAllocateEvent(0); + + BufferAllocateEvent(int size) { + super(size); + } + + public static boolean isEventEnabled() { + return INSTANCE.isEnabled(); + } +} diff --git a/core/src/main/java/org/infinispan/protostream/impl/jfr/BufferResizeEvent.java b/core/src/main/java/org/infinispan/protostream/impl/jfr/BufferResizeEvent.java new file mode 100644 index 000000000..225ee14d7 --- /dev/null +++ b/core/src/main/java/org/infinispan/protostream/impl/jfr/BufferResizeEvent.java @@ -0,0 +1,32 @@ +package org.infinispan.protostream.impl.jfr; + +import jdk.jfr.DataAmount; +import jdk.jfr.Description; +import jdk.jfr.Label; +import jdk.jfr.Name; + +@Name(BufferResizeEvent.NAME) +@Label("Buffer Resize") +@Description("Triggered when a buffer is resized") +final class BufferResizeEvent extends AbstractAllocatorEvent { + static final String NAME = "org.infinispan.protostream.ResizeEvent"; + private static final BufferResizeEvent INSTANCE = new BufferResizeEvent(0, 0, 0); + + BufferResizeEvent(int before, int after, int size) { + super(size); + this.before = before; + this.after = after; + } + + public static boolean isEventEnabled() { + return INSTANCE.isEnabled(); + } + + @DataAmount + @Description("Buffer size before resizing") + int before; + + @DataAmount + @Description("Buffer size after resizing") + int after; +} diff --git a/core/src/main/java/org/infinispan/protostream/impl/jfr/JfrEventPublisher.java b/core/src/main/java/org/infinispan/protostream/impl/jfr/JfrEventPublisher.java new file mode 100644 index 000000000..f1c93b0ee --- /dev/null +++ b/core/src/main/java/org/infinispan/protostream/impl/jfr/JfrEventPublisher.java @@ -0,0 +1,50 @@ +package org.infinispan.protostream.impl.jfr; + +public final class JfrEventPublisher { + + private static void allocateEvent(int size, int scale) { + if (BufferAllocateEvent.isEventEnabled()) { + BufferAllocateEvent ev = new BufferAllocateEvent(size * scale); + ev.commit(); + } + } + + public static void bufferAllocateEvent(int size) { + if (BufferAllocateEvent.isEventEnabled()) { + BufferAllocateEvent ev = new BufferAllocateEvent(size); + ev.commit(); + } + } + + public static void intBufferAllocateEvent(int size) { + allocateEvent(size, Integer.BYTES); + } + + public static void longBufferAllocateEvent(int size) { + allocateEvent(size, Long.BYTES); + } + + public static void doubleBufferAllocateEvent(int size) { + allocateEvent(size, Double.BYTES); + } + + public static void floatBufferAllocateEvent(int size) { + allocateEvent(size, Float.BYTES); + } + + public static void bufferResizeEvent(int before, int after) { + if (BufferResizeEvent.isEventEnabled()) { + BufferResizeEvent ev = new BufferResizeEvent(before, after, after - before); + ev.commit(); + } + } + + public static void intBufferResizeEvent(int before, int after) { + if (BufferResizeEvent.isEventEnabled()) { + int actualBefore = before * Integer.BYTES; + int actualAfter = after * Integer.BYTES; + BufferResizeEvent ev = new BufferResizeEvent(actualBefore, actualAfter, actualAfter - actualBefore); + ev.commit(); + } + } +} diff --git a/core/src/main/java/org/infinispan/protostream/impl/json/JsonReader.java b/core/src/main/java/org/infinispan/protostream/impl/json/JsonReader.java index 1e8e9c7f7..3c5ab360c 100644 --- a/core/src/main/java/org/infinispan/protostream/impl/json/JsonReader.java +++ b/core/src/main/java/org/infinispan/protostream/impl/json/JsonReader.java @@ -48,6 +48,7 @@ import org.infinispan.protostream.descriptors.Type; import org.infinispan.protostream.impl.RandomAccessOutputStreamImpl; import org.infinispan.protostream.impl.TagWriterImpl; +import org.infinispan.protostream.impl.jfr.JfrEventPublisher; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; @@ -349,6 +350,7 @@ private static void processArray(ImmutableSerializationContext ctx, JsonParser p token = parser.nextToken(); } + JfrEventPublisher.bufferAllocateEvent(result.size()); byte[] binary = new byte[result.size()]; for (int i = 0; i < result.size(); i++) { binary[i] = result.get(i); diff --git a/core/src/test/java/org/infinispan/protostream/impl/jfr/JfrEventPublisherTest.java b/core/src/test/java/org/infinispan/protostream/impl/jfr/JfrEventPublisherTest.java new file mode 100644 index 000000000..08eb526df --- /dev/null +++ b/core/src/test/java/org/infinispan/protostream/impl/jfr/JfrEventPublisherTest.java @@ -0,0 +1,127 @@ +package org.infinispan.protostream.impl.jfr; + +import static org.infinispan.protostream.domain.Account.Currency.BRL; +import static org.infinispan.protostream.domain.Account.Currency.USD; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import jdk.jfr.consumer.RecordingStream; +import org.infinispan.protostream.ProtobufUtil; +import org.infinispan.protostream.SerializationContext; +import org.infinispan.protostream.domain.Account; +import org.infinispan.protostream.test.AbstractProtoStreamTest; +import org.junit.Test; + +public class JfrEventPublisherTest extends AbstractProtoStreamTest { + + @Test + public void testAllocationEventPublished() throws Exception { + Account account = createAccount(); + SerializationContext context = createContext(); + try (RecordingStream stream = new RecordingStream()) { + stream.enable(BufferAllocateEvent.class); + CompletableFuture cf = new CompletableFuture<>(); + stream.onEvent(BufferAllocateEvent.NAME, e -> cf.complete(e.getValue("size"))); + stream.startAsync(); + + byte[] bytes = ProtobufUtil.toWrappedByteArray(context, account); + Account acc = ProtobufUtil.fromWrappedByteArray(context, bytes); + assertEquals(acc, account); + + cf.get(10, TimeUnit.SECONDS); + } + } + + @Test + public void testResizeEventPublished() throws Exception { + Account account = createAccount(); + SerializationContext context = createContext(); + try (RecordingStream stream = new RecordingStream()) { + stream.enable(BufferResizeEvent.class); + CompletableFuture cf = new CompletableFuture<>(); + stream.onEvent(BufferResizeEvent.NAME, e -> cf.complete(e.getValue("size"))); + stream.startAsync(); + + account.setDescription(generateLargeDescription()); + + byte[] bytes = ProtobufUtil.toWrappedByteArray(context, account); + Account acc = ProtobufUtil.fromWrappedByteArray(context, bytes); + assertEquals(acc, account); + cf.get(10, TimeUnit.SECONDS); + } + } + + @Test + public void testDifferentPrimitivesEvents() throws Exception { + try (RecordingStream stream = new RecordingStream()) { + stream.enable(BufferAllocateEvent.class); + stream.enable(BufferResizeEvent.class); + + AtomicInteger index = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(5); + List values = List.of( + Integer.BYTES * 2, + Long.BYTES * 2, + Float.BYTES * 2, + Double.BYTES * 2); + + stream.onEvent(BufferAllocateEvent.NAME, e -> { + assertEquals(values.get(index.getAndIncrement()), e.getInt("size")); + latch.countDown(); + }); + stream.onEvent(BufferResizeEvent.NAME, e -> { + assertEquals(2 * Integer.BYTES, e.getInt("size")); + latch.countDown(); + }); + stream.startAsync(); + + JfrEventPublisher.intBufferAllocateEvent(2); + JfrEventPublisher.longBufferAllocateEvent(2); + JfrEventPublisher.floatBufferAllocateEvent(2); + JfrEventPublisher.doubleBufferAllocateEvent(2); + JfrEventPublisher.intBufferResizeEvent(2, 4); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertEquals(4, index.get()); + } + } + + private String generateLargeDescription() { + return "B".repeat(4096); + } + + + private Account createAccount() { + Account account = new Account(); + account.setId(1); + account.setDescription("test account"); + Account.Limits limits = new Account.Limits(); + limits.setMaxDailyLimit(1.5); + limits.setMaxTransactionLimit(3.5); + limits.setPayees(new String[]{"Madoff", "Ponzi"}); + account.setLimits(limits); + Account.Limits hardLimits = new Account.Limits(); + hardLimits.setMaxDailyLimit(5d); + hardLimits.setMaxTransactionLimit(35d); + account.setHardLimits(hardLimits); + Date creationDate = Date.from(LocalDate.of(2017, 7, 20).atStartOfDay().toInstant(ZoneOffset.UTC)); + account.setCreationDate(creationDate); + List blurb = new ArrayList<>(); + blurb.add(new byte[0]); + blurb.add(new byte[]{123}); + blurb.add(new byte[]{1, 2, 3, 4}); + account.setBlurb(blurb); + account.setCurrencies(new Account.Currency[]{USD, BRL}); + return account; + } +}