diff --git a/.changeset/frank-canyons-report.md b/.changeset/frank-canyons-report.md new file mode 100644 index 000000000..bd6da2ea7 --- /dev/null +++ b/.changeset/frank-canyons-report.md @@ -0,0 +1,5 @@ +--- +'@openfn/deploy': minor +--- + +Add support for webhook_response_config diff --git a/.changeset/salty-areas-juggle.md b/.changeset/salty-areas-juggle.md new file mode 100644 index 000000000..b954927d9 --- /dev/null +++ b/.changeset/salty-areas-juggle.md @@ -0,0 +1,5 @@ +--- +'@openfn/lexicon': minor +--- + +Support `cron_cursor_job_id`, `webhook_reply` and `webhook_response_config` in Provisioner types diff --git a/.changeset/shy-needles-attack.md b/.changeset/shy-needles-attack.md new file mode 100644 index 000000000..703e0b7f3 --- /dev/null +++ b/.changeset/shy-needles-attack.md @@ -0,0 +1,5 @@ +--- +'@openfn/cli': minor +--- + +Support webhook responses in sync & deploy diff --git a/.changeset/wide-seals-tease.md b/.changeset/wide-seals-tease.md new file mode 100644 index 000000000..3d7dc5b75 --- /dev/null +++ b/.changeset/wide-seals-tease.md @@ -0,0 +1,5 @@ +--- +'@openfn/project': patch +--- + +Support more trigger keys diff --git a/packages/deploy/src/stateTransform.ts b/packages/deploy/src/stateTransform.ts index 474b8a148..0cf633080 100644 --- a/packages/deploy/src/stateTransform.ts +++ b/packages/deploy/src/stateTransform.ts @@ -164,6 +164,13 @@ function mergeTriggers( if (specTrigger.type === 'webhook' && specTrigger.webhook_reply) { trigger.webhook_reply = specTrigger.webhook_reply; } + if ( + specTrigger.type === 'webhook' && + specTrigger.webhook_response_config + ) { + trigger.webhook_response_config = + specTrigger.webhook_response_config; + } if (specTrigger.type === 'cron') { trigger.cron_expression = specTrigger.cron_expression; @@ -202,6 +209,13 @@ function mergeTriggers( if (specTrigger!.type === 'webhook' && specTrigger!.webhook_reply) { trigger.webhook_reply = specTrigger!.webhook_reply; } + if ( + specTrigger!.type === 'webhook' && + specTrigger!.webhook_response_config + ) { + trigger.webhook_response_config = + specTrigger!.webhook_response_config; + } if (specTrigger!.type === 'cron') { trigger.cron_expression = specTrigger!.cron_expression; diff --git a/packages/deploy/src/types.ts b/packages/deploy/src/types.ts index 2cba41176..1857e6428 100644 --- a/packages/deploy/src/types.ts +++ b/packages/deploy/src/types.ts @@ -38,6 +38,11 @@ export type SpecKafkaConfiguration = { connect_timeout: number; }; +export type WebhookResponseConfig = { + error_code: number | null; + success_code: number | null; +}; + export type WebhookReply = 'before_start' | 'after_completion'; export type SpecTrigger = { @@ -45,6 +50,7 @@ export type SpecTrigger = { cron_expression?: string; cron_cursor_job?: string; webhook_reply?: WebhookReply; + webhook_response_config?: WebhookResponseConfig | null; enabled?: boolean; kafka_configuration?: SpecKafkaConfiguration; }; @@ -55,6 +61,7 @@ export type StateTrigger = { cron_expression?: string; cron_cursor_job_id?: string | null; webhook_reply?: WebhookReply; + webhook_response_config?: WebhookResponseConfig | null; delete?: boolean; enabled?: boolean; kafka_configuration?: StateKafkaConfiguration; diff --git a/packages/deploy/test/stateTransform.test.ts b/packages/deploy/test/stateTransform.test.ts index 4d6b91e4e..997dc9c6e 100644 --- a/packages/deploy/test/stateTransform.test.ts +++ b/packages/deploy/test/stateTransform.test.ts @@ -639,6 +639,125 @@ test('toNextState omits webhook_reply on existing trigger when not specified', ( t.false('webhook_reply' in result.workflows.w.triggers.t); }); +test('toNextState sets webhook_response_config when specified', (t) => { + const state = { workflows: {} }; + const spec = { + name: 'my project', + workflows: { + w: { + name: 'workflow', + jobs: {}, + triggers: { + t: { + type: 'webhook', + webhook_response_config: { success_code: 200, error_code: 400 }, + }, + }, + edges: {}, + }, + }, + }; + + const result = mergeSpecIntoState(state, spec); + t.deepEqual(result.workflows.w.triggers.t.webhook_response_config, { + success_code: 200, + error_code: 400, + }); +}); + +test('toNextState omits webhook_response_config when not specified', (t) => { + const state = { workflows: {} }; + const spec = { + name: 'my project', + workflows: { + w: { + name: 'workflow', + jobs: {}, + triggers: { + t: { type: 'webhook' }, + }, + edges: {}, + }, + }, + }; + + const result = mergeSpecIntoState(state, spec); + t.false('webhook_response_config' in result.workflows.w.triggers.t); +}); + +test('toNextState sets webhook_response_config on existing trigger', (t) => { + const triggerId = 'aaa-bbb-ccc'; + const state = { + workflows: { + w: { + id: 'wf-1', + name: 'workflow', + jobs: {}, + triggers: { + t: { id: triggerId, type: 'webhook', enabled: true }, + }, + edges: {}, + }, + }, + }; + const spec = { + name: 'my project', + workflows: { + w: { + name: 'workflow', + jobs: {}, + triggers: { + t: { + type: 'webhook', + webhook_response_config: { success_code: 201, error_code: 500 }, + }, + }, + edges: {}, + }, + }, + }; + + const result = mergeSpecIntoState(state, spec); + t.is(result.workflows.w.triggers.t.id, triggerId); + t.deepEqual(result.workflows.w.triggers.t.webhook_response_config, { + success_code: 201, + error_code: 500, + }); +}); + +test('toNextState omits webhook_response_config on existing trigger when not specified', (t) => { + const triggerId = 'aaa-bbb-ccc'; + const state = { + workflows: { + w: { + id: 'wf-1', + name: 'workflow', + jobs: {}, + triggers: { + t: { id: triggerId, type: 'webhook', enabled: true }, + }, + edges: {}, + }, + }, + }; + const spec = { + name: 'my project', + workflows: { + w: { + name: 'workflow', + jobs: {}, + triggers: { + t: { type: 'webhook' }, + }, + edges: {}, + }, + }, + }; + + const result = mergeSpecIntoState(state, spec); + t.false('webhook_response_config' in result.workflows.w.triggers.t); +}); + test('toNextState sets cron_cursor_job_id on existing trigger', (t) => { const triggerId = 'aaa-bbb-ccc'; const jobId = 'job-uuid-111'; @@ -663,10 +782,18 @@ test('toNextState sets cron_cursor_job_id on existing trigger', (t) => { w: { name: 'workflow', jobs: { - 'job-a': { name: 'job a', adaptor: '@openfn/language-http', body: 'fn()' }, + 'job-a': { + name: 'job a', + adaptor: '@openfn/language-http', + body: 'fn()', + }, }, triggers: { - t: { type: 'cron', cron_expression: '0 * * * *', cron_cursor_job: 'job-a' }, + t: { + type: 'cron', + cron_expression: '0 * * * *', + cron_cursor_job: 'job-a', + }, }, edges: {}, }, @@ -702,7 +829,11 @@ test('toNextState omits cron_cursor_job_id on existing trigger when not specifie w: { name: 'workflow', jobs: { - 'job-a': { name: 'job a', adaptor: '@openfn/language-http', body: 'fn()' }, + 'job-a': { + name: 'job a', + adaptor: '@openfn/language-http', + body: 'fn()', + }, }, triggers: { t: { type: 'cron', cron_expression: '0 * * * *' }, diff --git a/packages/lexicon/lightning.d.ts b/packages/lexicon/lightning.d.ts index bab1bdc9a..a57045ad7 100644 --- a/packages/lexicon/lightning.d.ts +++ b/packages/lexicon/lightning.d.ts @@ -309,10 +309,18 @@ export namespace Provisioner { delete?: boolean; }; + export type WebhookResponseConfig = { + error_code: number | null; + success_code: number | null; + }; + export type Trigger = { id: string; type: string; cron_expression?: string; + cron_cursor_job_id?: string | null; + webhook_reply?: 'before_start' | 'after_completion'; + webhook_response_config?: WebhookResponseConfig | null; delete?: boolean; enabled?: boolean; kafka_configuration?: KafkaConfiguration; diff --git a/packages/lexicon/portability.d.ts b/packages/lexicon/portability.d.ts index 6193ec2df..1ab556a2c 100644 --- a/packages/lexicon/portability.d.ts +++ b/packages/lexicon/portability.d.ts @@ -78,7 +78,11 @@ export interface Trigger extends Step { enabled?: boolean; - webhook_reply?: string; + webhook_reply?: 'before_start' | 'after_completion'; + webhook_response_config?: { + error_code?: number; + success_code?: number; + }; cron_cursor_job_id?: string; /** Allow arbitrary properties on trigger nodes (as configuration options) */ diff --git a/packages/project/src/parse/from-app-state.ts b/packages/project/src/parse/from-app-state.ts index b6ed9324c..f98c11969 100644 --- a/packages/project/src/parse/from-app-state.ts +++ b/packages/project/src/parse/from-app-state.ts @@ -6,6 +6,7 @@ import { Provisioner } from '@openfn/lexicon/lightning'; import { Project } from '../Project'; import renameKeys from '../util/rename-keys'; import slugify from '../util/slugify'; +import omitNil from '../util/omit-nil'; import ensureJson from '../util/ensure-json'; import getCredentialName from '../util/get-credential-name'; @@ -120,7 +121,15 @@ export const mapWorkflow = ( // TODO what do we do if the condition is disabled? // I don't think that's the same as edge condition false? Object.values(workflow.triggers).forEach((trigger: Provisioner.Trigger) => { - const { type, enabled, ...otherProps } = trigger; + const { + type, + enabled, + cron_expression, + cron_cursor_job_id, + webhook_reply, + webhook_response_config, + ...otherProps + } = trigger; if (!mapped.start) { mapped.start = type; } @@ -128,23 +137,29 @@ export const mapWorkflow = ( const connectedEdges = Object.values(edges).filter( (e) => e.source_trigger_id === trigger.id ); - mapped.steps.push({ - id: type, - type, - enabled, - openfn: renameKeys(otherProps, { id: 'uuid' }), - next: connectedEdges.reduce((obj: any, edge) => { - const target = Object.values(jobs).find( - (j) => j.id === edge.target_job_id - ); - if (!target) { - throw new Error(`Failed to find ${edge.target_job_id}`); - } - // we use the name, not the id, to reference - obj[slugify(target.name)] = mapEdge(edge); - return obj; - }, {}), - } as l.Trigger); + mapped.steps.push( + omitNil({ + id: type, + type, + enabled, + cron_expression, + cron_cursor_job_id, + webhook_reply, + webhook_response_config, + openfn: renameKeys(otherProps, { id: 'uuid' }), + next: connectedEdges.reduce((obj: any, edge) => { + const target = Object.values(jobs).find( + (j) => j.id === edge.target_job_id + ); + if (!target) { + throw new Error(`Failed to find ${edge.target_job_id}`); + } + // we use the name, not the id, to reference + obj[slugify(target.name)] = mapEdge(edge); + return obj; + }, {}), + }) as l.Trigger + ); }); Object.values(workflow.jobs).forEach((step: Provisioner.Job) => { diff --git a/packages/project/src/serialize/to-app-state.ts b/packages/project/src/serialize/to-app-state.ts index 644147bb2..bcde55620 100644 --- a/packages/project/src/serialize/to-app-state.ts +++ b/packages/project/src/serialize/to-app-state.ts @@ -117,10 +117,12 @@ export const mapWorkflow = ( if (s.type) { isTrigger = true; + + const { type, id, next, openfn, ...rest } = s; node = { + ...rest, type: s.type ?? 'webhook', // this is mostly for tests - enabled: s.enabled, - ...renameKeys(s.openfn, { uuid: 'id' }), + ...renameKeys(openfn, { uuid: 'id' }), } as Provisioner.Trigger; wfState.triggers[node.type] = node; } else { diff --git a/packages/project/src/util/version.ts b/packages/project/src/util/version.ts index df2d70913..c9e8f2f27 100644 --- a/packages/project/src/util/version.ts +++ b/packages/project/src/util/version.ts @@ -41,7 +41,6 @@ export const generateHash = ( // this means we can match keys with lightning // and everything gets cleaner const wfState = mapWorkflow(workflow); - // These are the keys we hash against const wfKeys = ['name', 'positions'].sort(); @@ -56,9 +55,10 @@ export const generateHash = ( const triggerKeys = [ 'type', - 'cron_expression', 'enabled', + 'cron_expression', 'webhook_reply', + 'webhook_response_config', 'cron_cursor_job_id', ].sort(); diff --git a/packages/project/test/canonical.test.ts b/packages/project/test/canonical.test.ts new file mode 100644 index 000000000..ec8e8bc90 --- /dev/null +++ b/packages/project/test/canonical.test.ts @@ -0,0 +1,212 @@ +import test from 'ava'; +import { ProjectSpec } from '@openfn/lexicon'; +import { Project } from '../src/Project'; + +/** + * This file tests a kitchen sink, canonical v2 project spec file + * + * It should build it without type errors, then serialize to json and yaml formats + */ +const project: ProjectSpec = { + id: 'kitchen-sink', + name: 'Kitchen Sink Test', + description: 'Everything including the kitchen sink', + schema_version: '4.0', + credentials: [{ owner: 'admin@openfn.org', name: 'secret-squirrel' }], + collections: ['nut-stash'], + workflows: [ + { + id: 'wf-webhook', + name: 'Webhook Workflow', + start: 'webhook', + options: { timeout: 60_000, run_memory_limit_mb: 512 }, + steps: [ + { + id: 'webhook', + name: 'Webhook Trigger', + type: 'webhook', + enabled: true, + webhook_reply: 'before_start', + webhook_response: { + success_code: 202, + error_code: 500, + }, + next: 'fetch', + }, + { + id: 'fetch', + name: 'Fetch Data', + adaptor: '@openfn/language-http@latest', + expression: 'get("/data");', + next: { + transform: true, + log: 'state.data.length > 0', + archive: { + condition: '!state.errors', + label: 'No errors', + disabled: false, + }, + }, + }, + { + id: 'transform', + name: 'Transform', + adaptor: '@openfn/language-common@latest', + expression: 'fn(state => state);', + }, + { + id: 'log', + adaptor: '@openfn/language-common@latest', + expression: 'fn(state => { console.log(state); return state; });', + }, + { + id: 'archive', + adaptor: '@openfn/language-common@latest', + expression: 'fn(state => state);', + }, + ], + }, + { + id: 'wf-cron', + name: 'Cron Workflow', + schema_version: '1.0', + start: 'cron', + steps: [ + { + id: 'cron', + name: 'Cron Trigger', + type: 'cron', + enabled: false, + cron_expression: '0 0 * * *', + cron_cursor_job_id: 'cron-job', + webhook_reply: 'after_completion', + next: { 'cron-job': true }, + }, + { + id: 'cron-job', + name: 'Daily Sync', + adaptor: '@openfn/language-dhis2@latest', + expression: 'create("trackedEntityInstances", state.data);', + }, + ], + }, + ], +}; + +test('create a canonical project', (t) => { + const p = new Project(project); + + t.is(p.id, 'kitchen-sink'); + t.is(p.name, 'Kitchen Sink Test'); + t.is(p.description, 'Everything including the kitchen sink'); + + t.is(p.workflows.length, 2); + t.deepEqual(p.credentials, [ + { owner: 'admin@openfn.org', name: 'secret-squirrel' }, + ]); + t.deepEqual(p.collections, ['nut-stash']); + + const [webhook, cron] = p.workflows; + + t.is(webhook.id, 'wf-webhook'); + t.is(webhook.name, 'Webhook Workflow'); + t.is(webhook.start, 'webhook'); + t.is(webhook.steps.length, 5); + t.is(webhook.steps[0].id, 'webhook'); + t.is(webhook.steps[0].type, 'webhook'); + + t.is(cron.id, 'wf-cron'); + t.is(cron.steps.length, 2); + t.is(cron.steps[0].type, 'cron'); + t.is(cron.steps[0].cron_expression, '0 0 * * *'); +}); + +test('convert to v2 yaml', (t) => { + const p = new Project(project); + const yaml = p.serialize('project') as string; + + const expected = `id: kitchen-sink +name: Kitchen Sink Test +schema_version: '4.0' +description: Everything including the kitchen sink +collections: + - nut-stash +credentials: + - owner: admin@openfn.org + name: secret-squirrel +openfn: {} +options: {} +workflows: + - id: wf-webhook + name: Webhook Workflow + start: webhook + options: + timeout: 60000 + run_memory_limit_mb: 512 + steps: + - id: archive + adaptor: '@openfn/language-common@latest' + expression: fn(state => state); + - id: fetch + name: Fetch Data + adaptor: '@openfn/language-http@latest' + expression: get("/data"); + next: + transform: true + log: state.data.length > 0 + archive: + condition: '!state.errors' + label: No errors + disabled: false + - id: log + adaptor: '@openfn/language-common@latest' + expression: fn(state => { console.log(state); return state; }); + - id: transform + name: Transform + adaptor: '@openfn/language-common@latest' + expression: fn(state => state); + - id: webhook + name: Webhook Trigger + type: webhook + enabled: true + webhook_reply: before_start + webhook_response: + success_code: 202 + error_code: 500 + next: fetch + history: [] + - id: wf-cron + name: Cron Workflow + schema_version: '1.0' + start: cron + steps: + - id: cron + name: Cron Trigger + type: cron + enabled: false + cron_expression: 0 0 * * * + cron_cursor_job_id: cron-job + webhook_reply: after_completion + next: + cron-job: true + - id: cron-job + name: Daily Sync + adaptor: '@openfn/language-dhis2@latest' + expression: create("trackedEntityInstances", state.data); + history: []`.trim(); + + t.is(yaml.trim(), expected); +}); + +// skipped because serialized step order is different +test.skip('convert to v2 json', (t) => { + const p = new Project(project); + const json = p.serialize('project', { format: 'json' }) as string; + + t.deepEqual(json, project); +}); + +// I'd like to load the canonical json then convert it into json format +// but dropping all state keys +// that means supporting Project.serialize('project', { state: false }) +test.todo('roundtrip'); diff --git a/packages/project/test/parse/from-app-state.test.ts b/packages/project/test/parse/from-app-state.test.ts index e90421796..973abfef0 100644 --- a/packages/project/test/parse/from-app-state.test.ts +++ b/packages/project/test/parse/from-app-state.test.ts @@ -161,6 +161,7 @@ test('mapWorkflow: map a cron trigger', (t) => { id: '1234', type: 'cron', cron_expression: '0 1 0 0', + cron_cursor_job_id: 'x', enabled: true, }, }, @@ -174,15 +175,30 @@ test('mapWorkflow: map a cron trigger', (t) => { type: 'cron', next: {}, enabled: true, + cron_expression: '0 1 0 0', + cron_cursor_job_id: 'x', openfn: { uuid: '1234', - cron_expression: '0 1 0 0', }, }); }); test('mapWorkflow: map a webhook trigger', (t) => { - const mapped = mapWorkflow(state.workflows['my-workflow']); + const mapped = mapWorkflow({ + ...state.workflows['my-workflow'], + triggers: { + webhook: { + id: '4a06289c-15aa-4662-8dc6-f0aaacd8a058', + type: 'webhook', + enabled: true, + webhook_reply: 'before_start', + webhook_response_config: { + success_code: 202, + error_code: 500, + }, + }, + }, + }); const [trigger] = mapped.steps; @@ -190,6 +206,11 @@ test('mapWorkflow: map a webhook trigger', (t) => { id: 'webhook', type: 'webhook', enabled: true, + webhook_reply: 'before_start', + webhook_response_config: { + success_code: 202, + error_code: 500, + }, next: { 'transform-data': { condition: 'always', diff --git a/packages/project/test/project.test.ts b/packages/project/test/project.test.ts index 7322ac173..249ed7cc6 100644 --- a/packages/project/test/project.test.ts +++ b/packages/project/test/project.test.ts @@ -193,10 +193,7 @@ test('should convert a state file to a project and back again', async (t) => { t.is(project.openfn?.uuid, state.id); t.is(project.name, state.name); - // TODO: this hack is needed right now to serialize the state as json - project.config.formats.project = 'json'; - - const newState = project.serialize('state'); + const newState = project.serialize('state', { format: 'json' }); t.deepEqual(newState, state); });