Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/lexicon/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# lexicon

## 2.1.0

### Minor Changes

- e453f6a: Support webhook_response in step:complete event when data available in results

## 2.0.0

### Major Changes
Expand Down
6 changes: 6 additions & 0 deletions packages/lexicon/lightning.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ export type StepStartPayload = {
};
export type StepStartReply = void;

export type WebhookResponse = {
status?: number;
body?: Record<string, any> | any[];
};

export type StepCompletePayload = ExitReason & {
run_id?: string;
job_id: string;
Expand All @@ -223,6 +228,7 @@ export type StepCompletePayload = ExitReason & {
};
duration: number;
timestamp: TimeInMicroSeconds;
webhook_response?: WebhookResponse;
};
export type StepCompleteReply = void;

Expand Down
2 changes: 1 addition & 1 deletion packages/lexicon/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lexicon",
"version": "2.0.0",
"version": "2.1.0",
"description": "Central repo of names and type definitions",
"author": "Open Function Group <admin@openfn.org>",
"license": "ISC",
Expand Down
11 changes: 11 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# ws-worker

## 1.25.0

### Minor Changes

- e453f6a: Support webhook_response in step:complete event when data available in results

### Patch Changes

- Updated dependencies [e453f6a]
- @openfn/lexicon@2.1.0

## 1.24.2

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.24.2",
"version": "1.25.0",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
8 changes: 8 additions & 0 deletions packages/ws-worker/src/events/step-complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ export default async function onStepComplete(
timestamp: timeInMicroseconds(event.time),
} as StepCompletePayload;

// Feed through the webhook response if it's on state
// We do this on the event so that Lightning
// doesn't have the parse the dataclip
// (which may not be sent in zero persistence mode!)
if (outputState.webhookResponse) {
evt.webhook_response = outputState.webhookResponse;
}

if (event.redacted) {
state.withheldDataclips[dataclipId] = true;
evt.output_dataclip_error = 'DATACLIP_TOO_LARGE';
Expand Down
256 changes: 256 additions & 0 deletions packages/ws-worker/test/events/step-complete.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,262 @@ test('accumulate multiple leaf dataclips for branching workflow', async (t) => {
t.not(state.leafDataclipIds[0], state.leafDataclipIds[1]);
});

test('includes webhook_response in step:complete payload when webhookResponse is set on state', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

let lightningEvent: any;
const channel = mockChannel({
[STEP_COMPLETE]: (evt) => {
lightningEvent = evt;
},
});

const event = {
state: { x: 10, webhookResponse: { status: 201, body: { ok: true } } },
} as any;
await handleStepComplete({ channel, state } as any, event);

t.deepEqual(lightningEvent.webhook_response, {
status: 201,
body: { ok: true },
});
});

test('includes webhook_response in step:complete payload when webhookResponse is set on state (status only)', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

let lightningEvent: any;
const channel = mockChannel({
[STEP_COMPLETE]: (evt) => {
lightningEvent = evt;
},
});

const event = {
state: { x: 10, webhookResponse: { status: 201 } },
} as any;
await handleStepComplete({ channel, state } as any, event);

t.deepEqual(lightningEvent.webhook_response, {
status: 201,
});
});

test('includes webhook_response in step:complete payload when webhookResponse is set on state (body only)', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

let lightningEvent: any;
const channel = mockChannel({
[STEP_COMPLETE]: (evt) => {
lightningEvent = evt;
},
});

const event = {
state: { x: 10, webhookResponse: { body: { ok: true } } },
} as any;
await handleStepComplete({ channel, state } as any, event);

t.deepEqual(lightningEvent.webhook_response, {
body: { ok: true },
});
});

test('includes webhook_response in step:complete payload when webhookResponse is set on state (no keys)', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

let lightningEvent: any;
const channel = mockChannel({
[STEP_COMPLETE]: (evt) => {
lightningEvent = evt;
},
});

const event = {
state: { x: 10, webhookResponse: {} },
} as any;
await handleStepComplete({ channel, state } as any, event);

t.deepEqual(lightningEvent.webhook_response, {});
});

test('omits webhook_response from payload when webhookResponse is not set on state', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

let lightningEvent: any;
const channel = mockChannel({
[STEP_COMPLETE]: (evt) => {
lightningEvent = evt;
},
});

const event = { state: { x: 10 } } as any;
await handleStepComplete({ channel, state } as any, event);

t.is(lightningEvent.webhook_response, undefined);
});

test('webhookResponse is included in dataclip', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

const channel = mockChannel({
[STEP_COMPLETE]: () => true,
});

const event = {
state: { x: 10, webhookResponse: { status: 201, body: {} } },
} as any;
await handleStepComplete({ channel, state } as any, event);

const [dataclip] = Object.values(state.dataclips);
t.deepEqual(dataclip, { x: 10, webhookResponse: { status: 201, body: {} } });
});

test('webhookResponse included in the serialized output_dataclip sent to Lightning', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

let lightningEvent: any;
const channel = mockChannel({
[STEP_COMPLETE]: (evt) => {
lightningEvent = evt;
},
});

const event = {
state: { x: 10, webhookResponse: { status: 201, body: {} } },
} as any;
await handleStepComplete({ channel, state } as any, event);

t.deepEqual(JSON.parse(lightningEvent.output_dataclip), {
x: 10,
webhookResponse: { status: 201, body: {} },
});
});

test('handles webhookResponse with only a body', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

let lightningEvent: any;
const channel = mockChannel({
[STEP_COMPLETE]: (evt) => {
lightningEvent = evt;
},
});

const event = {
state: { webhookResponse: { body: { message: 'hello' } } },
} as any;
await handleStepComplete({ channel, state } as any, event);

t.deepEqual(lightningEvent.webhook_response, { body: { message: 'hello' } });
});

test('handles webhookResponse body as a JSON array', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

let lightningEvent: any;
const channel = mockChannel({
[STEP_COMPLETE]: (evt) => {
lightningEvent = evt;
},
});

const event = {
state: { webhookResponse: { status: 200, body: [{ id: 1 }, { id: 2 }] } },
} as any;
await handleStepComplete({ channel, state } as any, event);

t.deepEqual(lightningEvent.webhook_response, {
status: 200,
body: [{ id: 1 }, { id: 2 }],
});
});

test('does nothing with webhookResponse when event.state is empty', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

let lightningEvent: any;
const channel = mockChannel({
[STEP_COMPLETE]: (evt) => {
lightningEvent = evt;
},
});

await handleStepComplete({ channel, state } as any, { state: {} } as any);
t.is(lightningEvent.webhook_response, undefined);
});

test('does nothing with webhookResponse when event.state is undefined', async (t) => {
const plan = createPlan();
const jobId = 'job-1';

const state = createRunState(plan);
state.activeJob = jobId;
state.activeStep = 'b';

let lightningEvent: any;
const channel = mockChannel({
[STEP_COMPLETE]: (evt) => {
lightningEvent = evt;
},
});

await t.notThrowsAsync(() =>
handleStepComplete({ channel, state } as any, {} as any)
);
t.is(lightningEvent.webhook_response, undefined);
});

// Single leaf reached by two paths: start → a → x, start → b → x
// x executes twice, both times with no downstream
test('accumulate two leaf dataclips when same node reached by two paths', async (t) => {
Expand Down