
Add MemQ support for FlinkSQL 1.18 #135

Merged
jeffxiang merged 2 commits into main from memq_flinksql_integration on Apr 15, 2026

Conversation

jeffxiang (Contributor) commented Apr 13, 2026

Summary

This PR makes PSC-Flink compatible with MemQ source for Flink 1.18+ jobs via the following changes.

Changes

memq-client version upgrade to 1.0.2

  • Bumps memq.version from 0.2.21 to 1.0.2 in psc/pom.xml, psc-examples/pom.xml, and psc-integration-test/pom.xml.

PscMemqMetadataClient implementation

  • New class: PscMemqMetadataClient — implements PscBackendMetadataClient for MemQ. Since MemQ has no dedicated admin client, this uses a transient MemqConsumer to fulfill metadata operations (describeTopicUris, listOffsets, listOffsetsForTimestamps, listOffsetsForConsumerGroup). listTopicRns throws UnsupportedOperationException as MemQ does not support listing all topics.
  • New class: PscMemqMetadataClientCreator — annotated with @PscMetadataClientCreatorPlugin(backend = "memq", priority = 1) for auto-discovery via the existing plugin registry.
  • New class: PscMetadataClientToMemqConsumerConfigConverter — converts PSC metadata client configuration to MemQ consumer properties.
  • MemqTopicUri constructor widened from package-private to public so it can be used by the metadata client.
  • PscMetadataClientCreatorManager changed from HashMap to TreeMap for deterministic iteration order of backend creator registrations.
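The HashMap-to-TreeMap change above can be sketched in isolation. This is a minimal illustration of why TreeMap gives deterministic iteration, not the actual PscMetadataClientCreatorManager code; the map keys and values here are hypothetical stand-ins for backend names and creator class names.

```java
import java.util.Map;
import java.util.TreeMap;

public class CreatorOrderSketch {
    static String orderedBackends() {
        Map<String, String> creators = new TreeMap<>();
        // Register in "wrong" order on purpose: TreeMap iterates in natural
        // key order regardless of insertion order, unlike HashMap.
        creators.put("memq", "PscMemqMetadataClientCreator");
        creators.put("kafka", "PscKafkaMetadataClientCreator");
        return String.join(",", creators.keySet());
    }

    public static void main(String[] args) {
        System.out.println(orderedBackends()); // kafka,memq
    }
}
```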

MemQ-Flink metrics support

  • New class: MemqSourceReaderMetricsUtil — provides metric filter predicates for MemQ-specific metric names (bytes.consumed.total, notification.records.lag.max) under the memq-consumer-metrics group.
  • PscSourceReaderMetrics.registerNumBytesIn and maybeAddPendingRecordsMetric now dispatch to the correct metric filter based on backend type (Kafka vs MemQ) instead of hardcoding the Kafka filter.
  • PscSourceReaderMetrics.getBackendFromTags hardened against empty metrics maps (returns "unknown" instead of throwing NoSuchElementException).
  • PscSourceReaderMetrics.maybeAddPendingRecordsMetric catches IllegalStateException when the metric is not yet available (MemQ metrics may not be registered until the first poll) and returns null to retry on the next cycle.
  • PscMemqConsumer.metrics() now returns live Dropwizard metrics from the underlying MemqConsumer's MetricRegistry, wrapped in a LiveDropwizardMetric adapter that reads current values on each call rather than returning stale snapshots.
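The backend-based filter dispatch can be sketched as follows. The MemQ metric name `bytes.consumed.total` is taken from this PR; the Kafka-side name and the dispatch method are assumptions for illustration, not PSC's real API.

```java
import java.util.function.Predicate;

public class FilterDispatchSketch {
    // Hypothetical per-backend metric-name filters.
    static final Predicate<String> KAFKA_BYTES = n -> n.equals("bytes-consumed-total");
    static final Predicate<String> MEMQ_BYTES  = n -> n.equals("bytes.consumed.total");

    static Predicate<String> bytesConsumedFilterFor(String backend) {
        // Previously the Kafka filter was hardcoded; now the backend tag decides.
        return "memq".equals(backend) ? MEMQ_BYTES : KAFKA_BYTES;
    }

    public static void main(String[] args) {
        System.out.println(bytesConsumedFilterFor("memq").test("bytes.consumed.total"));  // true
        System.out.println(bytesConsumedFilterFor("kafka").test("bytes.consumed.total")); // false
    }
}
```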

Reduce per-record hashcode computation via caching

  • TopicUriPartition.hashCode() now caches its result in a transient int cachedHashCode field, invalidated on setTopicUri(). This eliminates redundant String.hashCode() + TopicUri.hashCode() recomputation on every record during the Flink read loop (HashMap lookups in PscPartitionSplitRecords, offset tracking, etc.).
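A minimal model of this caching pattern, assuming the field and invalidation point described above; everything else is simplified and not TopicUriPartition's real code.

```java
public class CachedHashSketch {
    static class Partition {
        private String topicUriStr;
        private final int partition;
        private transient int cachedHashCode; // 0 means "not computed yet"

        Partition(String topicUriStr, int partition) {
            this.topicUriStr = topicUriStr;
            this.partition = partition;
        }

        void setTopicUri(String topicUriStr) {
            this.topicUriStr = topicUriStr;
            this.cachedHashCode = 0; // invalidate the cache on mutation
        }

        @Override
        public int hashCode() {
            int h = cachedHashCode;
            if (h == 0) { // recompute only on first call or after invalidation
                h = 31 * topicUriStr.hashCode() + partition;
                cachedHashCode = h;
            }
            return h;
        }
    }

    static boolean stableAcrossCalls() {
        Partition p = new Partition("memq:/example_topic", 3);
        return p.hashCode() == p.hashCode(); // second call is a cached field read
    }

    public static void main(String[] args) {
        System.out.println(stableAcrossCalls()); // true
    }
}
```

One caveat of the 0-sentinel approach: if the computed hash happens to be 0, it is recomputed on every call, which is harmless for correctness.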

Per-record offset tracking optimization

  • PscSourceReaderMetrics.Offset class widened to public with public fields.
  • New getOffsetTracker(TopicUriPartition) method allows callers to cache the Offset reference.
  • PscPartitionSplitRecords caches the Offset tracker per partition and writes currentOffset directly (currentOffsetTracker.currentOffset = offset) instead of calling metrics.recordCurrentOffset(tp, offset) on every record, eliminating a HashMap lookup per message.
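The hot-loop shape of this optimization can be sketched as below: one map lookup per split, then direct field writes per record. Names loosely mirror the PR (`Offset`, `getOffsetTracker`); the string-keyed map stands in for the real TopicUriPartition-keyed one.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetTrackerSketch {
    public static class Offset {
        public long currentOffset;
        public long committedOffset;
    }

    private static final Map<String, Offset> OFFSETS = new HashMap<>();

    // Stand-in for getOffsetTracker(TopicUriPartition).
    static Offset getOffsetTracker(String partition) {
        return OFFSETS.computeIfAbsent(partition, k -> new Offset());
    }

    static long consume(String partition, long records) {
        Offset tracker = getOffsetTracker(partition); // one lookup, cached by the caller
        for (long offset = 0; offset < records; offset++) {
            tracker.currentOffset = offset; // direct field write, no per-record map lookup
        }
        return getOffsetTracker(partition).currentOffset;
    }

    public static void main(String[] args) {
        System.out.println(consume("topic-0", 1000)); // 999
    }
}
```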

Proper MemQ offset management

  • PscMemqConsumer.startOffsets() and endOffsets() now encode raw Kafka notification offsets via kafkaOffsetToComposite() (wrapping them in MemqOffset(kafkaOffset, 0).toLong()). Previously, raw Kafka offsets were returned directly, which caused seekToOffset to misinterpret them — e.g., a raw offset of 2205646 would be decoded as batch offset 4 after the 19-bit right shift in MemqOffset.convertPscOffsetToMemqOffset.
  • New test class TestMemqOffsetRoundTrip validates the encoding round-trip and demonstrates the corruption that occurs without it.
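The corruption and the fix can be reproduced with a toy version of the composite encoding. The 19-bit shift and the 2205646 → 4 example come from the PR description; the exact bit layout (high bits = Kafka notification offset, low 19 bits = message index) is an assumption here, and these are not MemqOffset's real methods.

```java
public class MemqOffsetRoundTripSketch {
    static final int MESSAGE_BITS = 19;

    // Stand-in for kafkaOffsetToComposite / MemqOffset(kafkaOffset, idx).toLong().
    static long toComposite(long kafkaOffset, long messageIndex) {
        return (kafkaOffset << MESSAGE_BITS) | messageIndex;
    }

    // Stand-in for the decode side of convertPscOffsetToMemqOffset.
    static long batchOffsetOf(long composite) {
        return composite >> MESSAGE_BITS;
    }

    public static void main(String[] args) {
        long rawKafkaOffset = 2205646L;
        // Without encoding, the raw offset is misread after the shift:
        System.out.println(batchOffsetOf(rawKafkaOffset)); // 4
        // With encoding, the round trip preserves the Kafka offset:
        System.out.println(batchOffsetOf(toComposite(rawKafkaOffset, 0))); // 2205646
    }
}
```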

Memory optimizations

  • PscMetricRegistryManager: replaced SlidingTimeWindowArrayReservoir(1, TimeUnit.MINUTES) with ExponentiallyDecayingReservoir() for histogram metrics. SlidingTimeWindowArrayReservoir retains all samples within the time window, which can grow unbounded under high throughput; ExponentiallyDecayingReservoir uses a fixed-size sample with exponential decay, providing bounded memory usage.
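The memory argument can be illustrated with the classic fixed-size reservoir-sampling technique. This is a sketch of the general idea, not Dropwizard's exact code (ExponentiallyDecayingReservoir additionally weights recent samples); 1028 is Dropwizard's documented default sample size.

```java
import java.util.Random;

public class BoundedReservoirSketch {
    static int retainedAfter(long updates, int size) {
        long[] samples = new long[size]; // fixed allocation up front
        Random rnd = new Random(42);
        long count = 0;
        for (long v = 0; v < updates; v++) {
            count++;
            if (count <= size) {
                samples[(int) (count - 1)] = v; // fill phase
            } else {
                long r = Math.floorMod(rnd.nextLong(), count);
                if (r < size) samples[(int) r] = v; // replace with decreasing probability
            }
        }
        return samples.length; // storage never grows past `size`
    }

    public static void main(String[] args) {
        System.out.println(retainedAfter(1_000_000L, 1028)); // 1028
    }
}
```

By contrast, a sliding-time-window reservoir must retain every sample observed in the window, so its footprint scales with throughput.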

Other fixes

  • PscMemqConsumer.close() now calls scheduler.shutdown() to prevent thread leaks.
  • PscConsumerMessagesIterable now closes the underlying PscConsumerPollMessageIterator after draining it to asList(), preventing resource leaks (relevant for MemQ batch iterators that hold references to byte buffers).
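The drain-then-close pattern from the last bullet can be sketched like this; the class and method names are stand-ins, not PSC's real API.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseAfterDrainSketch {
    static class PollIterator implements AutoCloseable {
        private final List<String> batch;
        boolean closed = false;
        PollIterator(List<String> batch) { this.batch = batch; }
        List<String> asList() { return new ArrayList<>(batch); }
        @Override public void close() { closed = true; } // frees held buffers in the real client
    }

    static boolean drainedAndClosed() {
        PollIterator it = new PollIterator(List.of("m1", "m2"));
        List<String> messages;
        try (PollIterator closing = it) {
            messages = closing.asList(); // materialize the messages before close runs
        }
        return messages.size() == 2 && it.closed;
    }

    public static void main(String[] args) {
        System.out.println(drainedAndClosed()); // true
    }
}
```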

Test plan

Tested via unit tests, integration tests, and e2e functional testing on Flink clusters.

Use one memq consumer

Fix group_id

make memqConsumer protected

omit memq numbytes in metric

Close iterator after persisting to messages list

Add MemqConsumer PscSourceReaderMetrics support

Update metrics capture

Fix memq offset conversion

Try to fix unit test failure

Clean up MetricRegistry scheduler + change histograms to ExponentiallyDecayingReservoir

Try to reduce hot path CPU util by caching offsets to prevent hashcode computation + hashmap lookup

Bump memq.version to 1.0.2
@jeffxiang jeffxiang requested a review from a team as a code owner April 13, 2026 20:56
public void registerNumBytesIn(PscConsumer<?, ?> consumer) throws ClientException {
    Predicate<Map.Entry<MetricName, ? extends Metric>> filter =
            KafkaSourceReaderMetricsUtil.createBytesConsumedFilter();
    String backendType = getBackendFromTags(consumer.metrics());
Contributor

Do we need to handle potential null return from this call?

Contributor Author

Fixed in getBackendFromTags to return "unknown" if null.

            && entry.getKey().name().equals(BYTES_CONSUMED_TOTAL);
}

protected static Predicate<Map.Entry<MetricName, ? extends Metric>> createRecordsLagFilter(TopicUriPartition tp) {
Contributor

To confirm, is tp a no-op for this function?

Contributor Author

Yes, it's also a no-op on the Kafka side. For some reason, when we ported the Kafka connector implementation to PSC, filtering further by tp returned empty results; metrics were only returned when we didn't filter by tp. We left the method signature untouched but commented out the tp filtering logic.

Comment on lines +344 to +346
public static class Offset {
    public long currentOffset;
    public long committedOffset;
Contributor

What's the reason for making this part of the public API? Any potential harm?

Contributor Author

It needs to be accessed from multiple subpackages. I don't think there is any potential harm: it's just a data holder for metrics tracking and doesn't affect any business logic.

@jeffxiang jeffxiang merged commit ab09b1e into main Apr 15, 2026
2 of 3 checks passed
@jeffxiang jeffxiang deleted the memq_flinksql_integration branch April 15, 2026 20:53
jeffxiang added a commit that referenced this pull request Apr 15, 2026
* Add Memq Metadata Client

* Handle null backend types

---------

Co-authored-by: artem <atetenkin@pinterest.com>
jeffxiang added a commit that referenced this pull request Apr 16, 2026
* Merge pull request #131 from KevBrowne/enable-flink-projection-pushdown

Enable Flink projection pushdown for PSC connector

* Add MemQ support for FlinkSQL 1.18 (#135)


---------

Co-authored-by: Vahid Hashemian <vahid.hashemian@gmail.com>
Co-authored-by: artem <atetenkin@pinterest.com>
@jeffxiang jeffxiang mentioned this pull request Apr 17, 2026


3 participants