feat(mcp): add tools to run, monitor, and alert on ingestion pipelines #27743
puri-adityakumar wants to merge 2 commits into open-metadata:main
…tools

Adds three new MCP tools to the OM MCP server, closing part of open-metadata#26609:

- run_ingestion triggers an ingestion pipeline by FQN
- get_ingestion_status returns the most recent N pipeline runs
- create_alert registers an EventSubscription that posts to a webhook on ingestionPipeline failure

Each is implemented as a McpTool, registered in DefaultToolContext.callTool and declared in tools.json so Claude Desktop can discover them. No new REST endpoints; no auth/authz changes; no LLM calls inside tools.

run_ingestion currently reads the @Setter-only pipelineServiceClient field on IngestionPipelineRepository via reflection -- a small isolated workaround pending a follow-up PR that exposes runIngestion() publicly.

create_alert is opinionated for v1: resourceType=ingestionPipeline only, eventType=pipelineFailed only, destination=webhook only. Extending to multi-event/multi-destination is a follow-up.

Tests: 10 JUnit tests across the three tools.
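The name-based dispatch mentioned in this commit message can be pictured with a minimal sketch. It assumes a simplified, hypothetical `Tool` interface (the real `McpTool.execute` takes an authorizer, limits, and a security context as well), a hypothetical `fqn` parameter key, and placeholder return values; it is not the code in `DefaultToolContext`.

```java
import java.util.List;
import java.util.Map;

final class ToolDispatchSketch {
  private ToolDispatchSketch() {}

  /** Hypothetical, simplified tool contract; the real McpTool.execute takes more arguments. */
  interface Tool {
    Map<String, Object> execute(Map<String, Object> params);
  }

  /** Routes a tool name to its implementation, one switch arm per tool. */
  static Map<String, Object> callTool(String name, Map<String, Object> params) {
    final Tool tool;
    switch (name) {
      case "run_ingestion":
        // "fqn" is an assumed parameter key for illustration.
        tool = p -> Map.of("status", "triggered", "fqn", String.valueOf(p.get("fqn")));
        break;
      case "get_ingestion_status":
        tool = p -> Map.of("runs", List.of());
        break;
      case "create_alert":
        tool = p -> Map.of("status", "created");
        break;
      default:
        // In this sketch, unknown names yield an error map instead of an exception.
        return Map.of("error", "Unknown tool: " + name);
    }
    return tool.execute(params);
  }

  public static void main(String[] args) {
    System.out.println(callTool("run_ingestion", Map.of("fqn", "svc.pipelineA")).get("status"));
    System.out.println(callTool("nope", Map.of()).containsKey("error"));
  }
}
```

Keeping the switch as the only place that knows tool names means a new tool needs one arm here plus one entry in the tool registry.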
EventSubscriptionMapper.createToEntity passes every destination through Fernet.encryptWebhookSecretKey. An empty string was being encrypted instead of being left absent, which silently broke any future webhook signature verification at delivery time. Pass null instead so the encryption step skips the field.

Tests: 10/10 still pass.
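One way to keep the empty-string case from reaching the encryption step is to normalize the value at the tool boundary. The helper below is a hypothetical illustration (`normalizeSecretKey` does not appear in the PR) and only shows the null-versus-blank decision, not the Fernet call itself.

```java
final class SecretKeySketch {
  private SecretKeySketch() {}

  /** Hypothetical helper: maps null or blank input to null, otherwise returns the key unchanged. */
  static String normalizeSecretKey(String raw) {
    return (raw == null || raw.isBlank()) ? null : raw;
  }

  public static void main(String[] args) {
    System.out.println(normalizeSecretKey(""));       // blank input is treated as absent
    System.out.println(normalizeSecretKey("s3cr3t")); // a real key passes through unchanged
  }
}
```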
    String resourceFqn = requireString(params, "resourceFqn");
    if (resourceFqn == null) {
      return errorMap("resourceFqn is required");
🚨 Bug: resourceFqn is validated but never used to scope the alert
The resourceFqn parameter is validated (line 77-79) and echoed back in the response (line 117), but it is never passed to buildRequest() or otherwise used to configure the EventSubscription filter. The resulting alert fires for all ingestionPipeline failures rather than just the specific pipeline the user requested. This silently violates the API contract described in the JSON schema ("FQN of the specific resource instance to watch") and will flood the webhook with unrelated failure notifications.
The buildRequest method needs to accept resourceFqn and configure an appropriate filter rule (e.g., via FilteringRules or equivalent) on the CreateEventSubscription to restrict the alert to the named pipeline.
Suggested fix:
Pass `resourceFqn` into `buildRequest` and configure a filtering rule that restricts the subscription to the specific pipeline entity, e.g.:
    FilteringRules rules = new FilteringRules();
    rules.setResources(List.of(resourceFqn));
    r.setInput(rules);
(Exact API depends on EventSubscription schema.)
    public Map<String, Object> execute(
        Authorizer authorizer,
        Limits limits,
        CatalogSecurityContext securityContext,
        Map<String, Object> params)
        throws IOException {
      final String fqn = requireString(params, PARAM_FQN);
      if (fqn == null) {
        return errorMap(PARAM_FQN + " is required");
      }

      authorizer.authorize(
          securityContext, new OperationContext(RESOURCE, EDIT_ALL), new ResourceContext<>(RESOURCE));

      IngestionPipeline pipeline =
⚠️ Bug: RunIngestionTool accepts Limits but never enforces them
The execute(authorizer, limits, securityContext, params) overload in RunIngestionTool receives a Limits parameter (to be consistent with the DefaultToolContext.callTool dispatch) but never calls limits.enforceLimits(...). Compare with CreateAlertTool (line 102) which explicitly enforces limits before proceeding. If the deployment has rate/quota limits configured for ingestion triggers, they will be silently bypassed via this MCP tool.
Suggested fix:
Add limits enforcement before the authorize call:
    OperationContext opCtx = new OperationContext(RESOURCE, EDIT_ALL);
    limits.enforceLimits(securityContext, resourceContext, opCtx);
    authorizer.authorize(securityContext, opCtx, resourceContext);
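The ordering in this suggestion (quota check, then permission check, then the side effect) can be exercised in isolation. The sketch below uses a hypothetical `Guard` interface standing in for both `Limits` and `Authorizer`; none of these names come from OpenMetadata, and the real `enforceLimits` signature is the one quoted above.

```java
import java.util.ArrayList;
import java.util.List;

final class LimitsOrderSketch {
  private LimitsOrderSketch() {}

  /** Hypothetical stand-in for both Limits and Authorizer: throws to reject the call. */
  interface Guard {
    void check(String principal, String operation);
  }

  /** Runs the quota check, then the permission check, and only then the side effect. */
  static void runIngestion(
      Guard limits, Guard authorizer, String principal, String fqn, List<String> triggered) {
    limits.check(principal, "EDIT_ALL");
    authorizer.check(principal, "EDIT_ALL");
    triggered.add(fqn);
  }

  public static void main(String[] args) {
    List<String> triggered = new ArrayList<>();
    Guard allow = (p, op) -> {};
    Guard overQuota = (p, op) -> {
      throw new IllegalStateException("limit exceeded for " + p);
    };
    runIngestion(allow, allow, "alice", "svc.pipelineA", triggered);
    try {
      runIngestion(overQuota, allow, "alice", "svc.pipelineB", triggered);
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
    System.out.println(triggered); // only the first pipeline was triggered
  }
}
```

A unit test for `RunIngestionTool` could follow the same shape: inject a limits mock that throws and assert that the pipeline client is never invoked.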
    authorizer.authorize(
        securityContext, new OperationContext(RESOURCE, EDIT_ALL), new ResourceContext<>(RESOURCE));
⚠️ Security: Authorization checks resource type, not specific entity instance
RunIngestionTool (line 68-69) authorizes with new ResourceContext<>(RESOURCE), which checks EDIT_ALL against the resource type (ingestionPipeline) rather than the specific pipeline entity being triggered. The actual IngestionPipelineResource uses getResourceContextById(pipeline.getId()) for entity-level authorization. This means a user who is allowed to edit some pipelines can trigger any pipeline through this MCP tool, bypassing entity-level RBAC policies.
The same pattern applies to GetIngestionStatusTool (line 54-55).
Suggested fix:
Use entity-level ResourceContext after resolving the pipeline:
    ResourceContext<?> resourceContext =
        new ResourceContext<>(RESOURCE, pipeline.getId(), null);
    authorizer.authorize(
        securityContext, operationContext, resourceContext);
Note: move the authorize call after the entity lookup.
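The gap between a type-level and an entity-level check can be shown with a toy model. The policy table and both methods below are hypothetical and greatly simplified; they are not OpenMetadata's policy engine, only an illustration of why the lookup has to happen before the authorization call.

```java
import java.util.Map;
import java.util.Set;

final class EntityAuthSketch {
  private EntityAuthSketch() {}

  // Hypothetical policy table: user -> pipeline FQNs that user may edit.
  static final Map<String, Set<String>> EDITABLE = Map.of("alice", Set.of("svc.pipelineA"));

  /** Type-level check in this toy model: passes if the user may edit at least one pipeline. */
  static boolean canEditType(String user) {
    return !EDITABLE.getOrDefault(user, Set.of()).isEmpty();
  }

  /** Entity-level check: passes only if the user may edit this specific pipeline. */
  static boolean canEditEntity(String user, String fqn) {
    return EDITABLE.getOrDefault(user, Set.of()).contains(fqn);
  }

  public static void main(String[] args) {
    // alice may edit pipelineA only, yet the type-level check cannot tell the two apart.
    System.out.println(canEditType("alice"));
    System.out.println(canEditEntity("alice", "svc.pipelineA"));
    System.out.println(canEditEntity("alice", "svc.pipelineB"));
  }
}
```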
    if (!SUPPORTED_EVENT_TYPE.equals(eventType)) {
      return errorMap("v1 supports eventType=" + SUPPORTED_EVENT_TYPE + " only");
    }
⚠️ Bug: CreateAlertTool does not set eventType filter on subscription
Similar to the resourceFqn issue, the eventType parameter ("pipelineFailed") is validated (line 83-85) and echoed in the response (line 118), but buildRequest() never configures a filter for the specific event type. The alert will trigger on all events for the ingestionPipeline resource (creation, update, deletion, etc.), not just pipelineFailed as the user requested. This makes the tool's described behavior ("alert when pipeline fails") incorrect.
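Both scoping findings (`resourceFqn` and `eventType`) come down to the same requirement: the subscription should match on the pipeline and on the event type together. The sketch below models that as a plain predicate over a hypothetical `PipelineEvent` class. It does not use the real `EventSubscription` filter API, whose exact shape would need to be checked against the schema, and the event type `entityUpdated` is only an example value.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class AlertScopeSketch {
  private AlertScopeSketch() {}

  /** Hypothetical stand-in for a change event; not an OpenMetadata type. */
  static final class PipelineEvent {
    final String entityFqn;
    final String eventType;

    PipelineEvent(String entityFqn, String eventType) {
      this.entityFqn = entityFqn;
      this.eventType = eventType;
    }
  }

  /** Builds a predicate that matches only the requested pipeline AND the requested event type. */
  static Predicate<PipelineEvent> scopedTo(String resourceFqn, String eventType) {
    Objects.requireNonNull(resourceFqn, "resourceFqn");
    Objects.requireNonNull(eventType, "eventType");
    return e -> resourceFqn.equals(e.entityFqn) && eventType.equals(e.eventType);
  }

  /** Returns the FQNs of the events that would reach the webhook under the given scope. */
  static List<String> delivered(List<PipelineEvent> events, Predicate<PipelineEvent> scope) {
    return events.stream().filter(scope).map(e -> e.entityFqn).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<PipelineEvent> events =
        List.of(
            new PipelineEvent("svc.pipelineA", "pipelineFailed"),
            new PipelineEvent("svc.pipelineA", "entityUpdated"),
            new PipelineEvent("svc.pipelineB", "pipelineFailed"));
    // Only the first event satisfies both conditions.
    System.out.println(delivered(events, scopedTo("svc.pipelineA", "pipelineFailed")));
  }
}
```

A test for `CreateAlertTool` could assert the equivalent property on the built request: the filter carries both the FQN and the event type, not just the resource type.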
    private static PipelineServiceClientInterface readPipelineServiceClient(
        IngestionPipelineRepository repo) {
      try {
        Field field = IngestionPipelineRepository.class.getDeclaredField("pipelineServiceClient");
        field.setAccessible(true);
        return (PipelineServiceClientInterface) field.get(repo);
      } catch (ReflectiveOperationException exc) {
        LOG.warn(
            "Could not access IngestionPipelineRepository.pipelineServiceClient: {}",
            exc.getMessage());
        return null;
      }
    }
💡 Quality: Reflection to access private field is fragile
RunIngestionTool.readPipelineServiceClient uses reflection to access IngestionPipelineRepository.pipelineServiceClient (line 116-128). If the field is renamed or its access is changed, this breaks silently at runtime (returning null and reporting a misleading "not configured" error). The PR description acknowledges this as a known issue. Consider adding a public accessor to the repository in this PR rather than deferring it, since the workaround introduces a runtime fragility with a misleading error message.
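The fragility is easy to reproduce outside the PR. In the sketch below, `Repo` is a hypothetical stand-in for the repository (only the field name mirrors the PR), and `readField` differs from the PR's helper in one respect: a missing field raises an exception instead of being folded into a `null` return, so a rename cannot be mistaken for "not configured".

```java
import java.lang.reflect.Field;

final class ReflectionSketch {
  private ReflectionSketch() {}

  /** Hypothetical stand-in for the repository; only the field name mirrors the PR. */
  static final class Repo {
    private String pipelineServiceClient;

    void setPipelineServiceClient(String client) {
      this.pipelineServiceClient = client;
    }

    /** The kind of public accessor the review asks for; a rename breaks callers at compile time. */
    String getPipelineServiceClient() {
      return pipelineServiceClient;
    }
  }

  /** Reflection read that fails loudly on a missing field instead of returning null. */
  static Object readField(Object target, String fieldName) {
    try {
      Field field = target.getClass().getDeclaredField(fieldName);
      field.setAccessible(true);
      return field.get(target);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Field not readable: " + fieldName, e);
    }
  }

  public static void main(String[] args) {
    Repo repo = new Repo();
    repo.setPipelineServiceClient("airflow");
    System.out.println(readField(repo, "pipelineServiceClient"));
    System.out.println(repo.getPipelineServiceClient());
    try {
      readField(repo, "renamedField"); // simulates a rename the compiler cannot catch
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```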
Code Review 🚫 Blocked: 0 resolved / 5 findings
Adds ingestion and alert tools, but fails to apply resource scoping, ingestion limits, entity-level authorization, and event filtering, resulting in security and functional gaps.
Describe your changes

Part of #26609 (New MCP Tools epic).

Adds three new MCP tools to `openmetadata-mcp` so AI assistants (e.g. Claude Desktop) can drive day-2 ops on OpenMetadata via the existing MCP server:

- `run_ingestion`: triggers an ingestion pipeline by FQN. Wraps `IngestionPipelineResource.triggerIngestion`.
- `get_ingestion_status`: read-only; returns recent `PipelineStatus` rows via `IngestionPipelineRepository.listPipelineStatus`.
- `create_alert`: creates an `EventSubscription` for `pipelineFailed` on `ingestionPipeline` with a webhook destination. Mirrors the existing `GlossaryTool` pattern (mapper + `repo.createOrUpdate`).

Each tool implements the existing `McpTool` interface and is dispatched from `DefaultToolContext.callTool()`. JSON Schemas for inputs are added to `openmetadata-mcp/src/main/resources/json/data/mcp/tools.json`.

No new REST endpoints, no auth/authz changes. Authorization flows through the existing `OperationContext` + `Authorizer.authorize()` (EDIT_ALL / VIEW_ALL / CREATE). Tools are deterministic REST wrappers: no LLM calls server-side; the LLM lives in the MCP client.

Files added/changed

- `openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/RunIngestionTool.java` (new)
- `openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetIngestionStatusTool.java` (new)
- `openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/CreateAlertTool.java` (new)
- `openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java`: 3 new switch arms in `callTool()`
- `openmetadata-mcp/src/main/resources/json/data/mcp/tools.json`: 3 new tool entries
- `openmetadata-mcp/src/test/java/org/openmetadata/mcp/tools/`: 3 new test classes

Known issues / follow-ups

- `RunIngestionTool` uses reflection to read the private `pipelineServiceClient` field on `IngestionPipelineRepository` (no public getter today). Flagged in javadoc. A clean follow-up is to add a public `runIngestion(...)` on the repository and drop the reflection.
- `CreateAlertTool` initially passed `secretKey=""` for the webhook destination; switched to `null` after observing Fernet-encryption behavior on empty strings. Unit-tested both paths.
- v1 scope for `create_alert`: `resourceType=ingestionPipeline`, `eventType=pipelineFailed`, webhook destination only. Slack / email / additional event types are intentionally out of scope (separate PRs under #26609).

Type of change

How was this tested?

- `mvn -pl openmetadata-mcp test -Dtest="RunIngestionToolTest,GetIngestionStatusToolTest,CreateAlertToolTest"` → 10 / 10 pass (2 + 3 + 5)
- `mvn -pl openmetadata-mcp spotless:apply`: clean
- […] `DefaultToolContext.java` edit
- […] `tools/list` response and are dispatchable from a Claude Desktop client

Checklist

- […] `RunIngestionTool` is documented in javadoc)
- `tools.json` is the MCP tool registry, not a stored entity schema: no migration needed

Closes part of #26609.