From de4afb9f80e9176e4e4e5f1a3eaa2c8a63a32e49 Mon Sep 17 00:00:00 2001 From: Claudio Bisegni Date: Wed, 4 Feb 2026 17:25:36 -0800 Subject: [PATCH 1/2] add respect_push_order property to SnapshotProperties and update Broker to include it in snapshot data --- .devcontainer/docker-compose.yml | 1 + k2eg/broker.py | 3 +++ 2 files changed, 4 insertions(+) diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index 8b3bc98..c13257e 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -53,6 +53,7 @@ services: k2eg: image: ghcr.io/slaclab/k2eg/ubuntu:latest + platform: linux/amd64 pull_policy: always entrypoint: k2eg depends_on: diff --git a/k2eg/broker.py b/k2eg/broker.py index 3341492..b9957c3 100644 --- a/k2eg/broker.py +++ b/k2eg/broker.py @@ -38,6 +38,8 @@ class SnapshotProperties: # The delay in milliseconds to push the snapshot data before the time window excpires # This permit to reduce latency when snapshot windows is to big with a lot of PVs data sub_push_delay_msec: int = 0 + # Define if the snapshot need to respect the push order + respect_push_order: bool = True # Define if a snapshot need to be automatically or manually triggered triggered: bool = False # The type of the snapshot @@ -423,6 +425,7 @@ def send_repeating_snapshot_command(self, properties:SnapshotProperties, reply_i "repeat_delay_msec": properties.repeat_delay, "time_window_msec": properties.time_window, "sub_push_delay_msec": properties.sub_push_delay_msec, + "respect_push_order": properties.respect_push_order, "triggered": properties.triggered, "type": properties.type.value } From f57047f25080c9547f769e70b4f2b52a154e624c Mon Sep 17 00:00:00 2001 From: Claudio Bisegni Date: Wed, 4 Feb 2026 17:43:02 -0800 Subject: [PATCH 2/2] update Kafka configuration in docker-compose to use Apache Kafka 4.1.0 and adjust environment variables --- .devcontainer/docker-compose.yml | 29 ++++++++++++++--------------- docker-compose.yml | 29 ++++++++++++++--------------- 2 files changed, 28 insertions(+), 30 deletions(-) diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index c13257e..08825b0 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -75,22 +75,21 @@ services: "--configuration-server-host=consul-server"] kafka: - image: docker.io/bitnami/kafka:3.6 - ports: - - "9092:9092" - volumes: - - "kafka_data:/bitnami" + # platform: linux/amd64 + image: apache/kafka:4.1.0 environment: - # KRaft settings - - KAFKA_CFG_NODE_ID=0 - - KAFKA_CFG_PROCESS_ROLES=controller,broker - - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 - # Listeners - - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 - - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092 - - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT - - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_NUM_PARTITIONS: 1 volumes: kafka_data: driver: local diff --git a/docker-compose.yml b/docker-compose.yml index 85fd037..e1f6ce5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -47,22 +47,21 @@ services: - kafka kafka: - image: docker.io/bitnami/kafka:3.6 - ports: - - "9092:9092" - volumes: - - "kafka_data:/bitnami" + # platform: linux/amd64 + image: apache/kafka:4.1.0 environment: - # KRaft settings - - KAFKA_CFG_NODE_ID=0 - - KAFKA_CFG_PROCESS_ROLES=controller,broker - - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 - # Listeners - - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 - - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092 - - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT - - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_NUM_PARTITIONS: 1 volumes: kafka_data: driver: local