diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index 8b3bc98..08825b0 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -53,6 +53,7 @@ services: k2eg: image: ghcr.io/slaclab/k2eg/ubuntu:latest + platform: linux/amd64 pull_policy: always entrypoint: k2eg depends_on: @@ -74,22 +75,21 @@ services: "--configuration-server-host=consul-server"] kafka: - image: docker.io/bitnami/kafka:3.6 - ports: - - "9092:9092" - volumes: - - "kafka_data:/bitnami" + # platform: linux/amd64 + image: apache/kafka:4.1.0 environment: - # KRaft settings - - KAFKA_CFG_NODE_ID=0 - - KAFKA_CFG_PROCESS_ROLES=controller,broker - - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 - # Listeners - - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 - - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092 - - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT - - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_NUM_PARTITIONS: 1 volumes: kafka_data: driver: local diff --git a/docker-compose.yml b/docker-compose.yml index 85fd037..e1f6ce5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -47,22 +47,21 @@ services: - kafka kafka: - image: docker.io/bitnami/kafka:3.6 - ports: - - "9092:9092" - volumes: - - "kafka_data:/bitnami" + # platform: linux/amd64 + image: apache/kafka:4.1.0 environment: - # KRaft settings - - KAFKA_CFG_NODE_ID=0 - - KAFKA_CFG_PROCESS_ROLES=controller,broker - - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 - # Listeners - - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 - - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092 - - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT - - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_NUM_PARTITIONS: 1 volumes: kafka_data: driver: local diff --git a/k2eg/broker.py b/k2eg/broker.py index 3341492..b9957c3 100644 --- a/k2eg/broker.py +++ b/k2eg/broker.py @@ -38,6 +38,8 @@ class SnapshotProperties: # The delay in milliseconds to push the snapshot data before the time window excpires # This permit to reduce latency when snapshot windows is to big with a lot of PVs data sub_push_delay_msec: int = 0 + # Define if the snapshot need to respect the push order + respect_push_order: bool = True # Define if a snapshot need to be automatically or manually triggered triggered: bool = False # The type of the snapshot @@ -423,6 +425,7 @@ def send_repeating_snapshot_command(self, properties:SnapshotProperties, reply_i "repeat_delay_msec": properties.repeat_delay, "time_window_msec": properties.time_window, "sub_push_delay_msec": properties.sub_push_delay_msec, + "respect_push_order": properties.respect_push_order, "triggered": properties.triggered, "type": properties.type.value }