Conversation
- Use one memq consumer
- Fix group_id
- Make memqConsumer protected
- Omit memq numbytes in metric
- Close iterator after persisting to messages list
- Add MemqConsumer PscSourceReaderMetrics support
- Update metrics capture
- Fix memq offset conversion
- Try to fix unit test failure
- Clean up MetricRegistry scheduler + change histograms to ExponentiallyDecayingReservoir
- Try to reduce hot path CPU util by caching offsets to prevent hashcode computation + hashmap lookup
- Bump memq.version to 1.0.2
```java
public void registerNumBytesIn(PscConsumer<?, ?> consumer) throws ClientException {
    Predicate<Map.Entry<MetricName, ? extends Metric>> filter =
            KafkaSourceReaderMetricsUtil.createBytesConsumedFilter();
    String backendType = getBackendFromTags(consumer.metrics());
```
Do we need to handle potential null return from this call?
Fixed in `getBackendFromTags`: it now returns `"unknown"` if the backend is null.
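The null-hardening discussed in this thread can be sketched roughly as follows. This is an illustrative sketch only: the map shape and the `"backend"` tag key are assumptions standing in for the real PSC metric types.

```java
// Illustrative sketch of the null-hardening discussed above.
// The map shape and the "backend" tag key are assumptions, not real PSC types.
import java.util.Iterator;
import java.util.Map;

public class BackendTagExample {
    static final String UNKNOWN_BACKEND = "unknown";

    // Returns the backend recorded in the first metric's tags, or "unknown"
    // when the metrics map is empty or carries no backend tag.
    static String getBackendFromTags(Map<String, Map<String, String>> metrics) {
        Iterator<Map<String, String>> it = metrics.values().iterator();
        if (!it.hasNext()) {
            return UNKNOWN_BACKEND; // empty metrics map: no NoSuchElementException
        }
        String backend = it.next().get("backend");
        return backend == null ? UNKNOWN_BACKEND : backend;
    }
}
```

Falling back to a sentinel value rather than throwing keeps the metrics-registration path from failing before the first poll populates the metrics map.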
```java
            && entry.getKey().name().equals(BYTES_CONSUMED_TOTAL);
}

protected static Predicate<Map.Entry<MetricName, ? extends Metric>> createRecordsLagFilter(TopicUriPartition tp) {
```
To confirm, is `tp` a no-op for this function?
Yes, it's also a no-op on the Kafka side. For some reason, when we ported the Kafka connector implementation to PSC, filtering further by `tp` returned empty results; metrics were only returned when we didn't filter by `tp`. We left the method signature untouched but commented out the `tp` filtering logic.
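The situation described above can be sketched as follows. This is a simplified stand-in, not the real PSC code: `MetricName` here is a hypothetical record, and the group/name strings are the Kafka-style metric names mentioned elsewhere in this PR.

```java
// Simplified sketch of a records-lag filter whose tp parameter is a no-op,
// as discussed above. MetricName is a hypothetical stand-in type.
import java.util.Map;
import java.util.function.Predicate;

public class RecordsLagFilterExample {
    record MetricName(String group, String name) {}

    static Predicate<Map.Entry<MetricName, ?>> createRecordsLagFilter(Object tp) {
        return entry ->
                entry.getKey().group().equals("consumer-fetch-manager-metrics")
                        && entry.getKey().name().equals("records-lag-max");
        // Filtering further by tp returned empty results when ported from the
        // Kafka connector, so the tp-based checks are intentionally omitted;
        // tp stays in the signature for API compatibility.
    }
}
```

Keeping the unused parameter preserves source compatibility with the ported Kafka connector code at the cost of a slightly misleading signature.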
```java
public static class Offset {
    public long currentOffset;
    public long committedOffset;
```
What's the reason for making this part of the public API? Any potential harm?
It needs to be accessed from multiple sub-packages. I don't think there is potential harm: it's just a data holder for metrics tracking and doesn't impact any business logic.
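The public-data-holder pattern discussed above can be sketched as follows. This is a minimal hypothetical shape, not the actual `PscSourceReaderMetrics` code; the point is that callers can cache the returned `Offset` reference and write to its public fields directly, skipping a map lookup per record.

```java
// Minimal sketch of the Offset data holder discussed above (hypothetical
// shape). Public mutable fields let the hot read loop update offsets
// without a HashMap lookup per record.
import java.util.HashMap;
import java.util.Map;

public class OffsetTrackerExample {
    public static class Offset {
        public long currentOffset;
        public long committedOffset;

        public Offset(long currentOffset, long committedOffset) {
            this.currentOffset = currentOffset;
            this.committedOffset = committedOffset;
        }
    }

    private final Map<String, Offset> offsets = new HashMap<>();

    // Callers cache the returned reference and write to it directly.
    public Offset getOffsetTracker(String partition) {
        return offsets.computeIfAbsent(partition, p -> new Offset(-1L, -1L));
    }
}
```

In the read loop the cached tracker is then updated as `tracker.currentOffset = offset;`, which is the per-record fast path this PR describes.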
- Add Memq Metadata Client
- Handle null backend types
- Merge pull request #131 from KevBrowne/enable-flink-projection-pushdown: Enable Flink projection pushdown for PSC connector
- Add MemQ support for FlinkSQL 1.18 (#135)

Co-authored-by: Vahid Hashemian <vahid.hashemian@gmail.com>
Co-authored-by: artem <atetenkin@pinterest.com>
Summary
This PR makes PSC-Flink compatible with the MemQ source for Flink 1.18+ jobs via the following changes.
Changes
memq-client version upgrade to 1.0.2
- `memq.version` bumped from `0.2.21` to `1.0.2` in `psc/pom.xml`, `psc-examples/pom.xml`, and `psc-integration-test/pom.xml`.

PscMemqMetadataClient implementation
- `PscMemqMetadataClient`: implements `PscBackendMetadataClient` for MemQ. Since MemQ has no dedicated admin client, this uses a transient `MemqConsumer` to fulfill metadata operations (`describeTopicUris`, `listOffsets`, `listOffsetsForTimestamps`, `listOffsetsForConsumerGroup`). `listTopicRns` throws `UnsupportedOperationException`, as MemQ does not support listing all topics.
- `PscMemqMetadataClientCreator`: annotated with `@PscMetadataClientCreatorPlugin(backend = "memq", priority = 1)` for auto-discovery via the existing plugin registry.
- `PscMetadataClientToMemqConsumerConfigConverter`: converts PSC metadata client configuration to MemQ consumer properties.
- `MemqTopicUri` constructor widened from package-private to `public` so it can be used by the metadata client.
- `PscMetadataClientCreatorManager` changed from `HashMap` to `TreeMap` for deterministic iteration order of backend creator registrations.

MemQ-Flink metrics support
- `MemqSourceReaderMetricsUtil`: provides metric filter predicates for MemQ-specific metric names (`bytes.consumed.total`, `notification.records.lag.max`) under the `memq-consumer-metrics` group.
- `PscSourceReaderMetrics.registerNumBytesIn` and `maybeAddPendingRecordsMetric` now dispatch to the correct metric filter based on backend type (Kafka vs. MemQ) instead of hardcoding the Kafka filter.
- `PscSourceReaderMetrics.getBackendFromTags` hardened against empty metrics maps (returns `"unknown"` instead of throwing `NoSuchElementException`).
- `PscSourceReaderMetrics.maybeAddPendingRecordsMetric` catches `IllegalStateException` when the metric is not yet available (MemQ metrics may not be registered until the first poll) and returns `null` to retry on the next cycle.
- `PscMemqConsumer.metrics()` now returns live Dropwizard metrics from the underlying `MemqConsumer`'s `MetricRegistry`, wrapped in a `LiveDropwizardMetric` adapter that reads current values on each call rather than returning stale snapshots.

Reduce per-record hashcode computation via caching
- `TopicUriPartition.hashCode()` now caches its result in a `transient int cachedHashCode` field, invalidated on `setTopicUri()`. This eliminates redundant `String.hashCode()` + `TopicUri.hashCode()` recomputation on every record during the Flink read loop (HashMap lookups in `PscPartitionSplitRecords`, offset tracking, etc.).

Per-record offset tracking optimization
- `PscSourceReaderMetrics.Offset` class widened to `public` with public fields. A `getOffsetTracker(TopicUriPartition)` method allows callers to cache the `Offset` reference.
- `PscPartitionSplitRecords` caches the `Offset` tracker per partition and writes `currentOffset` directly (`currentOffsetTracker.currentOffset = offset`) instead of calling `metrics.recordCurrentOffset(tp, offset)` on every record, eliminating a HashMap lookup per message.

Proper MemQ offset management
- `PscMemqConsumer.startOffsets()` and `endOffsets()` now encode raw Kafka notification offsets via `kafkaOffsetToComposite()` (wrapping them in `MemqOffset(kafkaOffset, 0).toLong()`). Previously, raw Kafka offsets were returned directly, which caused `seekToOffset` to misinterpret them; e.g., a raw offset of `2205646` would be decoded as batch offset `4` after the 19-bit right shift in `MemqOffset.convertPscOffsetToMemqOffset`.
- `TestMemqOffsetRoundTrip` validates the encoding round-trip and demonstrates the corruption that occurs without it.

Memory optimizations
- `PscMetricRegistryManager`: replaced `SlidingTimeWindowArrayReservoir(1, TimeUnit.MINUTES)` with `ExponentiallyDecayingReservoir()` for histogram metrics. `SlidingTimeWindowArrayReservoir` retains all samples within the time window, which can grow unbounded under high throughput; `ExponentiallyDecayingReservoir` uses a fixed-size sample with exponential decay, providing bounded memory usage.

Other fixes
- `PscMemqConsumer.close()` now calls `scheduler.shutdown()` to prevent thread leaks.
- `PscConsumerMessagesIterable` now closes the underlying `PscConsumerPollMessageIterator` after draining it to `asList()`, preventing resource leaks (relevant for MemQ batch iterators that hold references to byte buffers).

Test plan
Tested via unit tests, integration tests, and e2e functional testing on Flink clusters.
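The MemQ offset encoding fixed in this PR can be sketched as a simplified model. The 19-bit packing below is an assumption inferred from the PR description (the real `MemqOffset` class lives in memq-client and may pack additional fields); `kafkaOffsetToComposite` models wrapping a raw Kafka notification offset as `new MemqOffset(kafkaOffset, 0).toLong()`.

```java
// Simplified model of the MemQ composite offset packing described in this PR.
// Assumption: the low 19 bits hold the message index within a batch and the
// high bits hold the Kafka notification (batch) offset, matching the 19-bit
// right shift mentioned for MemqOffset.convertPscOffsetToMemqOffset.
public class MemqOffsetExample {
    static final int MESSAGE_BITS = 19;

    // Models new MemqOffset(kafkaOffset, 0).toLong(): message index 0.
    static long kafkaOffsetToComposite(long kafkaOffset) {
        return kafkaOffset << MESSAGE_BITS;
    }

    // Models the decode path used by seekToOffset.
    static long compositeToBatchOffset(long composite) {
        return composite >> MESSAGE_BITS;
    }
}
```

Under this model, feeding the raw Kafka offset `2205646` straight into the decoder yields batch offset `4` (the corruption the PR fixes), while routing it through `kafkaOffsetToComposite` first makes the round trip return `2205646` intact.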