Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
45c284d
feat: add Workflows support
connyay Jan 28, 2026
93f3c5a
clippy?
connyay Jan 28, 2026
d19624f
backout toolchain bump
connyay Jan 29, 2026
4536c0c
fix: wrap workflow JsFuture calls with SendFuture for axum compatibility
connyay Jan 29, 2026
b270455
Merge branch 'main' into cjh-workflow
guybedford Feb 21, 2026
95c8f7e
fix: wrap workflow callback with AssertUnwindSafe for panic-unwind co…
guybedford Feb 21, 2026
23c317e
fix: use JS NonRetryableError, async bindings, and retryable callbacks
connyay Feb 26, 2026
8c2e668
Merge branch 'main' into cjh-workflow
connyay Feb 26, 2026
446dc86
test NonRetryableError
connyay Feb 26, 2026
55ab563
Simplify workflow code: fix __wf_ handler leak, deduplicate test help…
connyay Mar 12, 2026
501c3c8
test: add wait_for_event/send_event workflow integration test
connyay Mar 12, 2026
e6a264b
Merge branch 'main' into cjh-workflow
connyay Mar 31, 2026
99b52f9
add test coverage for pause(), resume(), restart(), and terminate()
connyay Apr 1, 2026
92d6778
fix: resolve CI failures for clippy and panic-unwind
guybedford Apr 1, 2026
66d6fb8
Merge branch 'main' into cjh-workflow
guybedford Apr 1, 2026
84b0120
Merge branch 'main' into cjh-workflow
guybedford Apr 1, 2026
44b2bfd
Merge branch 'main' into cjh-workflow
guybedford Apr 1, 2026
7eaee7a
fix: wrap handler! async blocks in SendFuture to satisfy axum Send bound
guybedford Apr 1, 2026
187dad2
Merge branch 'main' into cjh-workflow
guybedford Apr 1, 2026
c89bee9
pr comments
connyay Apr 2, 2026
51c2b1d
use WorkflowSleepDuration everywhere
connyay Apr 2, 2026
834db2c
thread through WorkflowStepContext
connyay Apr 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ web-sys = { version = "0.3.90", features = [
"WritableStream",
"WritableStreamDefaultWriter",
] }
worker = { version = "0.7.5", path = "worker", features = ["queue", "d1", "axum", "timezone"] }
worker = { version = "0.7.5", path = "worker", features = ["queue", "d1", "axum", "timezone", "workflow"] }
worker-codegen = { path = "worker-codegen", version = "0.2.0" }
worker-macros = { version = "0.7.5", path = "worker-macros", features = ["queue"] }
worker-sys = { version = "0.7.5", path = "worker-sys", features = ["d1", "queue"] }
Expand Down
15 changes: 15 additions & 0 deletions examples/workflow/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "workflow-example"
version = "0.1.0"
edition = "2021"

[package.metadata.release]
release = false

[lib]
crate-type = ["cdylib"]

[dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"
worker = { path = "../../worker", features = ["workflow"] }
178 changes: 178 additions & 0 deletions examples/workflow/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
use serde::{Deserialize, Serialize};
use worker::wasm_bindgen::JsValue;
use worker::*;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MyParams {
pub email: String,
pub name: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MyOutput {
pub message: String,
pub steps_completed: u32,
}

#[workflow]
pub struct MyWorkflow {
#[allow(dead_code)]
env: Env,
}

impl WorkflowEntrypoint for MyWorkflow {
fn new(_ctx: Context, env: Env) -> Self {
Self { env }
}

async fn run(&self, event: WorkflowEvent, step: WorkflowStep) -> Result<JsValue> {
console_log!("Workflow started with instance ID: {}", event.instance_id);

let params: MyParams = serde_wasm_bindgen::from_value(event.payload)?;

let email_for_validation = params.email.clone();
step.do_with_config(
"validate-params",
StepConfig {
retries: Some(RetryConfig {
limit: 3,
delay: "1 second".into(),
backoff: None,
}),
timeout: None,
},
move |_ctx| {
let email = email_for_validation.clone();
async move {
if !email.contains('@') {
return Err(NonRetryableError::new("invalid email address").into());
}
Ok(serde_json::json!({ "valid": true }))
}
},
)
.await?;

let name_for_step1 = params.name.clone();
let step1_result = step
.do_("initial-processing", move |_ctx| {
let name = name_for_step1.clone();
async move {
console_log!("Processing for user: {}", name);
Ok(serde_json::json!({
"processed": true,
"user": name
}))
}
})
.await?;

console_log!("Step 1 completed: {:?}", step1_result);

console_log!("Step 2: Sleeping for 10 seconds...");
step.sleep("wait-for-processing", "10 seconds").await?;

let email_for_step3 = params.email.clone();
let notification_result = step
.do_with_config(
"send-notification",
StepConfig {
retries: Some(RetryConfig {
limit: 3,
delay: "5 seconds".into(),
backoff: Some(Backoff::Exponential),
}),
timeout: Some("1 minute".into()),
},
move |_ctx| {
let email = email_for_step3.clone();
async move {
console_log!("Sending notification to: {}", email);
if js_sys::Math::random() < 0.5 {
return Err("notification service temporarily unavailable".into());
}
Ok(serde_json::json!({
"notification_sent": true,
"email": email
}))
}
},
)
.await?;

console_log!("Step 3 completed: {:?}", notification_result);

let output = MyOutput {
message: format!("Workflow completed for {}", params.name),
steps_completed: 3,
};

Ok(serialize_as_object(&output)?)
}
}

#[event(fetch)]
async fn fetch(mut req: Request, env: Env, _ctx: Context) -> Result<Response> {
let url = req.url()?;
let path = url.path();
let workflow = env.workflow("MY_WORKFLOW")?;

match (req.method(), path) {
(Method::Post, "/workflow") => {
let params: MyParams = req.json().await?;

let instance = workflow
.create(Some(CreateOptions {
params: Some(params),
..Default::default()
}))
.await?;

Response::from_json(&serde_json::json!({
"id": instance.id(),
"message": "Workflow created"
}))
}

(Method::Get, path) if path.starts_with("/workflow/") => {
let id = path.trim_start_matches("/workflow/");
let instance = workflow.get(id).await?;
let status = instance.status().await?;

Response::from_json(&serde_json::json!({
"id": instance.id(),
"status": format!("{:?}", status.status),
"error": status.error,
"output": status.output
}))
}

(Method::Post, path) if path.starts_with("/workflow/") && path.ends_with("/pause") => {
let id = path
.trim_start_matches("/workflow/")
.trim_end_matches("/pause");
let instance = workflow.get(id).await?;
instance.pause().await?;

Response::from_json(&serde_json::json!({
"id": instance.id(),
"message": "Workflow paused"
}))
}

(Method::Post, path) if path.starts_with("/workflow/") && path.ends_with("/resume") => {
let id = path
.trim_start_matches("/workflow/")
.trim_end_matches("/resume");
let instance = workflow.get(id).await?;
instance.resume().await?;

Response::from_json(&serde_json::json!({
"id": instance.id(),
"message": "Workflow resumed"
}))
}

_ => Response::error("Not Found", 404),
}
}
13 changes: 13 additions & 0 deletions examples/workflow/wrangler.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
name = "workflow-example"
main = "build/worker/shim.mjs"
compatibility_date = "2024-10-22"

[build]
# For development: use local worker-build binary
# For production: command = "cargo install -q worker-build && worker-build --release"
command = "RUSTFLAGS='--cfg=web_sys_unstable_apis' ../../target/release/worker-build --release"

[[workflows]]
name = "my-workflow"
binding = "MY_WORKFLOW"
class_name = "MyWorkflow"
20 changes: 10 additions & 10 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ mod socket;
mod sql_counter;
mod sql_iterator;
mod user;
mod workflow;
mod ws;

#[derive(Deserialize, Serialize)]
Expand Down
24 changes: 19 additions & 5 deletions test/src/router.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
alarm, analytics_engine, assets, auto_response, cache, container, counter, d1, durable, fetch,
form, js_snippets, kv, put_raw, queue, r2, rate_limit, request, secret_store, service, socket,
sql_counter, sql_iterator, user, ws, SomeSharedData, GLOBAL_STATE,
sql_counter, sql_iterator, user, workflow, ws, SomeSharedData, GLOBAL_STATE,
};
#[cfg(feature = "http")]
use std::convert::TryInto;
Expand All @@ -17,6 +17,8 @@ use axum::{
routing::{delete, get, head, options, patch, post, put},
Extension,
};
#[cfg(feature = "http")]
use worker::send::SendFuture;

// Transform the argument into the correct form for the router.
// For axum::Router:
Expand Down Expand Up @@ -59,16 +61,16 @@ macro_rules! format_route (
#[cfg(feature = "http")]
macro_rules! handler (
($name:path) => {
|Extension(env): Extension<Env>, Extension(data): Extension<SomeSharedData>, req: axum::extract::Request| async {
|Extension(env): Extension<Env>, Extension(data): Extension<SomeSharedData>, req: axum::extract::Request| SendFuture::new(async {
let resp = $name(req.try_into().expect("convert request"), env, data).await.expect("handler result");
Into::<http::Response<axum::body::Body>>::into(resp)
}
})
};
($name:path, sync) => {
|Extension(env): Extension<Env>, Extension(data): Extension<SomeSharedData>, req: axum::extract::Request| async {
|Extension(env): Extension<Env>, Extension(data): Extension<SomeSharedData>, req: axum::extract::Request| SendFuture::new(async {
let resp = $name(req.try_into().expect("convert request"), env, data).expect("handler result");
Into::<http::Response<axum::body::Body>>::into(resp)
}
})
};
);
#[cfg(not(feature = "http"))]
Expand Down Expand Up @@ -239,6 +241,18 @@ macro_rules! add_routes (
add_route!($obj, get, format_route!("/rate-limit/key/{}", "key"), rate_limit::handle_rate_limit_with_key);
add_route!($obj, get, "/rate-limit/bulk-test", rate_limit::handle_rate_limit_bulk_test);
add_route!($obj, get, "/rate-limit/reset", rate_limit::handle_rate_limit_reset);
add_route!($obj, post, "/workflow/create", workflow::handle_workflow_create);
add_route!($obj, post, "/workflow/create-invalid", workflow::handle_workflow_create_invalid);
add_route!($obj, get, format_route!("/workflow/status/{}", "id"), workflow::handle_workflow_status);
add_route!($obj, post, "/workflow/event/create", workflow::handle_event_workflow_create);
add_route!($obj, post, format_route!("/workflow/event/send/{}", "id"), workflow::handle_event_workflow_send);
add_route!($obj, get, format_route!("/workflow/event/status/{}", "id"), workflow::handle_event_workflow_status);
add_route!($obj, post, "/workflow/lifecycle/create", workflow::handle_lifecycle_workflow_create);
add_route!($obj, get, format_route!("/workflow/lifecycle/status/{}", "id"), workflow::handle_lifecycle_workflow_status);
add_route!($obj, post, format_route!("/workflow/lifecycle/pause/{}", "id"), workflow::handle_lifecycle_workflow_pause);
add_route!($obj, post, format_route!("/workflow/lifecycle/resume/{}", "id"), workflow::handle_lifecycle_workflow_resume);
add_route!($obj, post, format_route!("/workflow/lifecycle/terminate/{}", "id"), workflow::handle_lifecycle_workflow_terminate);
add_route!($obj, post, format_route!("/workflow/lifecycle/restart/{}", "id"), workflow::handle_lifecycle_workflow_restart);
});

#[cfg(feature = "http")]
Expand Down
Loading