Skip to content
5 changes: 5 additions & 0 deletions .changeset/frank-canyons-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/deploy': minor
---

Add support for webhook_response_config
5 changes: 5 additions & 0 deletions .changeset/salty-areas-juggle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/lexicon': minor
---

Support `cron_cursor_job_id`, `webhook_reply` and `webhook_response_config` in Provisioner types
5 changes: 5 additions & 0 deletions .changeset/shy-needles-attack.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/cli': minor
---

Support webhook responses in sync & deploy
5 changes: 5 additions & 0 deletions .changeset/wide-seals-tease.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@openfn/project': patch
---

Support more trigger keys
14 changes: 14 additions & 0 deletions packages/deploy/src/stateTransform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@ function mergeTriggers(
if (specTrigger.type === 'webhook' && specTrigger.webhook_reply) {
trigger.webhook_reply = specTrigger.webhook_reply;
}
if (
specTrigger.type === 'webhook' &&
specTrigger.webhook_response_config
) {
trigger.webhook_response_config =
specTrigger.webhook_response_config;
}

if (specTrigger.type === 'cron') {
trigger.cron_expression = specTrigger.cron_expression;
Expand Down Expand Up @@ -202,6 +209,13 @@ function mergeTriggers(
if (specTrigger!.type === 'webhook' && specTrigger!.webhook_reply) {
trigger.webhook_reply = specTrigger!.webhook_reply;
}
if (
specTrigger!.type === 'webhook' &&
specTrigger!.webhook_response_config
) {
trigger.webhook_response_config =
specTrigger!.webhook_response_config;
}

if (specTrigger!.type === 'cron') {
trigger.cron_expression = specTrigger!.cron_expression;
Expand Down
7 changes: 7 additions & 0 deletions packages/deploy/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,19 @@ export type SpecKafkaConfiguration = {
connect_timeout: number;
};

export type WebhookResponseConfig = {
error_code: number | null;
success_code: number | null;
};

export type WebhookReply = 'before_start' | 'after_completion';

export type SpecTrigger = {
type: string;
cron_expression?: string;
cron_cursor_job?: string;
webhook_reply?: WebhookReply;
webhook_response_config?: WebhookResponseConfig | null;
enabled?: boolean;
kafka_configuration?: SpecKafkaConfiguration;
};
Expand All @@ -55,6 +61,7 @@ export type StateTrigger = {
cron_expression?: string;
cron_cursor_job_id?: string | null;
webhook_reply?: WebhookReply;
webhook_response_config?: WebhookResponseConfig | null;
delete?: boolean;
enabled?: boolean;
kafka_configuration?: StateKafkaConfiguration;
Expand Down
137 changes: 134 additions & 3 deletions packages/deploy/test/stateTransform.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,125 @@ test('toNextState omits webhook_reply on existing trigger when not specified', (
t.false('webhook_reply' in result.workflows.w.triggers.t);
});

test('toNextState sets webhook_response_config when specified', (t) => {
const state = { workflows: {} };
const spec = {
name: 'my project',
workflows: {
w: {
name: 'workflow',
jobs: {},
triggers: {
t: {
type: 'webhook',
webhook_response_config: { success_code: 200, error_code: 400 },
},
},
edges: {},
},
},
};

const result = mergeSpecIntoState(state, spec);
t.deepEqual(result.workflows.w.triggers.t.webhook_response_config, {
success_code: 200,
error_code: 400,
});
});

test('toNextState omits webhook_response_config when not specified', (t) => {
const state = { workflows: {} };
const spec = {
name: 'my project',
workflows: {
w: {
name: 'workflow',
jobs: {},
triggers: {
t: { type: 'webhook' },
},
edges: {},
},
},
};

const result = mergeSpecIntoState(state, spec);
t.false('webhook_response_config' in result.workflows.w.triggers.t);
});

test('toNextState sets webhook_response_config on existing trigger', (t) => {
const triggerId = 'aaa-bbb-ccc';
const state = {
workflows: {
w: {
id: 'wf-1',
name: 'workflow',
jobs: {},
triggers: {
t: { id: triggerId, type: 'webhook', enabled: true },
},
edges: {},
},
},
};
const spec = {
name: 'my project',
workflows: {
w: {
name: 'workflow',
jobs: {},
triggers: {
t: {
type: 'webhook',
webhook_response_config: { success_code: 201, error_code: 500 },
},
},
edges: {},
},
},
};

const result = mergeSpecIntoState(state, spec);
t.is(result.workflows.w.triggers.t.id, triggerId);
t.deepEqual(result.workflows.w.triggers.t.webhook_response_config, {
success_code: 201,
error_code: 500,
});
});

test('toNextState omits webhook_response_config on existing trigger when not specified', (t) => {
const triggerId = 'aaa-bbb-ccc';
const state = {
workflows: {
w: {
id: 'wf-1',
name: 'workflow',
jobs: {},
triggers: {
t: { id: triggerId, type: 'webhook', enabled: true },
},
edges: {},
},
},
};
const spec = {
name: 'my project',
workflows: {
w: {
name: 'workflow',
jobs: {},
triggers: {
t: { type: 'webhook' },
},
edges: {},
},
},
};

const result = mergeSpecIntoState(state, spec);
t.false('webhook_response_config' in result.workflows.w.triggers.t);
});

test('toNextState sets cron_cursor_job_id on existing trigger', (t) => {
const triggerId = 'aaa-bbb-ccc';
const jobId = 'job-uuid-111';
Expand All @@ -663,10 +782,18 @@ test('toNextState sets cron_cursor_job_id on existing trigger', (t) => {
w: {
name: 'workflow',
jobs: {
'job-a': { name: 'job a', adaptor: '@openfn/language-http', body: 'fn()' },
'job-a': {
name: 'job a',
adaptor: '@openfn/language-http',
body: 'fn()',
},
},
triggers: {
t: { type: 'cron', cron_expression: '0 * * * *', cron_cursor_job: 'job-a' },
t: {
type: 'cron',
cron_expression: '0 * * * *',
cron_cursor_job: 'job-a',
},
},
edges: {},
},
Expand Down Expand Up @@ -702,7 +829,11 @@ test('toNextState omits cron_cursor_job_id on existing trigger when not specifie
w: {
name: 'workflow',
jobs: {
'job-a': { name: 'job a', adaptor: '@openfn/language-http', body: 'fn()' },
'job-a': {
name: 'job a',
adaptor: '@openfn/language-http',
body: 'fn()',
},
},
triggers: {
t: { type: 'cron', cron_expression: '0 * * * *' },
Expand Down
8 changes: 8 additions & 0 deletions packages/lexicon/lightning.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,18 @@ export namespace Provisioner {
delete?: boolean;
};

export type WebhookResponseConfig = {
error_code: number | null;
success_code: number | null;
};

export type Trigger = {
id: string;
type: string;
cron_expression?: string;
cron_cursor_job_id?: string | null;
webhook_reply?: 'before_start' | 'after_completion';
webhook_response_config?: WebhookResponseConfig | null;
delete?: boolean;
enabled?: boolean;
kafka_configuration?: KafkaConfiguration;
Expand Down
6 changes: 5 additions & 1 deletion packages/lexicon/portability.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ export interface Trigger extends Step {

enabled?: boolean;

webhook_reply?: string;
webhook_reply?: 'before_start' | 'after_completion';
webhook_response_config?: {
error_code?: number;
success_code?: number;
};
cron_cursor_job_id?: string;

/** Allow arbitrary properties on trigger nodes (as configuration options) */
Expand Down
51 changes: 33 additions & 18 deletions packages/project/src/parse/from-app-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Provisioner } from '@openfn/lexicon/lightning';
import { Project } from '../Project';
import renameKeys from '../util/rename-keys';
import slugify from '../util/slugify';
import omitNil from '../util/omit-nil';
import ensureJson from '../util/ensure-json';
import getCredentialName from '../util/get-credential-name';

Expand Down Expand Up @@ -120,31 +121,45 @@ export const mapWorkflow = (
// TODO what do we do if the condition is disabled?
// I don't think that's the same as edge condition false?
Object.values(workflow.triggers).forEach((trigger: Provisioner.Trigger) => {
const { type, enabled, ...otherProps } = trigger;
const {
type,
enabled,
cron_expression,
cron_cursor_job_id,
webhook_reply,
webhook_response_config,
...otherProps
} = trigger;
if (!mapped.start) {
mapped.start = type;
}

const connectedEdges = Object.values(edges).filter(
(e) => e.source_trigger_id === trigger.id
);
mapped.steps.push({
id: type,
type,
enabled,
openfn: renameKeys(otherProps, { id: 'uuid' }),
next: connectedEdges.reduce((obj: any, edge) => {
const target = Object.values(jobs).find(
(j) => j.id === edge.target_job_id
);
if (!target) {
throw new Error(`Failed to find ${edge.target_job_id}`);
}
// we use the name, not the id, to reference
obj[slugify(target.name)] = mapEdge(edge);
return obj;
}, {}),
} as l.Trigger);
mapped.steps.push(
omitNil({
id: type,
type,
enabled,
cron_expression,
cron_cursor_job_id,
webhook_reply,
webhook_response_config,
openfn: renameKeys(otherProps, { id: 'uuid' }),
next: connectedEdges.reduce((obj: any, edge) => {
const target = Object.values(jobs).find(
(j) => j.id === edge.target_job_id
);
if (!target) {
throw new Error(`Failed to find ${edge.target_job_id}`);
}
// we use the name, not the id, to reference
obj[slugify(target.name)] = mapEdge(edge);
return obj;
}, {}),
}) as l.Trigger
);
});

Object.values(workflow.jobs).forEach((step: Provisioner.Job) => {
Expand Down
6 changes: 4 additions & 2 deletions packages/project/src/serialize/to-app-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,12 @@ export const mapWorkflow = (

if (s.type) {
isTrigger = true;

const { type, id, next, openfn, ...rest } = s;
node = {
...rest,
type: s.type ?? 'webhook', // this is mostly for tests
enabled: s.enabled,
...renameKeys(s.openfn, { uuid: 'id' }),
...renameKeys(openfn, { uuid: 'id' }),
} as Provisioner.Trigger;
wfState.triggers[node.type] = node;
} else {
Expand Down
4 changes: 2 additions & 2 deletions packages/project/src/util/version.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ export const generateHash = (
// this means we can match keys with lightning
// and everything gets cleaner
const wfState = mapWorkflow(workflow);

// These are the keys we hash against
const wfKeys = ['name', 'positions'].sort();

Expand All @@ -56,9 +55,10 @@ export const generateHash = (

const triggerKeys = [
'type',
'cron_expression',
'enabled',
'cron_expression',
'webhook_reply',
'webhook_response_config',
'cron_cursor_job_id',
].sort();

Expand Down
Loading