feat(ingestion): add Kestra pipeline connector #27742
puri-adityakumar wants to merge 3 commits into open-metadata:main from
Conversation
Adds a new pipeline-service connector for Kestra (kestra.io), closing part of open-metadata#26656. Maps Kestra flows to OM Pipelines, executions to PipelineStatus, and Flow-trigger relationships to inter-pipeline lineage edges.

Highlights:

- Static DAG sourced from Kestra's /flows/{ns}/{id}/graph endpoint; a transitive closure across synthetic GraphCluster nodes flattens flowable Parallel/Sequential/ForEach constructs into clean OM tasks.
- Tenant-aware client supports both pre-tenant Kestra (< 0.18) and tenant-scoped (0.18+) APIs via a configurable tenantId.
- Three auth modes: no-auth (default), basic, bearer token.
- Filter-pattern support; disabled flows are skipped.

Tests: 14 unit tests (5 source + 9 client) against fixtures captured from a live Kestra v0.20.5 instance.
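For reviewers skimming the tenant/auth handling without opening the diff, here is a rough sketch of the idea only — attribute and method names are assumptions, and the actual code lives in client.py:

import requests


class KestraClientSketch:
    """Illustrative tenant-aware wrapper; not the actual connector code."""

    def __init__(self, host, tenant_id=None, username=None, password=None, token=None):
        self.host = host.rstrip("/")
        self.tenant_id = tenant_id
        self.session = requests.Session()
        if token:  # bearer-token mode
            self.session.headers["Authorization"] = f"Bearer {token}"
        elif username and password:  # HTTP Basic mode
            self.session.auth = (username, password)
        # otherwise: no-auth (the default)

    def _base_path(self) -> str:
        # Pre-tenant Kestra (< 0.18) exposes /api/v1/...,
        # tenant-scoped deployments (0.18+) expose /api/v1/{tenant}/...
        if self.tenant_id:
            return f"{self.host}/api/v1/{self.tenant_id}"
        return f"{self.host}/api/v1"

    def _get_json(self, path: str) -> dict:
        response = self.session.get(f"{self._base_path()}{path}", timeout=30)
        response.raise_for_status()
        return response.json()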
… trigger access

Three review fixes on the connector:

1. CRITICAL: Pipeline FQN was registered as service.flowId (1-part) but _pipeline_fqn() builds service.namespace.flowId (3-part), so metadata.get_by_name silently returned None for any namespaced flow, suppressing all status and lineage emission. Pipeline names now include the Kestra namespace; lookups agree.
2. HIGH: KestraClient leaked its requests.Session on every ingestion run. Added close() and context-manager protocol.
3. HIGH: yield_pipeline_lineage_details and _schedule_from_triggers round-tripped trigger objects through model_dump(by_alias=True), then read them back as dicts. Replaced with direct typed-attribute access (trig.type, trig.cron, trig.conditions, trig.id).

Tests: 14/14 still pass; added an FQN regression assertion.
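For context on fix 2, a minimal sketch of the session-lifecycle change (illustrative; the real close()/context-manager code is in client.py and may differ in detail):

import requests


class KestraClient:
    """Trimmed to the lifecycle bits only."""

    def __init__(self, host: str):
        self.host = host.rstrip("/")
        self.session = requests.Session()

    def close(self) -> None:
        # Release the pooled TCP connections held by the underlying Session
        self.session.close()

    def __enter__(self) -> "KestraClient":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()


# Usage: the session is closed even if ingestion raises mid-run
with KestraClient("http://localhost:8080") as client:
    pass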
Pipeline names contain dots (the Kestra namespace), and OM's FQN builder wraps such names in double quotes when forming the FQN. Our _pipeline_fqn() and the upstream-flow lookup in yield_pipeline_lineage_details() were building unquoted FQNs, so the metadata.get_by_name() calls during status and lineage emission missed every namespaced pipeline. Found during a real ingestion against OM 1.12.0-SNAPSHOT: the server stores kestra_demo."hackathon.demo.cron_etl" but the connector was looking up kestra_demo.hackathon.demo.cron_etl.

Tests: existing 14/14 still green; live ingestion now emits with 100% workflow success.
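To make the quoting rule concrete, a small standalone illustration — this mirrors the behaviour described above; it is not the actual OpenMetadata fqn helper:

def build_pipeline_fqn(service_name: str, pipeline_name: str) -> str:
    """Wrap any name part that contains dots in double quotes, then join."""

    def quote(part: str) -> str:
        return f'"{part}"' if "." in part else part

    return f"{quote(service_name)}.{quote(pipeline_name)}"


# Matches what the server stores for the namespaced demo flow
result = build_pipeline_fqn("kestra_demo", "hackathon.demo.cron_etl")
assert result == 'kestra_demo."hackathon.demo.cron_etl"'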
Hi there 👋 Thanks for your contribution! The OpenMetadata team will review the PR shortly! Once it has been labeled as Let us know if you need any help!
    return int(dt.timestamp() * 1000)


class KestraSource(PipelineServiceSource):
⚠️ Bug: Missing close() override leaks HTTP session
KestraSource does not override close() to call self.client.close(). The KestraClient wraps a requests.Session which holds open TCP connections. Other pipeline connectors (Airflow, KafkaConnect, DatabricksPipeline) all override close() for cleanup. Over long-running ingestion workflows this leaks sockets.
Suggested fix:
def close(self) -> None:
    if hasattr(self, "client") and self.client:
        self.client.close()
    super().close()
            page += 1

    def get_flow(self, namespace: str, flow_id: str) -> KestraFlow:
        data = self._get_json(f"/flows/{namespace}/{flow_id}")
⚠️ Bug: Namespace not URL-encoded in API paths — dots break routing
In client.py, namespace values containing dots (e.g. hackathon.demo) are interpolated directly into URL paths like /flows/{namespace}/{flow_id}/graph. Kestra's REST API treats dots in the namespace literally, but if a namespace ever contains characters like / or %, the URL will be malformed. More importantly, some HTTP proxies or servers may interpret dots as path separators. The safer pattern used by other connectors is to URL-encode path segments via urllib.parse.quote(namespace, safe='').
Suggested fix:
from urllib.parse import quote

def get_flow(self, namespace: str, flow_id: str) -> KestraFlow:
    data = self._get_json(
        f"/flows/{quote(namespace, safe='')}/{quote(flow_id, safe='')}"
    )
    return KestraFlow.model_validate(data)
        for summary in summaries:
            try:
                detail = self.client.get_execution(summary.id)
                ts = _ms(detail.state.startDate or detail.state.endDate)
                pipeline_status = PipelineStatus(
                    timestamp=Timestamp(ts) if ts else None,
                    executionStatus=_map_state(detail.state.current),
                    taskStatus=self._task_statuses(detail),
                )
                yield Either(
                    right=OMetaPipelineStatus(
                        pipeline_fqn=pipeline_fqn, pipeline_status=pipeline_status
                    )
                )
            except Exception as exc:  # noqa: BLE001
⚠️ Edge Case: search_executions fetches per-execution detail — N+1 API calls
In yield_pipeline_status, each execution summary from the search endpoint triggers an additional get_execution(summary.id) call (line 196). With page_size=50, this means up to 50 HTTP round-trips per flow. The search response already includes taskRunList and state (as visible in the test fixture kestra_dataset.json → executions.results[0]), so the detail call is redundant and can be eliminated by using the summary directly.
Suggested fix:
# In yield_pipeline_status, use the summary directly:
for summary in summaries:
    try:
        ts = _ms(summary.state.startDate or summary.state.endDate)
        pipeline_status = PipelineStatus(
            timestamp=Timestamp(ts) if ts else None,
            executionStatus=_map_state(summary.state.current),
            taskStatus=self._task_statuses(summary),
        )
        yield Either(
            right=OMetaPipelineStatus(
                pipeline_fqn=pipeline_fqn,
                pipeline_status=pipeline_status,
            )
        )
        queue = list(adj.get(start_uid, []))
        while queue:
            uid = queue.pop(0)
            if uid in seen:
                continue
            seen.add(uid)
            tid = uid_to_taskid.get(uid)
            if tid:
                if tid != uid_to_taskid.get(start_uid) and tid not in out:
                    out.append(tid)
                # don't traverse past a real task
                continue
            queue.extend(adj.get(uid, []))
💡 Quality: BFS uses list as queue — O(n²) for large graphs
In _tasks_from_graph, downstream_tasks uses queue.pop(0) on a Python list, which is O(n) per pop, yielding O(n²) overall. For large Kestra flows with many nodes this is suboptimal. Use collections.deque instead.
Suggested fix:
from collections import deque

def downstream_tasks(start_uid: str) -> List[str]:
    seen: set[str] = set()
    out: List[str] = []
    queue = deque(adj.get(start_uid, []))
    while queue:
        uid = queue.popleft()
        if uid in seen:
            continue
        seen.add(uid)
        tid = uid_to_taskid.get(uid)
        if tid:
            if tid != uid_to_taskid.get(start_uid) and tid not in out:
                out.append(tid)
            continue
        queue.extend(adj.get(uid, []))
Describe your changes
Adds a new Kestra pipeline connector to the ingestion framework, addressing one of the requested integrations in #26656.
The connector ingests flows, executions, task DAGs, and inter-pipeline lineage from a Kestra server (OSS or EE) via its REST API.
Key implementation details
- Static task DAG sourced from the /flows/{namespace}/{id}/graph endpoint — no runtime-trace reconstruction.
- Synthetic cluster nodes (GraphClusterRoot/GraphClusterEnd/GraphTrigger) are flattened away so the rendered OpenMetadata DAG only contains real, user-authored tasks (a rough sketch follows this list).
- Three auth modes: noAuth (default), HTTP Basic, and Bearer token.
- Inter-pipeline lineage from io.kestra.plugin.core.trigger.Flow triggers — emitted as EntitiesEdge with lineageDetails.source = PipelineLineage.
- Quoted FQNs for namespaced flows (e.g. kestra_demo."hackathon.demo.cron_etl"). _pipeline_fqn() and the upstream-flow lookup in yield_pipeline_lineage_details() both honor this — caught and fixed during E2E (commit b480289).
- Flow filtering via pipelineFilterPattern; skips disabled flows.
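Rough sketch of the flattening idea (illustrative only — the real logic is _tasks_from_graph in metadata.py; the helper name and data shapes here are simplifications):

from collections import deque
from typing import Dict, List, Set

SYNTHETIC_TYPES = {"GraphClusterRoot", "GraphClusterEnd", "GraphTrigger"}


def real_downstream(start: str, adj: Dict[str, List[str]],
                    node_type: Dict[str, str]) -> List[str]:
    """Return the nearest real (user-authored) tasks reachable from start,
    walking straight through synthetic cluster/trigger nodes."""
    out: List[str] = []
    seen: Set[str] = set()
    queue = deque(adj.get(start, []))
    while queue:
        node = queue.popleft()
        if node in seen:
            continue
        seen.add(node)
        if node_type.get(node) in SYNTHETIC_TYPES:
            # Synthetic node: pass through it and keep walking
            queue.extend(adj.get(node, []))
        else:
            # Real task: record it and stop this branch here
            out.append(node)
    return out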
Files added/changed

- ingestion/src/metadata/ingestion/source/pipeline/kestra/ — __init__.py, service_spec.py, models.py, client.py, connection.py, metadata.py
- openmetadata-spec/.../pipeline/kestraConnection.json (new schema)
- pipelineService.json — Kestra appended to enum, javaEnums, oneOf
- ingestion/setup.py — "kestra": set() plugin entry
- ingestion/tests/unit/topology/pipeline/ + a captured graph fixture from Kestra v0.20.5

Type of change
How was this tested?
- pytest tests/unit/topology/pipeline/test_kestra.py tests/unit/topology/pipeline/test_kestra_client.py → 14 / 14 pass (5 metadata + 9 client)
- make generate regenerates KestraConnection.py cleanly from the new JSON schema
- Live end-to-end ingestion (extract → transform → load) for the demo flows (including cron_etl)
- cron_etl schedule round-trips (0 2 * * *)
- Execution statuses visible via /api/v1/pipelines/{fqn}/status (see the sketch after this list)
- Lineage edge cron_etl → downstream_consumer with description: "Kestra Flow trigger: on_cron_etl"
- Captured API responses stored as demo/screenshots/api-01..04.json in the demo workspace
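The status check above was a plain GET against the OpenMetadata API, roughly as below — host, token, and the time-window parameters are placeholders for this sketch:

import requests

OM_HOST = "http://localhost:8585"  # placeholder local OpenMetadata server
PIPELINE_FQN = 'kestra_demo."hackathon.demo.cron_etl"'

resp = requests.get(
    f"{OM_HOST}/api/v1/pipelines/{PIPELINE_FQN}/status",
    params={"startTs": 0, "endTs": 1_900_000_000_000},  # epoch-millis window
    headers={"Authorization": "Bearer <ingestion-bot-jwt>"},
    timeout=30,
)
resp.raise_for_status()
print(resp.json())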
Out of scope (planned follow-ups)

- TestConnectionDefinition — uses the default GetPipelines step for now; a DB migration adding a Kestra-specific row is deferred to a follow-up.
- Docs under openmetadata-docs/.../connectors/pipeline/kestra/ — happy to add in a docs-only follow-up if maintainers prefer.
- markDeletedPipelines interaction. End-to-end testing surfaced a pre-existing core bug: register_record builds the seen-set FQN via fqn.build(), whose canonical form doesn't match the FQN that delete_entity_from_source queries against, so all pipelines get soft-deleted at the end of each run. Worked around in the demo config (markDeletedPipelines: false); upstream fix tracked separately. Not a blocker for this connector.

Checklist
- PR title follows the convention (feat(ingestion): add Kestra pipeline connector)

Closes part of #26656.