feat(client) Add the first chunk of the taskbroker_client #531
Conversation
This adds the Application, Namespace, and Task objects needed to define and send tasks. I'll add the worker and scheduler separately to keep pull requests reasonably sized. Refs STREAM-605
```python
try:
    produce_future.result(timeout=10)
except Exception:
    logger.exception("Failed to wait for delivery")
```
Bug: When wait_for_delivery=True, exceptions during task delivery are caught and logged but not propagated to the caller, causing silent failures.
Severity: CRITICAL
Suggested Fix
Modify the `except Exception` block in the `send_task` method: when `wait_for_delivery` is True and an exception is caught, re-raise it. This ensures the caller is notified of the delivery failure and can handle it appropriately, preventing silent task loss.
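A minimal sketch of the suggested fix, with a hypothetical stand-in for the Kafka produce future (`FakeFuture` and this condensed `send_task` are illustrative, not the actual `registry.py` code):

```python
import logging

logger = logging.getLogger(__name__)


class FakeFuture:
    """Hypothetical stand-in for a Kafka produce future that always times out."""

    def result(self, timeout=None):
        raise TimeoutError("delivery timed out")


def send_task(produce_future, wait_for_delivery=False):
    # Condensed, illustrative version of the delivery-wait logic.
    if wait_for_delivery:
        try:
            produce_future.result(timeout=10)
        except Exception:
            logger.exception("Failed to wait for delivery")
            # Re-raise so the caller learns the task was never delivered,
            # instead of silently proceeding as if scheduling succeeded.
            raise
```

With this change, a delivery timeout surfaces to the scheduling code rather than being swallowed after logging.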
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: clients/python/src/taskbroker_client/registry.py#L189-L191
Potential issue: When a task is sent using `apply_async` with the
`wait_for_delivery=True` option, the system is expected to wait for confirmation that
the task was delivered. However, if the delivery to Kafka fails (e.g., due to a timeout
or connectivity issue), the resulting exception is caught, logged, and then suppressed.
The calling code receives no indication of the failure and proceeds as if the task was
successfully scheduled. This can lead to silent failures where critical tasks are
dropped without any error being surfaced to the application logic that scheduled them.
```python
class DefaultRouter(TaskRouter):
    """
    Stub router that resolves all namespaces to a default topic
    """

    def route_namespace(self, name: str) -> str:
        return "taskbroker"
```
Sentry will continue to have a Router implementation.
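A custom router would subclass `TaskRouter` and override `route_namespace`. The sketch below is hypothetical (the `NamespaceMapRouter` name and its constructor are not in this PR), assuming the same `route_namespace(name) -> topic` contract as `DefaultRouter`:

```python
class TaskRouter:
    """Minimal stand-in for the library's TaskRouter base class."""

    def route_namespace(self, name: str) -> str:
        raise NotImplementedError


class NamespaceMapRouter(TaskRouter):
    """Hypothetical router mapping specific namespaces to dedicated topics."""

    def __init__(self, routes: dict[str, str], default: str = "taskbroker") -> None:
        self._routes = routes
        self._default = default

    def route_namespace(self, name: str) -> str:
        # Fall back to the default topic for unmapped namespaces.
        return self._routes.get(name, self._default)
```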
```toml
[tool.mypy]
files = ["."]
mypy_path = ["src"]
explicit_package_bases = true
# minimal strictness settings
check_untyped_defs = true
no_implicit_reexport = true
warn_unreachable = true
warn_unused_configs = true
warn_unused_ignores = true
warn_redundant_casts = true
enable_error_code = ["ignore-without-code", "redundant-self"]
local_partial_types = true # compat with dmypy
disallow_any_generics = true
disallow_untyped_defs = true

# begin: missing 3rd party stubs
[[tool.mypy.overrides]]
module = [
    ".conftest",
    "redis.*",
    "rediscluster.*",
    "confluent_kafka.*",
]
ignore_missing_imports = true
# end: missing 3rd party stubs
```
This isn't required, as the mypy configuration is shared with integration_tests via the top-level pyproject.toml file.
```python
    self,
    name: str,
    application: str,
    producer_factory: ProducerFactory,
```
Client applications are responsible for providing a factory function that receives a topic name and returns an arroyo producer for that topic.
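As a sketch of that contract, a `ProducerFactory` is just a callable from topic name to producer. Everything below is illustrative (the `StubProducer` class and `make_producer` function are assumptions; a real application would return a configured arroyo Kafka producer):

```python
from typing import Callable


class StubProducer:
    """Hypothetical stand-in for an arroyo producer bound to one topic."""

    def __init__(self, topic: str) -> None:
        self.topic = topic


# The factory contract: receive a topic name, return a producer for it.
ProducerFactory = Callable[[str], StubProducer]


def make_producer(topic: str) -> StubProducer:
    # In a real client application, construct an arroyo Kafka producer
    # here using broker configuration appropriate for the given topic.
    return StubProducer(topic)
```

The application would then pass `make_producer` as the `producer_factory` argument when constructing its taskbroker client objects.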
evanh left a comment:
LGTM.