49 changes: 48 additions & 1 deletion .github/workflows/BuildAndTest.yml
@@ -50,4 +50,51 @@ jobs:
        if: always()
        run: |
          docker compose logs
          docker compose down -v --rmi all
          docker compose down -v --rmi all

  publish_dev:
    name: Publish Dev Package
    runs-on: ubuntu-latest
    needs:
      - execute_test
    if: ${{ github.event.pull_request.head.repo.fork == false }}
    steps:
      - name: Checkout source code
        uses: actions/checkout@v3
        with:
          fetch-depth: 0
      - name: Set up Python "3.9"
        uses: actions/setup-python@v4
        with:
          python-version: 3.9
      - name: Install GitVersion
        uses: gittools/actions/gitversion/setup@v0
        with:
          versionSpec: '5.x'
      - name: Find Version
        id: gitversion
        uses: gittools/actions/gitversion/execute@v0
        with:
          useConfigFile: true
      - name: Manage Version (PR dev tag)
        run: |
          PR_NUMBER='${{ github.event.pull_request.number }}'
          # Create a PEP 440-compliant dev version that signals PR builds, e.g. 0.2.21.dev50{commits}
          VERSION_DEV="$GitVersion_MajorMinorPatch.dev${PR_NUMBER}$GitVersion_CommitsSinceVersionSource"
          echo "Setting PR dev version: $VERSION_DEV"
          sed -i "s#^version = \"0.0.0\"#version = \"$VERSION_DEV\"#" pyproject.toml
          grep '^version =' pyproject.toml
      - name: Install build deps
        run: |
          python -m pip install --upgrade pip
          pip install build twine
          if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
      - name: Build package
        run: |
          python -m build .
      - name: Upload on TestPyPi
        env:
          TWINE_USERNAME: ${{ secrets.TWINE_USERNAME }}
          TWINE_PASSWORD: ${{ secrets.TWINE_PASSWORD }}
        run: |
          twine upload -u "$TWINE_USERNAME" -p "$TWINE_PASSWORD" --verbose --skip-existing dist/*
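
The Manage Version step above stamps pyproject.toml with a PR-specific dev release before the package is built. Below is a minimal Python sketch of the same string composition, using the packaging library to confirm the result parses as a valid PEP 440 version; the numeric values are made-up placeholders for what GitVersion and the pull-request event would provide.

from packaging.version import Version

# Hypothetical values for the variables the workflow reads:
major_minor_patch = "0.2.21"   # $GitVersion_MajorMinorPatch
pr_number = "50"               # ${{ github.event.pull_request.number }}
commits_since = "3"            # $GitVersion_CommitsSinceVersionSource

# Same concatenation the shell step performs
version_dev = f"{major_minor_patch}.dev{pr_number}{commits_since}"

# Version() raises InvalidVersion if the string is not PEP 440-compliant
parsed = Version(version_dev)
print(parsed, parsed.is_devrelease)  # 0.2.21.dev503 True

Because the upload step passes --skip-existing to twine, a re-run that regenerates an already-published dev version is skipped rather than failing the job.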
3 changes: 3 additions & 0 deletions .gitignore
@@ -7,3 +7,6 @@ k2eg/__pycache__*
dist
k2eg.egg-info
k2eg/cli/__pycache__*
kafka.log
pv_list.txt
k2eg/environment/lcls*
28 changes: 25 additions & 3 deletions k2eg/broker.py
@@ -3,6 +3,7 @@
import json
import os
import logging
import logging.handlers
import re
import threading
import configparser
@@ -11,8 +11,19 @@
from confluent_kafka import Consumer, TopicPartition, Producer, OFFSET_END, OFFSET_BEGINNING
from confluent_kafka import KafkaError, KafkaException


logger = logging.getLogger(__name__)

kafka_syslog_logger = logging.getLogger("k2eg-rdkafka")
kafka_syslog_logger.setLevel(logging.DEBUG)
syslog_handler = logging.handlers.RotatingFileHandler("kafka.log", maxBytes=10*1024*1024, backupCount=2)
# syslog_handler = logging.handlers.SysLogHandler(address="/dev/log")
formatter = logging.Formatter('%(name)s: %(levelname)s %(message)s')
syslog_handler.setFormatter(formatter)
kafka_syslog_logger.addHandler(syslog_handler)
# Prevent propagation to root logger to avoid console output
kafka_syslog_logger.propagate = False

class SnapshotType(Enum):
    """
    Enum to define the type of snapshot
@@ -163,7 +175,8 @@ def __init__(
        enable_kafka_debug = os.getenv(
            'K2EG_PYTHON_ENABLE_KAFKA_DEBUG_LOG',
            'false'
        ).lower in ("yes", "true", "t", "1")
        ).lower() in ("yes", "true", "t", "1")


        self.__enviroment_set = enviroment_set
        # Create a new ConfigParser object
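
This environment check is what the one-character fix above addresses: without the call parentheses, .lower is a bound method object, the membership test is always False, and debug logging can never be enabled. A small usage sketch follows, assuming the variable is exported before the broker object is constructed (the flag is only read in __init__):

import os

# Any of "yes", "true", "t", "1" (case-insensitive) turns on librdkafka debug logging
os.environ['K2EG_PYTHON_ENABLE_KAFKA_DEBUG_LOG'] = 'true'

# The same parsing the constructor performs after the fix
enabled = os.getenv(
    'K2EG_PYTHON_ENABLE_KAFKA_DEBUG_LOG',
    'false'
).lower() in ("yes", "true", "t", "1")
print(enabled)  # True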
@@ -193,17 +206,26 @@ def __init__(
            'session.timeout.ms': 30000,  # Session timeout
            'heartbeat.interval.ms': 3000,  # Heartbeat interval
            'max.poll.interval.ms': 300000,  # Max poll interval

            # Ensure we capture rdkafka internal errors and warnings
            'log_level': 4,  # Capture WARNING and above (errors, critical, etc.)
        }
        if enable_kafka_debug is True:
            config_consumer['debug'] = 'consumer,fetch'
            config_consumer['log_level'] = 7  # Capture all debug messages when debug enabled

        self.__consumer = Consumer(config_consumer)
        self.__consumer = Consumer(config_consumer, logger=kafka_syslog_logger)
        config_producer = {
            'bootstrap.servers': self.__config.get(
                self.__enviroment_set, 'kafka_broker_url'
            ),
            # Ensure we capture rdkafka internal errors and warnings for producer too
            'log_level': 4,  # Capture WARNING and above
            #'debug': 'consumer,cgrp,topic,fetch',
        }
        if enable_kafka_debug is True:
            config_producer['log_level'] = 7  # Capture all debug messages when debug enabled

        self.__producer = Producer(config_producer)
        self.__reply_topic = app_name + '-reply'
        self.__reply_partition_assigned = threading.Event()
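
For context on the wiring above: confluent-kafka clients accept a standard logging.Logger through the logger argument, and the librdkafka properties log_level and debug control what gets emitted into it. The following is a self-contained sketch of that mechanism with placeholder settings; the localhost broker address, group id, and file name are illustrative, not the k2eg configuration.

import logging
import logging.handlers

from confluent_kafka import Consumer

# Route librdkafka output to a rotating file, mirroring the module-level setup in broker.py
rdkafka_logger = logging.getLogger("rdkafka-demo")
rdkafka_logger.setLevel(logging.DEBUG)
handler = logging.handlers.RotatingFileHandler("rdkafka-demo.log", maxBytes=1024 * 1024, backupCount=1)
handler.setFormatter(logging.Formatter('%(name)s: %(levelname)s %(message)s'))
rdkafka_logger.addHandler(handler)
rdkafka_logger.propagate = False  # keep rdkafka chatter off the console

conf = {
    'bootstrap.servers': 'localhost:9092',  # placeholder broker address
    'group.id': 'rdkafka-log-demo',
    'log_level': 7,             # syslog-style level: 4 = warning and above, 7 = debug
    'debug': 'consumer,fetch',  # debug contexts to trace when verbose logging is wanted
}

# Log lines produced by librdkafka are forwarded to the supplied logger during poll()
consumer = Consumer(conf, logger=rdkafka_logger)
consumer.poll(1.0)
consumer.close()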
@@ -271,7 +293,7 @@ def wait_for_reply_available(self, timeout):
        end_time = start_time + timeout
        while self.__reply_partition_assigned.wait(1) is False:
            if time.time() > end_time:
                raise TimeoutError("Function timed out")
                raise TimeoutError(f"Timeout waiting to join the reply topic {self.__reply_topic}")
            logger.debug("waiting for reply topic to join")
            self.__consumer.poll(1)
