diff --git a/examples/axum/Cargo.lock b/examples/axum/Cargo.lock index e2b104274..927118dfe 100644 --- a/examples/axum/Cargo.lock +++ b/examples/axum/Cargo.lock @@ -342,9 +342,9 @@ checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" [[package]] name = "js-sys" -version = "0.3.85" +version = "0.3.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c942ebf8e95485ca0d52d97da7c5a2c387d0e7f0ba4c35e93bfcaee045955b3" +checksum = "14dc6f6450b3f6d4ed5b16327f38fed626d375a886159ca555bd7822c0c3a5a6" dependencies = [ "once_cell", "wasm-bindgen", @@ -666,9 +666,9 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" [[package]] name = "wasm-bindgen" -version = "0.2.108" +version = "0.2.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64024a30ec1e37399cf85a7ffefebdb72205ca1c972291c51512360d90bd8566" +checksum = "60722a937f594b7fde9adb894d7c092fc1bb6612897c46368d18e7a20208eff2" dependencies = [ "cfg-if", "once_cell", @@ -679,9 +679,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.58" +version = "0.4.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70a6e77fd0ae8029c9ea0063f87c46fde723e7d887703d74ad2616d792e51e6f" +checksum = "8a89f4650b770e4521aa6573724e2aed4704372151bd0de9d16a3bbabb87441a" dependencies = [ "cfg-if", "futures-util", @@ -693,9 +693,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.108" +version = "0.2.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "008b239d9c740232e71bd39e8ef6429d27097518b6b30bdf9086833bd5b6d608" +checksum = "0fac8c6395094b6b91c4af293f4c79371c163f9a6f56184d2c9a85f5a95f3950" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -703,9 +703,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.108" +version = "0.2.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5256bae2d58f54820e6490f9839c49780dff84c65aeab9e772f15d5f0e913a55" +checksum = "ab3fabce6159dc20728033842636887e4877688ae94382766e00b180abac9d60" dependencies = [ "bumpalo", "proc-macro2", @@ -716,18 +716,18 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.108" +version = "0.2.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f01b580c9ac74c8d8f0c0e4afb04eeef2acf145458e52c03845ee9cd23e3d12" +checksum = "de0e091bdb824da87dc01d967388880d017a0a9bc4f3bdc0d86ee9f9336e3bb5" dependencies = [ "unicode-ident", ] [[package]] name = "wasm-streams" -version = "0.4.2" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +checksum = "9d1ec4f6517c9e11ae630e200b2b65d193279042e28edd4a2cda233e46670bbb" dependencies = [ "futures-util", "js-sys", @@ -738,9 +738,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.85" +version = "0.3.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "312e32e551d92129218ea9a2452120f4aabc03529ef03e4d0d82fb2780608598" +checksum = "705eceb4ce901230f8625bd1d665128056ccbe4b7408faa625eec1ba80f59a97" dependencies = [ "js-sys", "wasm-bindgen", @@ -748,9 +748,7 @@ dependencies = [ [[package]] name = "worker" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad1e113ad04154d7ded1b582891ef2ae0d62d8460fcc5663cd2378588161527c" +version = "0.7.4" dependencies = [ "async-trait", "axum", @@ -779,9 +777,7 @@ dependencies = [ [[package]] name = "worker-macros" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55c7ca8df6cf32003d4f60c7c89a7e975e0fa0cbf031726f45cd395e557d9a5e" +version = "0.7.4" dependencies = [ "async-trait", "proc-macro2", @@ -795,9 +791,7 @@ dependencies = [ [[package]] name = "worker-sys" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cefeaf6edf2558ff50e6776056464b507f64be9c1f26f2465af531b584867ed7" +version = "0.7.4" dependencies = [ "cfg-if", "js-sys", diff --git a/examples/axum/Cargo.toml b/examples/axum/Cargo.toml index 36e34902e..2a816c4e4 100644 --- a/examples/axum/Cargo.toml +++ b/examples/axum/Cargo.toml @@ -10,8 +10,8 @@ release = false crate-type = ["cdylib"] [dependencies] -worker = { version = "0.7", features = ['http', 'axum'] } -worker-macros = { version = "0.7", features = ['http'] } +worker = { path = "../../worker", features = ['http', 'axum'] } +worker-macros = { path = "../../worker-macros", features = ['http'] } axum = { version = "0.8", default-features = false, features = ['json'] } axum-macros = "0.5.0" tower-service = "0.3.3" diff --git a/examples/axum/src/resources/foos/api.rs b/examples/axum/src/resources/foos/api.rs index 489c4f9eb..0cfd6f6ae 100644 --- a/examples/axum/src/resources/foos/api.rs +++ b/examples/axum/src/resources/foos/api.rs @@ -5,15 +5,6 @@ use axum::Json; use axum_macros::debug_handler; use worker::Result; -/// `get()` requires the `#[worker::send]` macro because Cloudflare Workers -/// execute a handler's future on a single JavaScript event loop. -/// -/// The macro helps make `await` boundaries in the handler's function body `Send` -/// so the worker runtime can safely poll them. -/// -/// You can read more about it here in the "`Send` Helpers" section: -/// https://docs.rs/worker/latest/worker/ -#[worker::send] #[debug_handler] pub async fn get( State(state): State, diff --git a/examples/rpc-client/src/calculator.rs b/examples/rpc-client/src/calculator.rs index ad76ca941..a90239194 100644 --- a/examples/rpc-client/src/calculator.rs +++ b/examples/rpc-client/src/calculator.rs @@ -22,20 +22,18 @@ mod sys { pub trait Calculator { async fn add(&self, a: u32, b: u32) -> ::worker::Result; } -pub struct CalculatorService(::worker::send::SendWrapper); +pub struct CalculatorService(sys::CalculatorSys); #[async_trait::async_trait] impl Calculator for CalculatorService { async fn add(&self, a: u32, b: u32) -> ::worker::Result { let promise = self.0.add(a, b)?; - let fut = ::worker::send::SendFuture::new( - ::worker::wasm_bindgen_futures::JsFuture::from(promise), - ); + let fut = ::worker::wasm_bindgen_futures::JsFuture::from(promise); let output = fut.await?; Ok(::serde_wasm_bindgen::from_value(output)?) } } impl From<::worker::Fetcher> for CalculatorService { fn from(fetcher: ::worker::Fetcher) -> Self { - Self(::worker::send::SendWrapper::new(fetcher.into_rpc())) + Self(fetcher.into_rpc()) } } diff --git a/test/src/alarm.rs b/test/src/alarm.rs index 76e188bfb..606d598ad 100644 --- a/test/src/alarm.rs +++ b/test/src/alarm.rs @@ -36,7 +36,6 @@ impl DurableObject for AlarmObject { } } -#[worker::send] pub async fn handle_alarm(_req: Request, env: Env, _data: SomeSharedData) -> Result { let namespace = env.durable_object("ALARM")?; let stub = namespace.id_from_name("alarm")?.get_stub()?; diff --git a/test/src/analytics_engine.rs b/test/src/analytics_engine.rs index cd0880d97..152d6f2f8 100644 --- a/test/src/analytics_engine.rs +++ b/test/src/analytics_engine.rs @@ -2,7 +2,6 @@ use super::SomeSharedData; use uuid::Uuid; use worker::{AnalyticsEngineDataPointBuilder, Env, Request, Response, Result}; -#[worker::send] pub async fn handle_analytics_event( req: Request, env: Env, @@ -31,5 +30,5 @@ pub async fn handle_analytics_event( .add_double(200) .write_to(&dataset)?; - return Response::ok("Events sent"); + Response::ok("Events sent") } diff --git a/test/src/assets.rs b/test/src/assets.rs index f3bedc0f7..eb69bfdc1 100644 --- a/test/src/assets.rs +++ b/test/src/assets.rs @@ -17,7 +17,6 @@ pub async fn handle_asset( } #[cfg(feature = "http")] -#[worker::send] pub async fn handle_asset( req: worker::Request, env: worker::Env, diff --git a/test/src/auto_response.rs b/test/src/auto_response.rs index 38e1b429e..2e917d2c6 100644 --- a/test/src/auto_response.rs +++ b/test/src/auto_response.rs @@ -36,7 +36,6 @@ impl DurableObject for AutoResponseObject { } // Route handler to exercise the Durable Object from tests. -#[worker::send] pub async fn handle_auto_response( _req: Request, env: Env, diff --git a/test/src/cache.rs b/test/src/cache.rs index edaf48452..baea21374 100644 --- a/test/src/cache.rs +++ b/test/src/cache.rs @@ -12,7 +12,6 @@ fn key(req: &Request) -> Result> { Ok(segments.nth(2).map(ToOwned::to_owned)) } -#[worker::send] pub async fn handle_cache_example( req: Request, _env: Env, @@ -36,7 +35,6 @@ pub async fn handle_cache_example( } } -#[worker::send] pub async fn handle_cache_api_get( req: Request, _env: Env, @@ -52,7 +50,6 @@ pub async fn handle_cache_api_get( Response::error("key missing", 400) } -#[worker::send] pub async fn handle_cache_api_put( req: Request, _env: Env, @@ -71,7 +68,6 @@ pub async fn handle_cache_api_put( Response::error("key missing", 400) } -#[worker::send] pub async fn handle_cache_api_delete( req: Request, _env: Env, @@ -86,7 +82,6 @@ pub async fn handle_cache_api_delete( Response::error("key missing", 400) } -#[worker::send] pub async fn handle_cache_stream( req: Request, _env: Env, @@ -100,8 +95,10 @@ pub async fn handle_cache_stream( Ok(resp) } else { console_log!("Cache MISS!"); - let mut rng = rand::rng(); - let count = rng.random_range(0..10); + let count = { + let mut rng = rand::rng(); + rng.random_range(0..10) + }; let stream = futures_util::stream::repeat("Hello, world!\n") .take(count) .then(|text| async move { @@ -119,3 +116,14 @@ pub async fn handle_cache_stream( Ok(resp) } } + +// Compile-time assertion: public async Cache methods return Send futures. +#[allow(dead_code, unused)] +fn _assert_send() { + fn require_send(_t: T) {} + fn cache(c: worker::Cache) { + require_send(c.put("k", worker::Response::empty().unwrap())); + require_send(c.get("k", false)); + require_send(c.delete("k", false)); + } +} diff --git a/test/src/container.rs b/test/src/container.rs index b99ab1ddc..1052ae723 100644 --- a/test/src/container.rs +++ b/test/src/container.rs @@ -81,7 +81,6 @@ impl DurableObject for EchoContainer { const CONTAINER_NAME: &str = "my-container"; -#[worker::send] pub async fn handle_container( mut req: Request, env: Env, @@ -142,3 +141,13 @@ async fn redir_websocket(dst: WebSocket, src: WebSocket) { } } } + +// Compile-time assertion: public async Container methods return Send futures. +#[allow(dead_code, unused)] +fn _assert_send() { + fn require_send(_t: T) {} + fn container(c: worker::Container) { + require_send(c.wait_for_exit()); + require_send(c.destroy(None)); + } +} diff --git a/test/src/counter.rs b/test/src/counter.rs index c301efabb..af9be9a9d 100644 --- a/test/src/counter.rs +++ b/test/src/counter.rs @@ -95,7 +95,6 @@ impl DurableObject for Counter { } } -#[worker::send] pub async fn handle_id(req: Request, env: Env, _data: SomeSharedData) -> Result { let durable_object_name = if req.path().contains("shared") { "SHARED_COUNTER" @@ -110,7 +109,6 @@ pub async fn handle_id(req: Request, env: Env, _data: SomeSharedData) -> Result< stub.fetch_with_str("https://fake-host/").await } -#[worker::send] pub async fn handle_websocket(req: Request, env: Env, _data: SomeSharedData) -> Result { let durable_object_name = if req.path().contains("shared") { "SHARED_COUNTER" diff --git a/test/src/d1.rs b/test/src/d1.rs index 63ce86355..562046454 100644 --- a/test/src/d1.rs +++ b/test/src/d1.rs @@ -14,7 +14,6 @@ struct Person { age: u32, } -#[worker::send] pub async fn prepared_statement( _req: Request, env: Env, @@ -68,7 +67,6 @@ pub async fn prepared_statement( Response::ok("ok") } -#[worker::send] pub async fn batch(_req: Request, env: Env, _data: SomeSharedData) -> Result { let db = env.d1("DB")?; let mut results = db @@ -93,7 +91,6 @@ pub async fn batch(_req: Request, env: Env, _data: SomeSharedData) -> Result Result { let db = env.d1("DB")?; let result = db @@ -104,14 +101,12 @@ pub async fn exec(mut req: Request, env: Env, _data: SomeSharedData) -> Result Result { let db = env.d1("DB")?; let bytes = db.dump().await?; Response::from_bytes(bytes) } -#[worker::send] pub async fn error(_req: Request, env: Env, _data: SomeSharedData) -> Result { let db = env.d1("DB")?; let error = db @@ -138,7 +133,6 @@ struct NullablePerson { age: Option, } -#[worker::send] pub async fn jsvalue_null_is_null( _req: Request, _env: Env, @@ -149,7 +143,6 @@ pub async fn jsvalue_null_is_null( Response::ok("ok") } -#[worker::send] pub async fn serialize_optional_none( _req: Request, _env: Env, @@ -164,7 +157,6 @@ pub async fn serialize_optional_none( Response::ok("ok") } -#[worker::send] pub async fn serialize_optional_some( _req: Request, _env: Env, @@ -179,7 +171,6 @@ pub async fn serialize_optional_some( Response::ok("ok") } -#[worker::send] pub async fn deserialize_optional_none( _req: Request, _env: Env, @@ -201,7 +192,6 @@ pub async fn deserialize_optional_none( Response::ok("ok") } -#[worker::send] pub async fn insert_and_retrieve_optional_none( _req: Request, env: Env, @@ -227,7 +217,6 @@ pub async fn insert_and_retrieve_optional_none( Response::ok("ok") } -#[worker::send] pub async fn insert_and_retrieve_optional_some( _req: Request, env: Env, @@ -252,7 +241,6 @@ pub async fn insert_and_retrieve_optional_some( Response::ok("ok") } -#[worker::send] pub async fn retrieve_optional_none( _req: Request, env: Env, @@ -269,7 +257,6 @@ pub async fn retrieve_optional_none( Response::ok("ok") } -#[worker::send] pub async fn retrieve_optional_some( _req: Request, env: Env, @@ -286,7 +273,6 @@ pub async fn retrieve_optional_some( Response::ok("ok") } -#[worker::send] pub async fn retrive_first_none( _req: Request, env: Env, @@ -299,3 +285,18 @@ pub async fn retrive_first_none( Response::ok("ok") } + +// Compile-time assertion: public async D1 methods return Send futures. +#[allow(dead_code, unused)] +fn _assert_send() { + fn require_send(_t: T) {} + fn d1(db: worker::D1Database) { + require_send(db.dump()); + require_send(db.exec("SELECT 1")); + let stmt = db.prepare("SELECT 1"); + require_send(stmt.first::(None)); + require_send(stmt.run()); + require_send(stmt.all()); + require_send(stmt.raw::()); + } +} diff --git a/test/src/durable.rs b/test/src/durable.rs index c99e646f0..726e06552 100644 --- a/test/src/durable.rs +++ b/test/src/durable.rs @@ -199,7 +199,6 @@ impl DurableObject for AnotherClass { } // Route handlers to exercise the Durable Object from tests. -#[worker::send] pub async fn handle_hello( _req: Request, env: Env, @@ -213,7 +212,6 @@ pub async fn handle_hello( .await } -#[worker::send] pub async fn handle_hello_unique( _req: Request, env: Env, @@ -227,7 +225,6 @@ pub async fn handle_hello_unique( .await } -#[worker::send] pub async fn handle_storage( _req: Request, env: Env, @@ -238,7 +235,6 @@ pub async fn handle_storage( stub.fetch_with_str("https://fake-host/storage").await } -#[worker::send] pub async fn handle_basic_test( _req: Request, env: Env, @@ -313,7 +309,6 @@ pub async fn handle_basic_test( Response::ok("ok") } -#[worker::send] pub async fn handle_get_by_name( _req: Request, env: Env, @@ -331,7 +326,6 @@ pub async fn handle_get_by_name( .await } -#[worker::send] pub async fn handle_get_by_name_with_location_hint( _req: Request, env: Env, @@ -346,3 +340,12 @@ pub async fn handle_get_by_name_with_location_hint( stub.fetch_with_str(&format!("https://fake-host/hello?name={name}")) .await } + +// Compile-time assertion: public async Durable Object methods return Send futures. +#[allow(dead_code, unused)] +fn _assert_send() { + fn require_send(_t: T) {} + fn stub(s: worker::Stub) { + require_send(s.fetch_with_str("https://example.com")); + } +} diff --git a/test/src/fetch.rs b/test/src/fetch.rs index d5c351bc6..0a7495fa7 100644 --- a/test/src/fetch.rs +++ b/test/src/fetch.rs @@ -7,7 +7,6 @@ use worker::{ RequestInit, Response, Result, }; -#[worker::send] pub async fn handle_fetch(_req: Request, _env: Env, _data: SomeSharedData) -> Result { let req = Request::new("https://example.com", Method::Post)?; let resp = Fetch::Request(req).send().await?; @@ -19,7 +18,6 @@ pub async fn handle_fetch(_req: Request, _env: Env, _data: SomeSharedData) -> Re )) } -#[worker::send] pub async fn handle_fetch_json( _req: Request, _env: Env, @@ -40,7 +38,6 @@ pub async fn handle_fetch_json( )) } -#[worker::send] pub async fn handle_proxy_request( req: Request, _env: Env, @@ -57,7 +54,6 @@ pub async fn handle_proxy_request( Fetch::Url(url.parse()?).send().await } -#[worker::send] pub async fn handle_request_init_fetch( _req: Request, _env: Env, @@ -69,7 +65,6 @@ pub async fn handle_request_init_fetch( .await } -#[worker::send] pub async fn handle_request_init_fetch_post( _req: Request, _env: Env, @@ -82,7 +77,6 @@ pub async fn handle_request_init_fetch_post( .await } -#[worker::send] pub async fn handle_cancelled_fetch( _req: Request, _env: Env, @@ -115,7 +109,6 @@ pub async fn handle_cancelled_fetch( Ok(res) } -#[worker::send] pub async fn handle_fetch_timeout( _req: Request, _env: Env, @@ -158,7 +151,6 @@ pub async fn handle_fetch_timeout( } } -#[worker::send] pub async fn handle_cloned_fetch( _req: Request, _env: Env, @@ -179,7 +171,6 @@ pub async fn handle_cloned_fetch( Response::ok((left == right).to_string()) } -#[worker::send] pub async fn handle_cloned_response_attributes( _req: Request, _env: Env, @@ -209,3 +200,15 @@ pub async fn handle_cloned_response_attributes( Response::ok("true") } + +// Compile-time assertion: public async Fetch methods return Send futures. +#[allow(dead_code, unused)] +fn _assert_send() { + fn require_send(_t: T) {} + fn fetch(f: worker::Fetch) { + require_send(f.send()); + } + fn fetcher(f: worker::Fetcher) { + require_send(f.fetch("https://example.com", None)); + } +} diff --git a/test/src/form.rs b/test/src/form.rs index 4c6ad57f1..7f88824b6 100644 --- a/test/src/form.rs +++ b/test/src/form.rs @@ -5,7 +5,6 @@ use serde::{Deserialize, Serialize}; use std::convert::TryFrom; use worker::{kv, Env, FormEntry, Request, Response, Result}; -#[worker::send] pub async fn handle_formdata_name( mut req: Request, _env: Env, @@ -48,7 +47,6 @@ struct FileSize { size: u32, } -#[worker::send] pub async fn handle_formdata_file_size( mut req: Request, env: Env, @@ -88,7 +86,6 @@ pub async fn handle_formdata_file_size( Response::error("Bad Request", 400) } -#[worker::send] pub async fn handle_formdata_file_size_hash( req: Request, env: Env, @@ -108,7 +105,6 @@ pub async fn handle_formdata_file_size_hash( Response::error("Bad Request", 400) } -#[worker::send] pub async fn handle_is_secret( mut req: Request, env: Env, diff --git a/test/src/js_snippets.rs b/test/src/js_snippets.rs index ca929da8a..a5b7197e1 100644 --- a/test/src/js_snippets.rs +++ b/test/src/js_snippets.rs @@ -22,12 +22,10 @@ extern "C" { fn js_throw_error(); } -#[worker::send] pub async fn performance_now(_req: Request, _env: Env, _data: SomeSharedData) -> Result { Response::ok(format!("now: {}", js_performance_now())) } -#[worker::send] pub async fn console_log(_req: Request, _env: Env, _data: SomeSharedData) -> Result { js_console_log("test".to_owned()); Response::ok("OK") diff --git a/test/src/kv.rs b/test/src/kv.rs index 55533aae6..b2cb67e13 100644 --- a/test/src/kv.rs +++ b/test/src/kv.rs @@ -17,7 +17,6 @@ macro_rules! kv_assert_eq { }}; } -#[worker::send] pub async fn handle_post_key_value( req: Request, env: Env, @@ -39,7 +38,6 @@ pub async fn handle_post_key_value( const TEST_NAMESPACE: &str = "TEST"; -#[worker::send] pub async fn get(_req: Request, env: Env, _data: SomeSharedData) -> Result { let store = env.kv(TEST_NAMESPACE)?; let value = store.get("simple").text().await?; @@ -49,7 +47,6 @@ pub async fn get(_req: Request, env: Env, _data: SomeSharedData) -> Result Result { let store = env.kv(TEST_NAMESPACE)?; let value = store.get("not_found").text().await?; @@ -59,7 +56,6 @@ pub async fn get_not_found(_req: Request, env: Env, _data: SomeSharedData) -> Re } } -#[worker::send] pub async fn list_keys(_req: Request, env: Env, _data: SomeSharedData) -> Result { let store = env.kv(TEST_NAMESPACE)?; let list_res = store.list().execute().await?; @@ -70,7 +66,6 @@ pub async fn list_keys(_req: Request, env: Env, _data: SomeSharedData) -> Result Response::ok("passed") } -#[worker::send] pub async fn put_simple(_req: Request, env: Env, _data: SomeSharedData) -> Result { let store = env.kv(TEST_NAMESPACE)?; store.put("put_a", "test")?.execute().await?; @@ -81,7 +76,6 @@ pub async fn put_simple(_req: Request, env: Env, _data: SomeSharedData) -> Resul Response::ok("passed") } -#[worker::send] pub async fn put_metadata(_req: Request, env: Env, _data: SomeSharedData) -> Result { let store = env.kv(TEST_NAMESPACE)?; store.put("put_b", "test")?.metadata(100)?.execute().await?; @@ -93,7 +87,6 @@ pub async fn put_metadata(_req: Request, env: Env, _data: SomeSharedData) -> Res Response::ok("passed") } -#[worker::send] pub async fn put_expiration(_req: Request, env: Env, _data: SomeSharedData) -> Result { const EXPIRATION: u64 = 2_000_000_000; let store = env.kv(TEST_NAMESPACE)?; @@ -117,7 +110,6 @@ pub async fn put_expiration(_req: Request, env: Env, _data: SomeSharedData) -> R Response::ok("passed") } -#[worker::send] pub async fn put_metadata_struct( _req: Request, env: Env, @@ -148,3 +140,18 @@ pub async fn put_metadata_struct( Response::ok("passed") } + +// Compile-time assertion: public async KV methods return Send futures. +#[allow(dead_code, unused)] +fn _assert_send() { + fn require_send(_t: T) {} + fn kv(kv: worker::KvStore) { + require_send(kv.put("k", "v").unwrap().execute()); + require_send(kv.get("k").text()); + require_send(kv.get("k").json::()); + require_send(kv.get("k").bytes()); + require_send(kv.get("k").text_with_metadata::()); + require_send(kv.delete("k")); + require_send(kv.list().execute()); + } +} diff --git a/test/src/lib.rs b/test/src/lib.rs index 4f28ca705..fe444b4d5 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -52,6 +52,9 @@ pub struct SomeSharedData { regex: &'static Regex, } +unsafe impl Send for SomeSharedData {} +unsafe impl Sync for SomeSharedData {} + static GLOBAL_STATE: AtomicBool = AtomicBool::new(false); static GLOBAL_QUEUE_STATE: Mutex> = Mutex::new(Vec::new()); diff --git a/test/src/put_raw.rs b/test/src/put_raw.rs index 04d469ef9..22342a6b2 100644 --- a/test/src/put_raw.rs +++ b/test/src/put_raw.rs @@ -66,7 +66,6 @@ impl DurableObject for PutRawTestObject { } } -#[worker::send] pub(crate) async fn handle_put_raw( req: Request, env: Env, diff --git a/test/src/queue.rs b/test/src/queue.rs index 5e9da4a8a..72342b4cc 100644 --- a/test/src/queue.rs +++ b/test/src/queue.rs @@ -26,7 +26,6 @@ pub async fn queue(message_batch: MessageBatch, _env: Env, _ctx: Cont Ok(()) } -#[worker::send] pub async fn handle_queue_send(req: Request, env: Env, _data: SomeSharedData) -> Result { let uri = req.url()?; let mut segments = uri.path_segments().unwrap(); @@ -54,7 +53,6 @@ pub async fn handle_queue_send(req: Request, env: Env, _data: SomeSharedData) -> } } -#[worker::send] pub async fn handle_batch_send(mut req: Request, env: Env, _: SomeSharedData) -> Result { let messages: Vec = match req.json().await { Ok(messages) => messages, @@ -82,3 +80,12 @@ pub async fn handle_queue(_req: Request, _env: Env, _data: SomeSharedData) -> Re let messages: Vec = guard.clone(); Response::from_json(&messages) } + +// Compile-time assertion: public async Queue methods return Send futures. +#[allow(dead_code, unused)] +fn _assert_send() { + fn require_send(_t: T) {} + fn queue(q: worker::Queue) { + require_send(q.send("msg")); + } +} diff --git a/test/src/r2.rs b/test/src/r2.rs index 5ef6c0f93..685f09852 100644 --- a/test/src/r2.rs +++ b/test/src/r2.rs @@ -31,7 +31,6 @@ pub async fn seed_bucket(bucket: &Bucket) -> Result<()> { Ok(()) } -#[worker::send] pub async fn list_empty(_req: Request, env: Env, _data: SomeSharedData) -> Result { let bucket = env.bucket("EMPTY_BUCKET")?; @@ -43,7 +42,6 @@ pub async fn list_empty(_req: Request, env: Env, _data: SomeSharedData) -> Resul Response::ok("ok") } -#[worker::send] pub async fn list(_req: Request, env: Env, _data: SomeSharedData) -> Result { let bucket = env.bucket("SEEDED_BUCKET")?; seed_bucket(&bucket).await?; @@ -96,7 +94,6 @@ pub async fn list(_req: Request, env: Env, _data: SomeSharedData) -> Result Result { let bucket = env.bucket("EMPTY_BUCKET")?; @@ -119,7 +116,6 @@ pub async fn get_empty(_req: Request, env: Env, _data: SomeSharedData) -> Result Response::ok("ok") } -#[worker::send] pub async fn get(_req: Request, env: Env, _data: SomeSharedData) -> Result { let bucket = env.bucket("SEEDED_BUCKET")?; seed_bucket(&bucket).await?; @@ -141,7 +137,6 @@ pub async fn get(_req: Request, env: Env, _data: SomeSharedData) -> Result Result { let bucket = env.bucket("PUT_BUCKET")?; @@ -175,7 +170,6 @@ pub async fn put(_req: Request, env: Env, _data: SomeSharedData) -> Result Result { let bucket = env.bucket("PUT_BUCKET")?; let (http_metadata, custom_metadata, object_with_props) = @@ -190,7 +184,6 @@ pub async fn put_properties(_req: Request, env: Env, _data: SomeSharedData) -> R } #[allow(clippy::large_stack_arrays)] -#[worker::send] pub async fn put_multipart(_req: Request, env: Env, _data: SomeSharedData) -> Result { const R2_MULTIPART_CHUNK_MIN_SIZE: usize = 5 * 1_024 * 1_024; // 5MiB. // const TEST_CHUNK_COUNT: usize = 3; @@ -246,7 +239,6 @@ pub async fn put_multipart(_req: Request, env: Env, _data: SomeSharedData) -> Re Response::ok("ok") } -#[worker::send] pub async fn delete(_req: Request, env: Env, _data: SomeSharedData) -> Result { let bucket = env.bucket("DELETE_BUCKET")?; @@ -303,3 +295,17 @@ fn dummy_properties() -> (HttpMetadata, HashMap) { }; (http_metadata, custom_metadata) } + +// Compile-time assertion: public async R2 methods return Send futures. +#[allow(dead_code, unused)] +fn _assert_send() { + fn require_send(_t: T) {} + fn r2(bucket: worker::Bucket) { + require_send(bucket.head("k")); + require_send(bucket.delete("k")); + require_send(bucket.delete_multiple(vec!["a", "b"])); + require_send(bucket.get("k").execute()); + require_send(bucket.put("k", worker::Data::Empty).execute()); + require_send(bucket.list().execute()); + } +} diff --git a/test/src/rate_limit.rs b/test/src/rate_limit.rs index d8665d1e4..5d92a3fb1 100644 --- a/test/src/rate_limit.rs +++ b/test/src/rate_limit.rs @@ -2,7 +2,6 @@ use super::SomeSharedData; use std::collections::HashMap; use worker::{js_sys, Env, Request, Response, Result}; -#[worker::send] pub async fn handle_rate_limit_check( _req: Request, env: Env, @@ -18,7 +17,6 @@ pub async fn handle_rate_limit_check( })) } -#[worker::send] pub async fn handle_rate_limit_with_key( req: Request, env: Env, @@ -37,7 +35,6 @@ pub async fn handle_rate_limit_with_key( })) } -#[worker::send] pub async fn handle_rate_limit_bulk_test( _req: Request, env: Env, @@ -62,7 +59,6 @@ pub async fn handle_rate_limit_bulk_test( })) } -#[worker::send] pub async fn handle_rate_limit_reset( _req: Request, env: Env, @@ -82,3 +78,12 @@ pub async fn handle_rate_limit_reset( Response::from_json(&outcomes) } + +// Compile-time assertion: public async RateLimiter methods return Send futures. +#[allow(dead_code, unused)] +fn _assert_send() { + fn require_send(_t: T) {} + fn rate_limiter(rl: worker::RateLimiter) { + require_send(rl.limit("key".into())); + } +} diff --git a/test/src/request.rs b/test/src/request.rs index 981d41fcb..dbf81f4d9 100644 --- a/test/src/request.rs +++ b/test/src/request.rs @@ -71,7 +71,6 @@ pub async fn handle_headers(req: Request, _env: Env, _data: SomeSharedData) -> R .ok("returned your headers to you.") } -#[worker::send] pub async fn handle_post_file_size( mut req: Request, _env: Env, @@ -81,7 +80,6 @@ pub async fn handle_post_file_size( Response::ok(format!("size = {}", bytes.len())) } -#[worker::send] pub async fn handle_async_text_echo( mut req: Request, _env: Env, @@ -112,7 +110,6 @@ pub async fn handle_bytes(_req: Request, _env: Env, _data: SomeSharedData) -> Re Response::from_bytes(vec![1, 2, 3, 4, 5, 6, 7]) } -#[worker::send] pub async fn handle_api_data( mut req: Request, _env: Env, @@ -183,7 +180,6 @@ pub async fn handle_now(_req: Request, _env: Env, _data: SomeSharedData) -> Resu Response::ok(js_date.to_string()) } -#[worker::send] pub async fn handle_cloned(_req: Request, _env: Env, _data: SomeSharedData) -> Result { let mut resp = Response::ok("Hello")?; let mut resp1 = resp.cloned()?; @@ -194,7 +190,6 @@ pub async fn handle_cloned(_req: Request, _env: Env, _data: SomeSharedData) -> R Response::ok((left == right).to_string()) } -#[worker::send] pub async fn handle_cloned_stream( _req: Request, _env: Env, @@ -226,7 +221,6 @@ pub async fn handle_custom_response_body( Response::from_body(ResponseBody::Body(vec![b'h', b'e', b'l', b'l', b'o'])) } -#[worker::send] pub async fn handle_wait_delay(req: Request, _env: Env, _data: SomeSharedData) -> Result { let uri = req.url()?; let mut segments = uri.path_segments().unwrap(); @@ -241,3 +235,15 @@ pub async fn handle_wait_delay(req: Request, _env: Env, _data: SomeSharedData) - Response::ok("Waited!\n") } + +// Compile-time assertion: public async Request methods return Send futures. +#[allow(dead_code, unused)] +fn _assert_send() { + fn require_send(_t: T) {} + fn request(mut req: worker::Request) { + require_send(req.json::()); + require_send(req.text()); + require_send(req.bytes()); + require_send(req.form_data()); + } +} diff --git a/test/src/router.rs b/test/src/router.rs index 4ca07033e..bd43b7267 100644 --- a/test/src/router.rs +++ b/test/src/router.rs @@ -266,7 +266,6 @@ async fn respond_async(req: Request, _env: Env, _data: SomeSharedData) -> Result .ok(format!("Ok (async): {}", String::from(req.method()))) } -#[worker::send] async fn handle_close_event(_req: Request, env: Env, _data: SomeSharedData) -> Result { let some_namespace_kv = env.kv("SOME_NAMESPACE")?; let got_close_event = some_namespace_kv @@ -279,7 +278,6 @@ async fn handle_close_event(_req: Request, env: Env, _data: SomeSharedData) -> R Response::ok(got_close_event) } -#[worker::send] async fn catchall(req: Request, _env: Env, _data: SomeSharedData) -> Result { let uri = req.url()?; let path = uri.path(); diff --git a/test/src/secret_store.rs b/test/src/secret_store.rs index 14270c17d..d488829d0 100644 --- a/test/src/secret_store.rs +++ b/test/src/secret_store.rs @@ -1,7 +1,6 @@ use crate::SomeSharedData; use worker::{Env, Request, Response, Result}; -#[worker::send] pub async fn get_from_secret_store( _req: Request, env: Env, @@ -16,7 +15,6 @@ pub async fn get_from_secret_store( } } -#[worker::send] pub async fn get_from_secret_store_missing( _req: Request, env: Env, @@ -30,3 +28,12 @@ pub async fn get_from_secret_store_missing( None => Response::error("Secret not found", 500), } } + +// Compile-time assertion: public async SecretStore methods return Send futures. +#[allow(dead_code, unused)] +fn _assert_send() { + fn require_send(_t: T) {} + fn secret_store(ss: worker::SecretStore) { + require_send(ss.get()); + } +} diff --git a/test/src/service.rs b/test/src/service.rs index 91fc2bcd8..1df8abc30 100644 --- a/test/src/service.rs +++ b/test/src/service.rs @@ -3,7 +3,6 @@ use super::SomeSharedData; use std::convert::TryInto; use worker::{Env, Method, Request, RequestInit, Response, Result}; -#[worker::send] pub async fn handle_remote_by_request( req: Request, env: Env, @@ -21,7 +20,6 @@ pub async fn handle_remote_by_request( result } -#[worker::send] pub async fn handle_remote_by_path( req: Request, env: Env, @@ -39,3 +37,15 @@ pub async fn handle_remote_by_path( result } + +// Compile-time assertion: public async Fetcher methods return Send futures. +#[allow(dead_code, unused)] +fn _assert_send() { + fn require_send(_t: T) {} + fn fetcher(f: worker::Fetcher) { + require_send(f.fetch("https://example.com", None)); + require_send(f.fetch_request( + worker::Request::new("https://example.com", worker::Method::Get).unwrap(), + )); + } +} diff --git a/test/src/socket.rs b/test/src/socket.rs index 81fe2983e..1ddbeddb3 100644 --- a/test/src/socket.rs +++ b/test/src/socket.rs @@ -3,7 +3,6 @@ use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; use worker::{ConnectionBuilder, Env, Error, Request, Response, Result}; -#[worker::send] pub async fn handle_socket_failed( _req: Request, _env: Env, @@ -12,16 +11,13 @@ pub async fn handle_socket_failed( let socket = ConnectionBuilder::new().connect("127.0.0.1", 25000)?; match socket.opened().await { - Ok(_) => { - return Err(Error::RustError( - "Socket should have failed to open.".to_owned(), - )) - } + Ok(_) => Err(Error::RustError( + "Socket should have failed to open.".to_owned(), + )), Err(e) => Response::ok(format!("{e:?}")), } } -#[worker::send] pub async fn handle_socket_read( _req: Request, _env: Env, @@ -48,3 +44,14 @@ pub async fn handle_socket_read( Response::ok("success") } + +// Compile-time assertion: public async Socket methods return Send futures. +#[allow(dead_code, unused)] +fn _assert_send() { + fn require_send(_t: T) {} + fn socket(mut s: worker::Socket) { + require_send(s.close()); + require_send(s.closed()); + require_send(s.opened()); + } +} diff --git a/test/src/sql_counter.rs b/test/src/sql_counter.rs index 46672e018..9eb1faaf5 100644 --- a/test/src/sql_counter.rs +++ b/test/src/sql_counter.rs @@ -85,7 +85,6 @@ impl SqlCounter { } } -#[worker::send] /// Route handler that proxies a request to our SqlCounter Durable Object with id derived from the /// path `/sql-counter/{name}` (so every name gets its own instance). pub async fn handle_sql_counter( diff --git a/test/src/sql_iterator.rs b/test/src/sql_iterator.rs index 019c0b63d..a630cdc87 100644 --- a/test/src/sql_iterator.rs +++ b/test/src/sql_iterator.rs @@ -411,7 +411,6 @@ impl SqlIterator { } } -#[worker::send] /// Route handler for the SQL iterator test Durable Object. pub async fn handle_sql_iterator( req: Request, diff --git a/test/src/ws.rs b/test/src/ws.rs index e0c3899af..79ad22e65 100644 --- a/test/src/ws.rs +++ b/test/src/ws.rs @@ -40,7 +40,6 @@ pub async fn handle_websocket(_req: Request, env: Env, _data: SomeSharedData) -> Response::from_websocket(pair.client) } -#[worker::send] pub async fn handle_websocket_client( _req: Request, _env: Env, diff --git a/worker-codegen/src/wit.rs b/worker-codegen/src/wit.rs index 50e2aa0a1..85332f87c 100644 --- a/worker-codegen/src/wit.rs +++ b/worker-codegen/src/wit.rs @@ -85,7 +85,7 @@ fn expand_trait(interface: &Interface, interface_name: &Ident) -> anyhow::Result fn expand_struct(struct_name: &Ident, sys_name: &Ident) -> anyhow::Result { let struct_raw = quote!( - pub struct #struct_name(::worker::send::SendWrapper); + pub struct #struct_name(sys::#sys_name); ); let struct_item: syn::ItemStruct = syn::parse2(struct_raw)?; Ok(struct_item) @@ -95,7 +95,7 @@ fn expand_from_impl(struct_name: &Ident, from_type: &syn::Type) -> anyhow::Resul let impl_raw = quote!( impl From<#from_type> for #struct_name { fn from(fetcher: #from_type) -> Self { - Self(::worker::send::SendWrapper::new(fetcher.into_rpc())) + Self(fetcher.into_rpc()) } } ); @@ -184,7 +184,7 @@ fn expand_sys_module(interface: &Interface, sys_name: &Ident) -> anyhow::Result< let mod_raw = quote!( mod sys { - use ::wasm_bindgen::prelude::*; + use wasm_bindgen::prelude::*; } ); let mut mod_item: syn::ItemMod = syn::parse2(mod_raw)?; diff --git a/worker-macros/src/lib.rs b/worker-macros/src/lib.rs index c6f2cd376..ba3c67be4 100644 --- a/worker-macros/src/lib.rs +++ b/worker-macros/src/lib.rs @@ -119,19 +119,14 @@ pub fn event(attr: TokenStream, item: TokenStream) -> TokenStream { } #[proc_macro_attribute] -/// Convert an async function which is `!Send` to be `Send`. -/// -/// This is useful for implementing async handlers in frameworks which -/// expect the handler to be `Send`, such as `axum`. -/// -/// ```rust -/// #[worker::send] -/// async fn foo() { -/// // JsFuture is !Send -/// let fut = JsFuture::from(promise); -/// fut.await -/// } -/// ``` +#[deprecated( + since = "0.8.0", + note = "JsValue types are now Send in wasm-bindgen. #[worker::send] is no longer needed for most async handlers." +)] +/// Deprecated: Convert an async function which is `!Send` to be `Send`. +/// +/// With recent versions of `wasm-bindgen`, `JsValue` types are `Send`, +/// so this macro is no longer needed for most async handlers. pub fn send(attr: TokenStream, stream: TokenStream) -> TokenStream { send::expand_macro(attr, stream) } diff --git a/worker/src/cache.rs b/worker/src/cache.rs index 6d300fa07..e6c66ec01 100644 --- a/worker/src/cache.rs +++ b/worker/src/cache.rs @@ -1,5 +1,6 @@ use std::convert::TryInto; +use crate::send::SendFuture; use serde::Serialize; use wasm_bindgen::JsCast; use wasm_bindgen_futures::JsFuture; @@ -55,7 +56,7 @@ impl Cache { // unwrap is safe because this promise never rejects // https://developer.mozilla.org/en-US/docs/Web/API/CacheStorage/open - let inner = JsFuture::from(cache).await.unwrap().into(); + let inner = SendFuture::new(JsFuture::from(cache)).await.unwrap().into(); Self { inner } } @@ -80,7 +81,7 @@ impl Cache { .inner .put_with_request(&request.try_into()?, &response.into()), }; - let _ = JsFuture::from(promise).await?; + let _ = SendFuture::new(JsFuture::from(promise)).await?; Ok(()) } @@ -116,7 +117,7 @@ impl Cache { }; // `match` returns either a response or undefined - let result = JsFuture::from(promise).await?; + let result = SendFuture::new(JsFuture::from(promise)).await?; if result.is_undefined() { Ok(None) } else { @@ -147,7 +148,7 @@ impl Cache { .inner .delete_with_request_and_options(&request.try_into()?, &options), }; - let result = JsFuture::from(promise).await?; + let result = SendFuture::new(JsFuture::from(promise)).await?; // Unwrap is safe because we know this is a boolean // https://developers.cloudflare.com/workers/runtime-apis/cache#delete diff --git a/worker/src/container.rs b/worker/src/container.rs index 2378218d3..d94722dd4 100644 --- a/worker/src/container.rs +++ b/worker/src/container.rs @@ -1,3 +1,4 @@ +use crate::send::SendFuture; use js_sys::{Map, Object, Reflect}; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; @@ -24,13 +25,13 @@ impl Container { pub async fn wait_for_exit(&self) -> Result<()> { let promise = self.inner.monitor(); - JsFuture::from(promise).await?; + SendFuture::new(JsFuture::from(promise)).await?; Ok(()) } pub async fn destroy(&self, error: Option<&str>) -> Result<()> { let promise = self.inner.destroy(error); - JsFuture::from(promise).await?; + SendFuture::new(JsFuture::from(promise)).await?; Ok(()) } diff --git a/worker/src/d1/mod.rs b/worker/src/d1/mod.rs index e27046804..5aaaa47ca 100644 --- a/worker/src/d1/mod.rs +++ b/worker/src/d1/mod.rs @@ -4,6 +4,7 @@ use std::iter::{once, Once}; use std::ops::Deref; use std::result::Result as StdResult; +use crate::send::SendFuture; use js_sys::Array; use js_sys::ArrayBuffer; use js_sys::JsString; @@ -39,7 +40,7 @@ impl D1Database { /// Dump the data in the database to a `Vec`. pub async fn dump(&self) -> Result> { - let result = JsFuture::from(self.0.dump()?).await; + let result = SendFuture::new(JsFuture::from(self.0.dump()?)).await; let array_buffer = cast_to_d1_error(result)?; let array_buffer = array_buffer.dyn_into::()?; let array = Uint8Array::new(&array_buffer); @@ -51,7 +52,7 @@ impl D1Database { /// Returns the results in the same order as the provided statements. pub async fn batch(&self, statements: Vec) -> Result> { let statements = statements.into_iter().map(|s| s.0).collect::(); - let results = JsFuture::from(self.0.batch(statements)?).await; + let results = SendFuture::new(JsFuture::from(self.0.batch(statements)?)).await; let results = cast_to_d1_error(results)?; let results = results.dyn_into::()?; let mut vec = Vec::with_capacity(results.length() as usize); @@ -75,7 +76,7 @@ impl D1Database { /// If an error occurs, an exception is thrown with the query and error /// messages, execution stops and further statements are not executed. pub async fn exec(&self, query: &str) -> Result { - let result = JsFuture::from(self.0.exec(query)?).await; + let result = SendFuture::new(JsFuture::from(self.0.exec(query)?)).await; let result = cast_to_d1_error(result)?; Ok(result.into()) } @@ -287,7 +288,7 @@ impl D1PreparedStatement { where T: for<'a> Deserialize<'a>, { - let result = JsFuture::from(self.0.first(col_name)?).await; + let result = SendFuture::new(JsFuture::from(self.0.first(col_name)?)).await; let js_value = cast_to_d1_error(result)?; let value = serde_wasm_bindgen::from_value(js_value)?; Ok(value) @@ -295,14 +296,14 @@ impl D1PreparedStatement { /// Executes a query against the database but only return metadata. pub async fn run(&self) -> Result { - let result = JsFuture::from(self.0.run()?).await; + let result = SendFuture::new(JsFuture::from(self.0.run()?)).await; let result = cast_to_d1_error(result)?; Ok(D1Result(result.into())) } /// Executes a query against the database and returns all rows and metadata. pub async fn all(&self) -> Result { - let result = JsFuture::from(self.0.all()?).await?; + let result = SendFuture::new(JsFuture::from(self.0.all()?)).await?; Ok(D1Result(result.into())) } @@ -311,7 +312,7 @@ impl D1PreparedStatement { where T: for<'a> Deserialize<'a>, { - let result = JsFuture::from(self.0.raw()?).await; + let result = SendFuture::new(JsFuture::from(self.0.raw()?)).await; let result = cast_to_d1_error(result)?; let result = result.dyn_into::()?; let mut vec = Vec::with_capacity(result.length() as usize); @@ -324,7 +325,7 @@ impl D1PreparedStatement { /// Executes a query against the database and returns a `Vec` of JsValues. pub async fn raw_js_value(&self) -> Result> { - let result = JsFuture::from(self.0.raw()?).await; + let result = SendFuture::new(JsFuture::from(self.0.raw()?)).await; let result = cast_to_d1_error(result)?; let array = result.dyn_into::()?; diff --git a/worker/src/delay.rs b/worker/src/delay.rs index de98ec204..0825f8944 100644 --- a/worker/src/delay.rs +++ b/worker/src/delay.rs @@ -98,3 +98,7 @@ impl PinnedDrop for Delay { } } } + +/// SAFETY: Cloudflare Workers runtime is single-threaded, so it's safe to mark Delay as Send +/// even though it contains Rc>. +unsafe impl Send for Delay {} diff --git a/worker/src/durable.rs b/worker/src/durable.rs index 446866b49..4bee0c80f 100644 --- a/worker/src/durable.rs +++ b/worker/src/durable.rs @@ -35,6 +35,8 @@ use worker_sys::{ // use wasm_bindgen_futures::future_to_promise; use wasm_bindgen_futures::{future_to_promise, JsFuture}; +use crate::send::SendFuture; + /// A Durable Object stub is a client object used to send requests to a remote Durable Object. #[derive(Debug)] pub struct Stub { @@ -48,14 +50,14 @@ impl Stub { /// Send an internal Request to the Durable Object to which the stub points. pub async fn fetch_with_request(&self, req: Request) -> Result { let promise = self.inner.fetch_with_request(req.inner())?; - let response = JsFuture::from(promise).await?; + let response = SendFuture::new(JsFuture::from(promise)).await?; Ok(response.dyn_into::()?.into()) } /// Construct a Request from a URL to the Durable Object to which the stub points. pub async fn fetch_with_str(&self, url: &str) -> Result { let promise = self.inner.fetch_with_str(url)?; - let response = JsFuture::from(promise).await?; + let response = SendFuture::new(JsFuture::from(promise)).await?; Ok(response.dyn_into::()?.into()) } @@ -346,7 +348,7 @@ impl Storage { /// /// Returns `Ok(None)` if the key does not exist. pub async fn get(&self, key: &str) -> Result> { - let res = match JsFuture::from(self.inner.get(key)?).await { + let res = match SendFuture::new(JsFuture::from(self.inner.get(key)?)).await { // If we successfully retrived `undefined`, that means the key doesn't exist Ok(val) if val.is_undefined() => Ok(None), // Otherwise deserialize whatever we successfully received @@ -367,7 +369,7 @@ impl Storage { .map(|key| JsValue::from(key.deref())) .collect(), )?; - let keys = JsFuture::from(keys).await?; + let keys = SendFuture::new(JsFuture::from(keys)).await?; keys.dyn_into::().map_err(Error::from) } @@ -378,7 +380,7 @@ impl Storage { } pub async fn put_raw(&self, key: &str, value: impl Into) -> Result<()> { - JsFuture::from(self.inner.put(key, value.into())?) + SendFuture::new(JsFuture::from(self.inner.put(key, value.into())?)) .await .map_err(Error::from) .map(|_| ()) @@ -407,7 +409,7 @@ impl Storage { /// storage.put_multiple_raw(obj); /// ``` pub async fn put_multiple_raw(&self, values: Object) -> Result<()> { - JsFuture::from(self.inner.put_multiple(values.into())?) + SendFuture::new(JsFuture::from(self.inner.put_multiple(values.into())?)) .await .map_err(Error::from) .map(|_| ()) @@ -415,7 +417,7 @@ impl Storage { /// Deletes the key and associated value. Returns true if the key existed or false if it didn't. pub async fn delete(&self, key: &str) -> Result { - let fut: JsFuture = self.inner.delete(key)?.into(); + let fut = SendFuture::new(JsFuture::from(self.inner.delete(key)?)); fut.await .and_then(|jsv| { jsv.as_bool() @@ -427,14 +429,13 @@ impl Storage { /// Deletes the provided keys and their associated values. Returns a count of the number of /// key-value pairs deleted. pub async fn delete_multiple(&self, keys: Vec>) -> Result { - let fut: JsFuture = self - .inner - .delete_multiple( + let fut = SendFuture::new(JsFuture::from( + self.inner.delete_multiple( keys.into_iter() .map(|key| JsValue::from(key.deref())) .collect(), - )? - .into(); + )?, + )); fut.await .and_then(|jsv| { jsv.as_f64() @@ -448,7 +449,7 @@ impl Storage { /// Durable Object. In the event of a failure while the operation is still in flight, it may be /// that only a subset of the data is properly deleted. pub async fn delete_all(&self) -> Result<()> { - let fut: JsFuture = self.inner.delete_all()?.into(); + let fut = SendFuture::new(JsFuture::from(self.inner.delete_all()?)); fut.await.map(|_| ()).map_err(Error::from) } @@ -460,7 +461,7 @@ impl Storage { /// potentially hitting its [limit](https://developers.cloudflare.com/workers/platform/limits#durable-objects-limits). /// If that is a concern, use the alternate `list_with_options()` method. pub async fn list(&self) -> Result { - let fut: JsFuture = self.inner.list()?.into(); + let fut = SendFuture::new(JsFuture::from(self.inner.list()?)); fut.await .and_then(|jsv| jsv.dyn_into()) .map_err(Error::from) @@ -469,10 +470,10 @@ impl Storage { /// Returns keys associated with the current Durable Object according to the parameters in the /// provided options object. pub async fn list_with_options(&self, opts: ListOptions<'_>) -> Result { - let fut: JsFuture = self - .inner - .list_with_options(serde_wasm_bindgen::to_value(&opts)?.into())? - .into(); + let fut = SendFuture::new(JsFuture::from( + self.inner + .list_with_options(serde_wasm_bindgen::to_value(&opts)?.into())?, + )); fut.await .and_then(|jsv| jsv.dyn_into()) .map_err(Error::from) @@ -482,17 +483,17 @@ impl Storage { /// The alarm is considered to be set if it has not started, or if it has failed /// and any retry has not begun. If no alarm is set, `get_alarm()` returns `None`. pub async fn get_alarm(&self) -> Result> { - let fut: JsFuture = self.inner.get_alarm(JsValue::NULL.into())?.into(); + let fut = SendFuture::new(JsFuture::from(self.inner.get_alarm(JsValue::NULL.into())?)); fut.await .map(|jsv| jsv.as_f64().map(|f| f as i64)) .map_err(Error::from) } pub async fn get_alarm_with_options(&self, options: GetAlarmOptions) -> Result> { - let fut: JsFuture = self - .inner - .get_alarm(serde_wasm_bindgen::to_value(&options)?.into())? - .into(); + let fut = SendFuture::new(JsFuture::from( + self.inner + .get_alarm(serde_wasm_bindgen::to_value(&options)?.into())?, + )); fut.await .map(|jsv| jsv.as_f64().map(|f| f as i64)) .map_err(Error::from) @@ -507,10 +508,10 @@ impl Storage { /// a few milliseconds after the set time, but can be delayed by up to a minute /// due to maintenance or failures while failover takes place. pub async fn set_alarm(&self, scheduled_time: impl Into) -> Result<()> { - let fut: JsFuture = self - .inner - .set_alarm(scheduled_time.into().schedule(), JsValue::NULL.into())? - .into(); + let fut = SendFuture::new(JsFuture::from( + self.inner + .set_alarm(scheduled_time.into().schedule(), JsValue::NULL.into())?, + )); fut.await.map(|_| ()).map_err(Error::from) } @@ -519,28 +520,27 @@ impl Storage { scheduled_time: impl Into, options: SetAlarmOptions, ) -> Result<()> { - let fut: JsFuture = self - .inner - .set_alarm( - scheduled_time.into().schedule(), - serde_wasm_bindgen::to_value(&options)?.into(), - )? - .into(); + let fut = SendFuture::new(JsFuture::from(self.inner.set_alarm( + scheduled_time.into().schedule(), + serde_wasm_bindgen::to_value(&options)?.into(), + )?)); fut.await.map(|_| ()).map_err(Error::from) } /// Deletes the alarm if one exists. Does not cancel the alarm handler if it is /// currently executing. pub async fn delete_alarm(&self) -> Result<()> { - let fut: JsFuture = self.inner.delete_alarm(JsValue::NULL.into())?.into(); + let fut = SendFuture::new(JsFuture::from( + self.inner.delete_alarm(JsValue::NULL.into())?, + )); fut.await.map(|_| ()).map_err(Error::from) } pub async fn delete_alarm_with_options(&self, options: SetAlarmOptions) -> Result<()> { - let fut: JsFuture = self - .inner - .delete_alarm(serde_wasm_bindgen::to_value(&options)?.into())? - .into(); + let fut = SendFuture::new(JsFuture::from( + self.inner + .delete_alarm(serde_wasm_bindgen::to_value(&options)?.into())?, + )); fut.await.map(|_| ()).map_err(Error::from) } @@ -559,7 +559,7 @@ impl Storage { })) }); let clos = wasm_bindgen::closure::Closure::once_assert_unwind_safe(inner); - JsFuture::from(self.inner.transaction(&clos)?) + SendFuture::new(JsFuture::from(self.inner.transaction(&clos)?)) .await .map_err(Error::from) .map(|_| ()) @@ -578,7 +578,7 @@ pub struct Transaction { impl Transaction { pub async fn get(&self, key: &str) -> Result { - JsFuture::from(self.inner.get(key)?) + SendFuture::new(JsFuture::from(self.inner.get(key)?)) .await .and_then(|val| { if val.is_undefined() { @@ -596,15 +596,17 @@ impl Transaction { .map(|key| JsValue::from(key.deref())) .collect(), )?; - let keys = JsFuture::from(keys).await?; + let keys = SendFuture::new(JsFuture::from(keys)).await?; keys.dyn_into::().map_err(Error::from) } pub async fn put(&self, key: &str, value: T) -> Result<()> { - JsFuture::from(self.inner.put(key, serde_wasm_bindgen::to_value(&value)?)?) - .await - .map_err(Error::from) - .map(|_| ()) + SendFuture::new(JsFuture::from( + self.inner.put(key, serde_wasm_bindgen::to_value(&value)?)?, + )) + .await + .map_err(Error::from) + .map(|_| ()) } // Each key-value pair in the serialized object will be added to the storage @@ -613,14 +615,14 @@ impl Transaction { if !values.is_object() { return Err("Must pass in a struct type".to_string().into()); } - JsFuture::from(self.inner.put_multiple(values)?) + SendFuture::new(JsFuture::from(self.inner.put_multiple(values)?)) .await .map_err(Error::from) .map(|_| ()) } pub async fn delete(&self, key: &str) -> Result { - let fut: JsFuture = self.inner.delete(key)?.into(); + let fut = SendFuture::new(JsFuture::from(self.inner.delete(key)?)); fut.await .and_then(|jsv| { jsv.as_bool() @@ -630,14 +632,13 @@ impl Transaction { } pub async fn delete_multiple(&self, keys: Vec>) -> Result { - let fut: JsFuture = self - .inner - .delete_multiple( + let fut = SendFuture::new(JsFuture::from( + self.inner.delete_multiple( keys.into_iter() .map(|key| JsValue::from(key.deref())) .collect(), - )? - .into(); + )?, + )); fut.await .and_then(|jsv| { jsv.as_f64() @@ -648,22 +649,22 @@ impl Transaction { } pub async fn delete_all(&self) -> Result<()> { - let fut: JsFuture = self.inner.delete_all()?.into(); + let fut = SendFuture::new(JsFuture::from(self.inner.delete_all()?)); fut.await.map(|_| ()).map_err(Error::from) } pub async fn list(&self) -> Result { - let fut: JsFuture = self.inner.list()?.into(); + let fut = SendFuture::new(JsFuture::from(self.inner.list()?)); fut.await .and_then(|jsv| jsv.dyn_into()) .map_err(Error::from) } pub async fn list_with_options(&self, opts: ListOptions<'_>) -> Result { - let fut: JsFuture = self - .inner - .list_with_options(serde_wasm_bindgen::to_value(&opts)?.into())? - .into(); + let fut = SendFuture::new(JsFuture::from( + self.inner + .list_with_options(serde_wasm_bindgen::to_value(&opts)?.into())?, + )); fut.await .and_then(|jsv| jsv.dyn_into()) .map_err(Error::from) diff --git a/worker/src/fetcher.rs b/worker/src/fetcher.rs index f72231715..2ea6f1dd2 100644 --- a/worker/src/fetcher.rs +++ b/worker/src/fetcher.rs @@ -1,4 +1,4 @@ -use crate::{env::EnvBinding, RequestInit, Result}; +use crate::{env::EnvBinding, send::SendFuture, RequestInit, Result}; use std::convert::TryInto; use wasm_bindgen::{JsCast, JsValue}; use wasm_bindgen_futures::JsFuture; @@ -31,7 +31,8 @@ impl Fetcher { None => self.0.fetch_with_str(&path), }?; - let resp_sys: web_sys::Response = JsFuture::from(promise).await?.dyn_into()?; + let resp_sys: web_sys::Response = + SendFuture::new(JsFuture::from(promise)).await?.dyn_into()?; #[cfg(not(feature = "http"))] let result = Ok(Response::from(resp_sys)); #[cfg(feature = "http")] @@ -52,7 +53,8 @@ impl Fetcher { { let req = request.try_into()?; let promise = self.0.fetch(req.inner())?; - let resp_sys: web_sys::Response = JsFuture::from(promise).await?.dyn_into()?; + let resp_sys: web_sys::Response = + SendFuture::new(JsFuture::from(promise)).await?.dyn_into()?; let response = Response::from(resp_sys); #[cfg(feature = "http")] let result = response.try_into(); diff --git a/worker/src/formdata.rs b/worker/src/formdata.rs index afab879f0..44c58d460 100644 --- a/worker/src/formdata.rs +++ b/worker/src/formdata.rs @@ -5,6 +5,7 @@ use crate::Date; use crate::DateInit; use crate::Result; +use crate::send::SendFuture; use js_sys::Array; use js_sys::Uint8Array; use wasm_bindgen::prelude::*; @@ -172,7 +173,7 @@ impl File { /// Read the file from an internal buffer and get the resulting bytes. pub async fn bytes(&self) -> Result> { - JsFuture::from(self.0.array_buffer()) + SendFuture::new(JsFuture::from(self.0.array_buffer())) .await .map(|val| js_sys::Uint8Array::new(&val).to_vec()) .map_err(|e| { diff --git a/worker/src/global.rs b/worker/src/global.rs index 6d6462ec3..c75125f5b 100644 --- a/worker/src/global.rs +++ b/worker/src/global.rs @@ -1,5 +1,6 @@ use std::ops::Deref; +use crate::send::SendFuture; use wasm_bindgen::JsCast; use wasm_bindgen_futures::JsFuture; @@ -37,7 +38,7 @@ async fn fetch_with_str(url: &str, signal: Option<&AbortSignal>) -> Result) -> let worker: web_sys::WorkerGlobalScope = js_sys::global().unchecked_into(); let req = request.inner(); let promise = worker.fetch_with_request_and_init(req, &init); - let resp = JsFuture::from(promise).await?; + let resp = SendFuture::new(JsFuture::from(promise)).await?; let edge_response: web_sys::Response = resp.dyn_into()?; Ok(edge_response.into()) } diff --git a/worker/src/kv/builder.rs b/worker/src/kv/builder.rs index 4a2beae92..2f0b3d334 100644 --- a/worker/src/kv/builder.rs +++ b/worker/src/kv/builder.rs @@ -1,3 +1,4 @@ +use crate::send::SendFuture; use js_sys::{ArrayBuffer, Function, Object, Promise, Uint8Array}; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; @@ -63,7 +64,7 @@ impl PutOptionsBuilder { .put_function .call3(&self.this, &self.name, &self.value, &options_object)? .into(); - JsFuture::from(promise) + SendFuture::new(JsFuture::from(promise)) .await .map(|_| ()) .map_err(KvError::from) @@ -124,7 +125,7 @@ impl ListOptionsBuilder { .call1(&self.this, &options_object)? .into(); - let value = JsFuture::from(promise).await?; + let value = SendFuture::new(JsFuture::from(promise)).await?; let resp = serde_wasm_bindgen::from_value(value).map_err(JsValue::from)?; Ok(resp) } @@ -185,7 +186,9 @@ impl GetOptionsBuilder { .get_function .call2(&self.this, &self.name, &options_object)? .into(); - JsFuture::from(promise).await.map_err(KvError::from) + SendFuture::new(JsFuture::from(promise)) + .await + .map_err(KvError::from) } /// Gets the value as a string. @@ -230,7 +233,7 @@ impl GetOptionsBuilder { .call2(&self.this, &self.name, &options_object)? .into(); - let pair = JsFuture::from(promise).await?; + let pair = SendFuture::new(JsFuture::from(promise)).await?; let metadata = crate::kv::get(&pair, "metadata")?; let value = crate::kv::get(&pair, "value")?; diff --git a/worker/src/kv/mod.rs b/worker/src/kv/mod.rs index af49b66a9..b130f25c3 100644 --- a/worker/src/kv/mod.rs +++ b/worker/src/kv/mod.rs @@ -20,6 +20,7 @@ mod builder; pub use builder::*; +use crate::send::SendFuture; use js_sys::{global, Function, Object, Promise, Reflect, Uint8Array}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -137,7 +138,7 @@ impl KvStore { pub async fn delete(&self, name: &str) -> Result<(), KvError> { let name = JsValue::from(name); let promise: Promise = self.delete_function.call1(&self.this, &name)?.into(); - JsFuture::from(promise).await?; + SendFuture::new(JsFuture::from(promise)).await?; Ok(()) } } diff --git a/worker/src/lib.rs b/worker/src/lib.rs index 174427495..a18eeeb6a 100644 --- a/worker/src/lib.rs +++ b/worker/src/lib.rs @@ -56,51 +56,18 @@ //! We also implement `try_from` between `worker::Request` and `http::Request`, and between `worker::Response` and `http::Response`. //! This allows you to convert your code incrementally if it is tightly coupled to the original types. //! -//! ### `Send` Helpers +//! ### `Send` //! -//! A number of frameworks (including `axum`) require that objects that they are given (including route handlers) can be -//! sent between threads (i.e are marked as `Send`). Unfortuntately, objects which interact with JavaScript are frequently -//! not marked as `Send`. In the Workers environment, this is not an issue, because Workers are single threaded. There are still -//! some ergonomic difficulties which we address with some wrapper types: +//! All JavaScript value types (`JsValue` and friends) are now `Send` in `wasm-bindgen`, +//! so worker types can be freely shared across async contexts without wrappers. //! -//! 1. [`send::SendFuture`] - wraps any `Future` and marks it as `Send`: +//! The one exception is `JsFuture`, which still uses `Rc` internally and is not `Send`. +//! The [`send::SendFuture`] wrapper is provided for internal use and for any advanced cases +//! where you need to wrap a `!Send` future in a `Send` context: //! //! ```rust -//! // `fut` is `Send` -//! let fut = send::SendFuture::new(async move { -//! // `JsFuture` is not `Send` -//! JsFuture::from(promise).await -//! }); -//! ``` -//! -//! 2. [`send::SendWrapper`] - Marks an arbitrary object as `Send` and implements `Deref` and `DerefMut`, as well as `Clone`, `Debug`, and `Display` if the -//! inner type does. This is useful for attaching types as state to an `axum` `Router`: -//! -//! ```rust -//! // `KvStore` is not `Send` -//! let store = env.kv("FOO")?; -//! // `state` is `Send` -//! let state = send::SendWrapper::new(store); -//! let router = axum::Router::new() -//! .layer(Extension(state)); -//! ``` -//! -//! 3. [`[worker::send]`](macro@crate::send) - Macro to make any `async` function `Send`. This can be a little tricky to identify as the problem, but -//! `axum`'s `[debug_handler]` macro can help, and looking for warnings that a function or object cannot safely be sent -//! between threads. -//! -//! ```rust -//! // This macro makes the whole function (i.e. the `Future` it returns) `Send`. -//! #[worker::send] -//! async fn handler(Extension(env): Extension) -> Response { -//! let kv = env.kv("FOO").unwrap()?; -//! // Holding `kv`, which is not `Send` across `await` boundary would mark this function as `!Send` -//! let value = kv.get("foo").text().await?; -//! Ok(format!("Got value: {:?}", value)); -//! } -//! -//! let router = axum::Router::new() -//! .route("/", get(handler)) +//! let fut = send::SendFuture::new(JsFuture::from(promise)); +//! fut.await //! ``` //! //! # RPC Support diff --git a/worker/src/queue.rs b/worker/src/queue.rs index 75529cdf8..27ed7b40d 100644 --- a/worker/src/queue.rs +++ b/worker/src/queue.rs @@ -10,6 +10,8 @@ use wasm_bindgen::{prelude::*, JsCast}; use wasm_bindgen_futures::JsFuture; use worker_sys::{Message as MessageSys, MessageBatch as MessageBatchSys, Queue as EdgeQueue}; +use crate::send::SendFuture; + /// A batch of messages that are sent to a consumer Worker. #[derive(Debug)] pub struct MessageBatch { @@ -616,7 +618,7 @@ impl Queue { None => JsValue::null(), }; - let fut: JsFuture = self.0.send(message.message, options)?.into(); + let fut = SendFuture::new(JsFuture::from(self.0.send(message.message, options)?)); fut.await.map_err(Error::from)?; Ok(()) } @@ -684,7 +686,9 @@ impl Queue { }) .collect::>()?; - let fut: JsFuture = self.0.send_batch(messages, batch_send_options)?.into(); + let fut = SendFuture::new(JsFuture::from( + self.0.send_batch(messages, batch_send_options)?, + )); fut.await.map_err(Error::from)?; Ok(()) } diff --git a/worker/src/r2/builder.rs b/worker/src/r2/builder.rs index e339695ff..d41bb7412 100644 --- a/worker/src/r2/builder.rs +++ b/worker/src/r2/builder.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, convert::TryFrom}; +use crate::send::SendFuture; use js_sys::{Array, Date as JsDate, JsString, Object as JsObject, Uint8Array}; use wasm_bindgen::{JsCast, JsValue}; use wasm_bindgen_futures::JsFuture; @@ -48,7 +49,7 @@ impl GetOptionsBuilder<'_> { .into(), )?; - let value = JsFuture::from(get_promise).await?; + let value = SendFuture::new(JsFuture::from(get_promise)).await?; if value.is_null() { return Ok(None); @@ -248,7 +249,7 @@ impl PutOptionsBuilder<'_> { } .into(), )?; - let res: EdgeR2Object = JsFuture::from(put_promise).await?.into(); + let res: EdgeR2Object = SendFuture::new(JsFuture::from(put_promise)).await?.into(); let inner = if JsString::from("bodyUsed").js_in(&res) { ObjectInner::Body(res.unchecked_into()) } else { @@ -302,9 +303,10 @@ impl CreateMultipartUploadOptionsBuilder<'_> { } .into(), )?; - let inner: EdgeR2MultipartUpload = JsFuture::from(create_multipart_upload_promise) - .await? - .into(); + let inner: EdgeR2MultipartUpload = + SendFuture::new(JsFuture::from(create_multipart_upload_promise)) + .await? + .into(); Ok(MultipartUpload { inner }) } @@ -437,7 +439,7 @@ impl ListOptionsBuilder<'_> { } .into(), )?; - let inner = JsFuture::from(list_promise).await?.into(); + let inner = SendFuture::new(JsFuture::from(list_promise)).await?.into(); Ok(Objects { inner }) } } diff --git a/worker/src/r2/mod.rs b/worker/src/r2/mod.rs index 8b1dab9ee..4ec3e8ac9 100644 --- a/worker/src/r2/mod.rs +++ b/worker/src/r2/mod.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, convert::TryInto, ops::Deref}; pub use builder::*; +use crate::send::SendFuture; use js_sys::{JsString, Reflect, Uint8Array}; use wasm_bindgen::{JsCast, JsValue}; use wasm_bindgen_futures::JsFuture; @@ -28,7 +29,7 @@ impl Bucket { /// Retrieves the [Object] for the given key containing only object metadata, if the key exists. pub async fn head(&self, key: impl Into) -> Result> { let head_promise = self.inner.head(key.into())?; - let value = JsFuture::from(head_promise).await?; + let value = SendFuture::new(JsFuture::from(head_promise)).await?; if value.is_null() { return Ok(None); @@ -75,7 +76,7 @@ impl Bucket { /// operations will no longer see this key value pair globally. pub async fn delete(&self, key: impl Into) -> Result<()> { let delete_promise = self.inner.delete(key.into())?; - JsFuture::from(delete_promise).await?; + SendFuture::new(JsFuture::from(delete_promise)).await?; Ok(()) } @@ -88,10 +89,10 @@ impl Bucket { /// /// Up to 1000 keys may be deleted per call. pub async fn delete_multiple(&self, keys: Vec>) -> Result<()> { - let fut: JsFuture = self - .inner - .delete_multiple(keys.into_iter().map(|key| JsValue::from(&*key)).collect())? - .into(); + let fut = SendFuture::new(JsFuture::from( + self.inner + .delete_multiple(keys.into_iter().map(|key| JsValue::from(&*key)).collect())?, + )); fut.await?; Ok(()) } @@ -328,7 +329,7 @@ impl ObjectBody<'_> { } pub async fn bytes(self) -> Result> { - let js_buffer = JsFuture::from(self.inner.array_buffer()?).await?; + let js_buffer = SendFuture::new(JsFuture::from(self.inner.array_buffer()?)).await?; let js_buffer = Uint8Array::new(&js_buffer); let mut bytes = vec![0; js_buffer.length() as usize]; js_buffer.copy_to(&mut bytes); @@ -396,8 +397,10 @@ impl MultipartUpload { part_number: u16, value: impl Into, ) -> Result { - let uploaded_part = - JsFuture::from(self.inner.upload_part(part_number, value.into().into())?).await?; + let uploaded_part = SendFuture::new(JsFuture::from( + self.inner.upload_part(part_number, value.into().into())?, + )) + .await?; Ok(UploadedPart { inner: uploaded_part.into(), }) @@ -410,7 +413,7 @@ impl MultipartUpload { /// Aborts the multipart upload. pub async fn abort(&self) -> Result<()> { - JsFuture::from(self.inner.abort()?).await?; + SendFuture::new(JsFuture::from(self.inner.abort()?)).await?; Ok(()) } @@ -420,14 +423,14 @@ impl MultipartUpload { self, uploaded_parts: impl IntoIterator, ) -> Result { - let object = JsFuture::from( + let object = SendFuture::new(JsFuture::from( self.inner.complete( uploaded_parts .into_iter() .map(|part| part.inner.into()) .collect(), )?, - ) + )) .await?; Ok(Object { inner: ObjectInner::Body(object.into()), diff --git a/worker/src/rate_limit.rs b/worker/src/rate_limit.rs index 5f381fa9c..f0d46ee6a 100644 --- a/worker/src/rate_limit.rs +++ b/worker/src/rate_limit.rs @@ -17,9 +17,6 @@ pub struct RateLimitOutcome { pub success: bool, } -unsafe impl Send for RateLimiter {} -unsafe impl Sync for RateLimiter {} - impl EnvBinding for RateLimiter { const TYPE_NAME: &'static str = "Ratelimit"; } diff --git a/worker/src/request.rs b/worker/src/request.rs index 6be8a7b80..7858152a1 100644 --- a/worker/src/request.rs +++ b/worker/src/request.rs @@ -4,6 +4,7 @@ use crate::{ cf::Cf, error::Error, headers::Headers, http::Method, ByteStream, FormData, RequestInit, Result, }; +use crate::send::SendFuture; use serde::de::DeserializeOwned; #[cfg(test)] use std::borrow::Cow; @@ -122,7 +123,7 @@ impl Request { pub async fn json(&mut self) -> Result { if !self.body_used { self.body_used = true; - return JsFuture::from(self.edge_request.json()?) + return SendFuture::new(JsFuture::from(self.edge_request.json()?)) .await .map_err(|e| { Error::JsError( @@ -140,7 +141,7 @@ impl Request { pub async fn text(&mut self) -> Result { if !self.body_used { self.body_used = true; - return JsFuture::from(self.edge_request.text()?) + return SendFuture::new(JsFuture::from(self.edge_request.text()?)) .await .map(|val| val.as_string().unwrap()) .map_err(|e| { @@ -158,7 +159,7 @@ impl Request { pub async fn bytes(&mut self) -> Result> { if !self.body_used { self.body_used = true; - return JsFuture::from(self.edge_request.array_buffer()?) + return SendFuture::new(JsFuture::from(self.edge_request.array_buffer()?)) .await .map(|val| js_sys::Uint8Array::new(&val).to_vec()) .map_err(|e| { @@ -176,7 +177,7 @@ impl Request { pub async fn form_data(&mut self) -> Result { if !self.body_used { self.body_used = true; - return JsFuture::from(self.edge_request.form_data()?) + return SendFuture::new(JsFuture::from(self.edge_request.form_data()?)) .await .map(|val| val.into()) .map_err(|e| { diff --git a/worker/src/secret_store.rs b/worker/src/secret_store.rs index d59b3b67d..fb31d22ff 100644 --- a/worker/src/secret_store.rs +++ b/worker/src/secret_store.rs @@ -1,8 +1,4 @@ -use crate::{ - error::Error, - send::{SendFuture, SendWrapper}, - EnvBinding, Result, -}; +use crate::{error::Error, send::SendFuture, EnvBinding, Result}; use wasm_bindgen::{JsCast, JsValue}; use wasm_bindgen_futures::JsFuture; @@ -12,11 +8,7 @@ use wasm_bindgen_futures::JsFuture; /// secret (identified by `store_id` + `secret_name`) to a binding name. /// Use [`SecretStore::get`] to retrieve the secret value. #[derive(Debug, Clone)] -pub struct SecretStore(SendWrapper); - -// Workers will never allow multithreading. -unsafe impl Send for SecretStore {} -unsafe impl Sync for SecretStore {} +pub struct SecretStore(worker_sys::SecretStoreSys); impl EnvBinding for SecretStore { const TYPE_NAME: &'static str = "Fetcher"; @@ -28,7 +20,7 @@ impl JsCast for SecretStore { } fn unchecked_from_js(val: JsValue) -> Self { - Self(SendWrapper::new(val.unchecked_into())) + Self(val.unchecked_into()) } fn unchecked_from_js_ref(val: &JsValue) -> &Self { diff --git a/worker/src/send.rs b/worker/src/send.rs index b565c9b9a..e3919afaa 100644 --- a/worker/src/send.rs +++ b/worker/src/send.rs @@ -1,7 +1,7 @@ -//! This module provides utilities for working with JavaScript types -//! which do not implement `Send`, in contexts where `Send` is required. +//! This module provides utilities for wrapping `!Send` futures +//! in contexts where `Send` is required. //! Workers is guaranteed to be single-threaded, so it is safe to -//! wrap any type with `Send` and `Sync` traits. +//! wrap any future with `Send`. use futures_util::future::Future; use pin_project::pin_project; @@ -65,23 +65,27 @@ where } } -/// Wrap any type to make it `Send`. -/// -/// ```rust -/// // js_sys::Promise is !Send -/// let send_promise = SendWrapper::new(promise); -/// ``` +/// Deprecated: `JsValue` types are now `Send` in `wasm-bindgen`, so `SendWrapper` is no longer +/// needed. Simply use the inner type directly. +#[deprecated( + since = "0.8.0", + note = "JsValue types are now Send in wasm-bindgen. Use the inner type directly." +)] pub struct SendWrapper(pub T); +#[allow(deprecated)] unsafe impl Send for SendWrapper {} +#[allow(deprecated)] unsafe impl Sync for SendWrapper {} +#[allow(deprecated)] impl SendWrapper { pub fn new(inner: T) -> Self { Self(inner) } } +#[allow(deprecated)] impl std::ops::Deref for SendWrapper { type Target = T; @@ -90,30 +94,35 @@ impl std::ops::Deref for SendWrapper { } } +#[allow(deprecated)] impl std::ops::DerefMut for SendWrapper { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } +#[allow(deprecated)] impl Debug for SendWrapper { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "SendWrapper({:?})", self.0) } } +#[allow(deprecated)] impl Clone for SendWrapper { fn clone(&self) -> Self { Self(self.0.clone()) } } +#[allow(deprecated)] impl Default for SendWrapper { fn default() -> Self { Self(T::default()) } } +#[allow(deprecated)] impl Display for SendWrapper { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "SendWrapper({})", self.0) diff --git a/worker/src/socket.rs b/worker/src/socket.rs index 6ce0c94c1..05c4ed885 100644 --- a/worker/src/socket.rs +++ b/worker/src/socket.rs @@ -4,6 +4,7 @@ use std::{ task::{Context, Poll}, }; +use crate::send::SendFuture; use crate::Result; use crate::{r2::js_object, Error}; use futures_util::FutureExt; @@ -93,19 +94,19 @@ impl Socket { /// Closes the TCP socket. Both the readable and writable streams are forcibly closed. pub async fn close(&mut self) -> Result<()> { - JsFuture::from(self.inner.close()?).await?; + SendFuture::new(JsFuture::from(self.inner.close()?)).await?; Ok(()) } /// This Future is resolved when the socket is closed /// and is rejected if the socket encounters an error. pub async fn closed(&self) -> Result<()> { - JsFuture::from(self.inner.closed()?).await?; + SendFuture::new(JsFuture::from(self.inner.closed()?)).await?; Ok(()) } pub async fn opened(&self) -> Result { - let value = JsFuture::from(self.inner.opened()?).await?; + let value = SendFuture::new(JsFuture::from(self.inner.opened()?)).await?; value.try_into() } diff --git a/worker/src/streams.rs b/worker/src/streams.rs index 388adba46..6d0c548e9 100644 --- a/worker/src/streams.rs +++ b/worker/src/streams.rs @@ -46,6 +46,10 @@ impl Stream for ByteStream { } } +/// SAFETY: Cloudflare Workers runtime is single-threaded, so it's safe to mark ByteStream +/// as Send even though `IntoStream` internally contains `Option` which is `!Send`. +unsafe impl Send for ByteStream {} + #[pin_project] pub struct FixedLengthStream { length: u64, @@ -110,6 +114,11 @@ impl Stream for FixedLengthStream { } } +/// SAFETY: Cloudflare Workers runtime is single-threaded, so it's safe to mark FixedLengthStream +/// as Send and Sync even though it contains a trait object that may not be Send/Sync. +unsafe impl Send for FixedLengthStream {} +unsafe impl Sync for FixedLengthStream {} + impl From for FixedLengthStreamSys { fn from(stream: FixedLengthStream) -> Self { let raw = if stream.length < u32::MAX as u64 { diff --git a/worker/src/websocket.rs b/worker/src/websocket.rs index 08b1608d5..87a83e76e 100644 --- a/worker/src/websocket.rs +++ b/worker/src/websocket.rs @@ -302,6 +302,10 @@ pub struct EventStream<'ws> { )>, } +// SAFETY: Workers runtime is single-threaded. EventStream contains Closure +// which is !Send due to the trait object, but this is safe in a single-threaded context. +unsafe impl Send for EventStream<'_> {} + impl Stream for EventStream<'_> { type Item = Result;