
feat(ingestion): add Kestra pipeline connector#27742

Open
puri-adityakumar wants to merge 3 commits into open-metadata:main from puri-adityakumar:feat/kestra-pipeline-connector

Conversation

@puri-adityakumar

Describe your changes

Adds a new Kestra pipeline connector to the ingestion framework, addressing one of the requested integrations in #26656.

The connector ingests flows, executions, task DAGs, and inter-pipeline lineage from a Kestra server (OSS or EE) via its REST API.

Key implementation details

  • Static DAG source. Tasks and edges come straight from Kestra's /flows/{namespace}/{id}/graph endpoint — no runtime-trace reconstruction.
  • Synthetic-node flattening. A BFS transitive-closure pass collapses Kestra's wrapper nodes (GraphClusterRoot / GraphClusterEnd / GraphTrigger) so the rendered OpenMetadata DAG only contains real, user-authored tasks.
  • Tenant-aware client. Auto-detects pre-tenant Kestra (< 0.18) vs tenant-scoped Kestra (>= 0.18) and rewrites paths accordingly.
  • Auth modes: noAuth (default), HTTP Basic, and Bearer token.
  • Inter-pipeline lineage derived from io.kestra.plugin.core.trigger.Flow triggers — emitted as EntitiesEdge with lineageDetails.source = PipelineLineage.
  • FQN handling. Pipeline names contain dots (Kestra namespace), so the FQN must be quoted (kestra_demo."hackathon.demo.cron_etl"). _pipeline_fqn() and the upstream-flow lookup in yield_pipeline_lineage_details() both honor this — caught and fixed during E2E (commit b480289).
  • Honors pipelineFilterPattern; skips disabled flows.
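
The FQN quoting rule above can be sketched in isolation (helper names are illustrative, not the connector's actual code):

```python
# Minimal sketch of the FQN quoting rule: any FQN part that itself
# contains dots must be wrapped in double quotes so the server-side
# parser does not split it into separate parts.

def quote_fqn_part(part: str) -> str:
    """Wrap a name in double quotes when it contains a dot."""
    return f'"{part}"' if "." in part else part


def build_pipeline_fqn(service: str, pipeline_name: str) -> str:
    """Join service + pipeline name, quoting dotted names."""
    return ".".join(quote_fqn_part(p) for p in (service, pipeline_name))


print(build_pipeline_fqn("kestra_demo", "hackathon.demo.cron_etl"))
# kestra_demo."hackathon.demo.cron_etl"
```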

Files added/changed

  • ingestion/src/metadata/ingestion/source/pipeline/kestra/__init__.py, service_spec.py, models.py, client.py, connection.py, metadata.py
  • openmetadata-spec/.../pipeline/kestraConnection.json (new schema)
  • pipelineService.json — Kestra appended to enum, javaEnums, oneOf
  • ingestion/setup.py — "kestra": set() plugin entry
  • Unit tests under ingestion/tests/unit/topology/pipeline/ + a captured graph fixture from Kestra v0.20.5

Type of change

  • New feature (non-breaking change which adds functionality)

How was this tested?

  • Unit tests: pytest tests/unit/topology/pipeline/test_kestra.py tests/unit/topology/pipeline/test_kestra_client.py — 14/14 pass (5 metadata + 9 client)
  • make generate regenerates KestraConnection.py cleanly from the new JSON schema
  • End-to-end against a real OpenMetadata 1.12 server (custom build from this branch, ES 9.3, Kestra v0.20.5):
    • Workflow Success: 100 % (Kestra 16/16, OpenMetadata 15/15)
    • 11 Pipeline entities created (3 hackathon flows + 8 tutorial flows)
    • Task DAGs round-trip (e.g. extract → transform → load for cron_etl)
    • cron_etl schedule round-trips (0 2 * * *)
    • 2 SUCCESS executions per flow visible via /api/v1/pipelines/{fqn}/status
    • Flow-trigger lineage edge persisted: cron_etl → downstream_consumer with description: "Kestra Flow trigger: on_cron_etl"
    • Evidence snapshots: demo/screenshots/api-01..04.json in the demo workspace
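
As an illustration of the status check above, the endpoint URL can be built like this (a hedged sketch — host and helper name are hypothetical; the point is that the FQN must be percent-encoded because it contains dots and double quotes):

```python
from urllib.parse import quote


def status_url(host: str, pipeline_fqn: str) -> str:
    # Percent-encode the FQN: the double quotes around dotted names
    # are not valid raw URL-path characters.
    return f"{host}/api/v1/pipelines/{quote(pipeline_fqn, safe='')}/status"


print(status_url("http://localhost:8585",
                 'kestra_demo."hackathon.demo.cron_etl"'))
# http://localhost:8585/api/v1/pipelines/kestra_demo.%22hackathon.demo.cron_etl%22/status
```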

Out of scope (planned follow-ups)

  • Column-level / dataset lineage — Kestra OSS doesn't expose dataset I/O metadata; will follow up via OpenLineage facets once the Kestra-side plugin lands.
  • Custom TestConnectionDefinition — uses the default GetPipelines step for now; DB-migration adding a Kestra-specific row is deferred to a follow-up.
  • Connector docs under openmetadata-docs/.../connectors/pipeline/kestra/ — happy to add in a docs-only follow-up if maintainers prefer.
  • markDeletedPipelines interaction. End-to-end testing surfaced a pre-existing core bug: register_record builds the seen-set FQN via fqn.build() whose canonical form doesn't match the FQN that delete_entity_from_source queries against, so all pipelines get soft-deleted at the end of each run. Worked around in the demo config (markDeletedPipelines: false); upstream fix tracked separately. Not a blocker for this connector.

Checklist

  • I have performed a self-review of my code
  • I have added unit tests covering the new code paths
  • New and existing unit tests pass locally
  • Conventional-commit title (feat(ingestion): add Kestra pipeline connector)
  • Linked to tracking issue

Closes part of #26656.

Aditya Puri added 3 commits April 26, 2026 12:51
Adds a new pipeline-service connector for Kestra (kestra.io), closing
part of open-metadata#26656. Maps Kestra flows to OM Pipelines, executions to
PipelineStatus, and Flow-trigger relationships to inter-pipeline
lineage edges.

Highlights:
- Static DAG sourced from Kestra's /flows/{ns}/{id}/graph endpoint;
  transitive closure across synthetic GraphCluster nodes flattens
  flowable Parallel/Sequential/ForEach to clean OM tasks.
- Tenant-aware client supports both pre-tenant Kestra (< 0.18) and
  tenant-scoped (0.18+) APIs via configurable tenantId.
- Three auth modes: no-auth (default), basic, bearer token.
- Filter-pattern support; disabled flows skipped.

Tests: 14 unit tests (5 source + 9 client) against fixtures captured
from a live Kestra v0.20.5 instance.
… trigger access

Three review fixes on the connector:

1. CRITICAL: Pipeline FQN was registered as service.flowId (1-part) but
   _pipeline_fqn() builds service.namespace.flowId (3-part), so
   metadata.get_by_name silently returned None for any namespaced flow,
   suppressing all status and lineage emission. Pipeline names now
   include the Kestra namespace; lookups agree.

2. HIGH: KestraClient leaked its requests.Session on every ingestion
   run. Added close() and context-manager protocol.

3. HIGH: yield_pipeline_lineage_details and _schedule_from_triggers
   round-tripped trigger objects through model_dump(by_alias=True), then
   read them back as dicts. Replaced with direct typed-attribute access
   (trig.type, trig.cron, trig.conditions, trig.id).

Tests: 14/14 still pass; added an FQN regression assertion.
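
The cleanup pattern in fix 2 looks roughly like this (class body is a sketch; only the close()/context-manager protocol is the point):

```python
import requests


class KestraClient:
    """Sketch of a client that owns a requests.Session and releases it."""

    def __init__(self, base_url: str):
        self.base_url = base_url
        self._session = requests.Session()

    def close(self) -> None:
        # Release pooled TCP connections held by the session.
        self._session.close()

    def __enter__(self) -> "KestraClient":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()


# Usage: the session is closed automatically on exit.
with KestraClient("http://localhost:8080") as client:
    pass
```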
Pipeline names contain dots (Kestra namespace) and OM's FQN builder
wraps such names in double quotes when forming the FQN. Our
_pipeline_fqn() and the upstream-flow lookup in
yield_pipeline_lineage_details() were building unquoted FQNs, so the
metadata.get_by_name() calls during status + lineage emission missed
every namespaced pipeline.

Found during a real ingestion against OM 1.12.0-SNAPSHOT: server
stores kestra_demo."hackathon.demo.cron_etl" but the connector was
looking up kestra_demo.hackathon.demo.cron_etl.

Tests: existing 14/14 still green; live ingestion now emits with 100%
workflow success.
@puri-adityakumar puri-adityakumar requested a review from a team as a code owner April 26, 2026 18:18
@github-actions
Contributor

Hi there 👋 Thanks for your contribution!

The OpenMetadata team will review the PR shortly! Once it has been labeled as safe to test, the CI workflows
will start executing and we'll be able to make sure everything is working as expected.

Let us know if you need any help!

return int(dt.timestamp() * 1000)


class KestraSource(PipelineServiceSource):

⚠️ Bug: Missing close() override leaks HTTP session

KestraSource does not override close() to call self.client.close(). The KestraClient wraps a requests.Session which holds open TCP connections. Other pipeline connectors (Airflow, KafkaConnect, DatabricksPipeline) all override close() for cleanup. Over long-running ingestion workflows this leaks sockets.

Suggested fix:

def close(self) -> None:
    if hasattr(self, "client") and self.client:
        self.client.close()
    super().close()


page += 1

def get_flow(self, namespace: str, flow_id: str) -> KestraFlow:
data = self._get_json(f"/flows/{namespace}/{flow_id}")

⚠️ Bug: Namespace not URL-encoded in API paths — dots break routing

In client.py, namespace values containing dots (e.g. hackathon.demo) are interpolated directly into URL paths like /flows/{namespace}/{flow_id}/graph. Kestra's REST API treats dots in the namespace literally, but if a namespace ever contains characters like / or %, the URL will be malformed. More importantly, some HTTP proxies or servers may interpret dots as path separators. The safer pattern used by other connectors is to URL-encode path segments via urllib.parse.quote(namespace, safe='').

Suggested fix:

from urllib.parse import quote

def get_flow(self, namespace: str, flow_id: str) -> KestraFlow:
    data = self._get_json(f"/flows/{quote(namespace, safe='')}/{quote(flow_id, safe='')}")
    return KestraFlow.model_validate(data)


Comment on lines +194 to +208
for summary in summaries:
try:
detail = self.client.get_execution(summary.id)
ts = _ms(detail.state.startDate or detail.state.endDate)
pipeline_status = PipelineStatus(
timestamp=Timestamp(ts) if ts else None,
executionStatus=_map_state(detail.state.current),
taskStatus=self._task_statuses(detail),
)
yield Either(
right=OMetaPipelineStatus(
pipeline_fqn=pipeline_fqn, pipeline_status=pipeline_status
)
)
except Exception as exc: # noqa: BLE001

⚠️ Edge Case: search_executions fetches per-execution detail — N+1 API calls

In yield_pipeline_status, each execution summary from the search endpoint triggers an additional get_execution(summary.id) call (line 196). With page_size=50, this means up to 50 HTTP round-trips per flow. The search response already includes taskRunList and state (as visible in the test fixture kestra_dataset.json → executions.results[0]), so the detail call is redundant and can be eliminated by using the summary directly.

Suggested fix:

# In yield_pipeline_status, use summary directly:
for summary in summaries:
    try:
        ts = _ms(summary.state.startDate or summary.state.endDate)
        pipeline_status = PipelineStatus(
            timestamp=Timestamp(ts) if ts else None,
            executionStatus=_map_state(summary.state.current),
            taskStatus=self._task_statuses(summary),
        )
        yield Either(
            right=OMetaPipelineStatus(
                pipeline_fqn=pipeline_fqn,
                pipeline_status=pipeline_status,
            )
        )


Comment on lines +297 to +309
queue = list(adj.get(start_uid, []))
while queue:
uid = queue.pop(0)
if uid in seen:
continue
seen.add(uid)
tid = uid_to_taskid.get(uid)
if tid:
if tid != uid_to_taskid.get(start_uid) and tid not in out:
out.append(tid)
# don't traverse past a real task
continue
queue.extend(adj.get(uid, []))

💡 Quality: BFS uses list as queue — O(n²) for large graphs

In _tasks_from_graph, downstream_tasks uses queue.pop(0) on a Python list, which is O(n) per pop, yielding O(n²) overall. For large Kestra flows with many nodes this is suboptimal. Use collections.deque instead.

Suggested fix:

from collections import deque

def downstream_tasks(start_uid: str) -> List[str]:
    seen: set[str] = set()
    out: List[str] = []
    queue = deque(adj.get(start_uid, []))
    while queue:
        uid = queue.popleft()
        if uid in seen:
            continue
        seen.add(uid)
        tid = uid_to_taskid.get(uid)
        if tid:
            if tid != uid_to_taskid.get(start_uid) and tid not in out:
                out.append(tid)
            continue
        queue.extend(adj.get(uid, []))



gitar-bot Bot commented Apr 26, 2026

Code Review — ⚠️ Changes requested (0 resolved / 4 findings)

Adds Kestra pipeline connector, but the implementation lacks proper session resource management, fails to encode API namespaces, triggers N+1 API calls during status fetches, and uses an inefficient queue structure for graph traversal.

⚠️ Bug: Missing close() override leaks HTTP session

📄 ingestion/src/metadata/ingestion/source/pipeline/kestra/metadata.py:92

KestraSource does not override close() to call self.client.close(). The KestraClient wraps a requests.Session which holds open TCP connections. Other pipeline connectors (Airflow, KafkaConnect, DatabricksPipeline) all override close() for cleanup. Over long-running ingestion workflows this leaks sockets.

Suggested fix
def close(self) -> None:
    if hasattr(self, "client") and self.client:
        self.client.close()
    super().close()
⚠️ Bug: Namespace not URL-encoded in API paths — dots break routing

📄 ingestion/src/metadata/ingestion/source/pipeline/kestra/client.py:105 📄 ingestion/src/metadata/ingestion/source/pipeline/kestra/client.py:109 📄 ingestion/src/metadata/ingestion/source/pipeline/kestra/client.py:139

In client.py, namespace values containing dots (e.g. hackathon.demo) are interpolated directly into URL paths like /flows/{namespace}/{flow_id}/graph. Kestra's REST API treats dots in the namespace literally, but if a namespace ever contains characters like / or %, the URL will be malformed. More importantly, some HTTP proxies or servers may interpret dots as path separators. The safer pattern used by other connectors is to URL-encode path segments via urllib.parse.quote(namespace, safe='').

Suggested fix
from urllib.parse import quote

def get_flow(self, namespace: str, flow_id: str) -> KestraFlow:
    data = self._get_json(f"/flows/{quote(namespace, safe='')}/{quote(flow_id, safe='')}")
    return KestraFlow.model_validate(data)
⚠️ Edge Case: search_executions fetches per-execution detail — N+1 API calls

📄 ingestion/src/metadata/ingestion/source/pipeline/kestra/metadata.py:194-208

In yield_pipeline_status, each execution summary from the search endpoint triggers an additional get_execution(summary.id) call (line 196). With page_size=50, this means up to 50 HTTP round-trips per flow. The search response already includes taskRunList and state (as visible in the test fixture kestra_dataset.json → executions.results[0]), so the detail call is redundant and can be eliminated by using the summary directly.

Suggested fix
# In yield_pipeline_status, use summary directly:
for summary in summaries:
    try:
        ts = _ms(summary.state.startDate or summary.state.endDate)
        pipeline_status = PipelineStatus(
            timestamp=Timestamp(ts) if ts else None,
            executionStatus=_map_state(summary.state.current),
            taskStatus=self._task_statuses(summary),
        )
        yield Either(
            right=OMetaPipelineStatus(
                pipeline_fqn=pipeline_fqn,
                pipeline_status=pipeline_status,
            )
        )
💡 Quality: BFS uses list as queue — O(n²) for large graphs

📄 ingestion/src/metadata/ingestion/source/pipeline/kestra/metadata.py:297-309

In _tasks_from_graph, downstream_tasks uses queue.pop(0) on a Python list, which is O(n) per pop, yielding O(n²) overall. For large Kestra flows with many nodes this is suboptimal. Use collections.deque instead.

Suggested fix
from collections import deque

def downstream_tasks(start_uid: str) -> List[str]:
    seen: set[str] = set()
    out: List[str] = []
    queue = deque(adj.get(start_uid, []))
    while queue:
        uid = queue.popleft()
        if uid in seen:
            continue
        seen.add(uid)
        tid = uid_to_taskid.get(uid)
        if tid:
            if tid != uid_to_taskid.get(start_uid) and tid not in out:
                out.append(tid)
            continue
        queue.extend(adj.get(uid, []))

