diff --git a/build-resources/opaque-types/src/lib.rs b/build-resources/opaque-types/src/lib.rs index af55fc055..19547dd32 100644 --- a/build-resources/opaque-types/src/lib.rs +++ b/build-resources/opaque-types/src/lib.rs @@ -9,6 +9,7 @@ use std::{ thread::JoinHandle, }; +use flume::{Receiver, Sender}; #[cfg(all(feature = "shared-memory", feature = "unstable"))] use zenoh::shm::{ zshm, zshmmut, AllocLayout, ChunkAllocResult, ChunkDescriptor, MemoryLayout, @@ -50,6 +51,13 @@ macro_rules! get_opaque_type_data { }; } +type SgNotifier = Sender<()>; + +pub(crate) struct SyncGroup { + waiter: Receiver<()>, + notifier: Option, +} + /// A Zenoh data. /// /// To minimize copies and reallocations, Zenoh may provide data in several separate buffers. @@ -124,19 +132,29 @@ get_opaque_type_data!(Option, z_owned_query_t); /// A loaned Zenoh query. get_opaque_type_data!(Query, z_loaned_query_t); +struct CQueryable { + queryable: Queryable<()>, + sg: SyncGroup, +} + /// An owned Zenoh queryable . /// /// Responds to queries sent via `z_get()` with intersecting key expression. -get_opaque_type_data!(Option>, z_owned_queryable_t); +get_opaque_type_data!(Option, z_owned_queryable_t); /// A loaned Zenoh queryable. -get_opaque_type_data!(Queryable<()>, z_loaned_queryable_t); +get_opaque_type_data!(CQueryable, z_loaned_queryable_t); + +struct CQuerier { + querier: Querier<'static>, + sg: SyncGroup, +} /// An owned Zenoh querier. /// /// Sends queries to matching queryables. -get_opaque_type_data!(Option, z_owned_querier_t); +get_opaque_type_data!(Option, z_owned_querier_t); /// A loaned Zenoh queryable. -get_opaque_type_data!(Querier, z_loaned_querier_t); +get_opaque_type_data!(CQuerier, z_loaned_querier_t); #[cfg(feature = "unstable")] /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. @@ -156,33 +174,43 @@ get_opaque_type_data!( ze_loaned_querying_subscriber_t ); +#[cfg(feature = "unstable")] +struct CAdvancedSubscriber { + subscriber: zenoh_ext::AdvancedSubscriber<()>, + sg: SyncGroup, +} + #[cfg(feature = "unstable")] /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. /// @brief An owned Zenoh advanced subscriber. /// /// In addition to receiving the data it is subscribed to, /// it is also able to receive notifications regarding missed samples and/or automatically recover them. -get_opaque_type_data!( - Option>, - ze_owned_advanced_subscriber_t -); +get_opaque_type_data!(Option, ze_owned_advanced_subscriber_t); #[cfg(feature = "unstable")] /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. /// @brief A loaned Zenoh advanced subscriber. -get_opaque_type_data!( - zenoh_ext::AdvancedSubscriber<()>, - ze_loaned_advanced_subscriber_t -); +get_opaque_type_data!(CAdvancedSubscriber, ze_loaned_advanced_subscriber_t); + +#[cfg(feature = "unstable")] +struct CSampleMissListener { + listener: zenoh_ext::SampleMissListener<()>, + sg: SyncGroup, +} + #[cfg(feature = "unstable")] /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. /// @brief An owned Zenoh sample miss listener. Missed samples can only be detected from advanced publishers, enabling sample miss detection. /// /// A listener that sends notification when the advanced subscriber misses a sample . /// Dropping the corresponding subscriber, also drops the listener. -get_opaque_type_data!( - Option>, - ze_owned_sample_miss_listener_t -); +get_opaque_type_data!(Option, ze_owned_sample_miss_listener_t); + +#[cfg(feature = "unstable")] +struct CAdvancedPublisher { + publisher: zenoh_ext::AdvancedPublisher<'static>, + sg: SyncGroup, +} #[cfg(feature = "unstable")] /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. @@ -190,17 +218,11 @@ get_opaque_type_data!( /// /// In addition to publishing the data, /// it also maintains the storage, allowing matching subscribers to retrive missed samples. -get_opaque_type_data!( - Option>, - ze_owned_advanced_publisher_t -); +get_opaque_type_data!(Option, ze_owned_advanced_publisher_t); #[cfg(feature = "unstable")] /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. /// @brief A loaned Zenoh advanced publisher. -get_opaque_type_data!( - zenoh_ext::AdvancedPublisher<'static>, - ze_loaned_advanced_publisher_t -); +get_opaque_type_data!(CAdvancedPublisher, ze_loaned_advanced_publisher_t); /// A Zenoh-allocated key expression . /// /// Key expressions can identify a single key or a set of keys. @@ -232,10 +254,15 @@ get_opaque_type_data!(Option>, z_view_keyexpr_t); /// both for local processing and network-wise. get_opaque_type_data!(KeyExpr<'static>, z_loaned_keyexpr_t); +struct CSession { + session: Session, + sg: SyncGroup, +} + /// An owned Zenoh session. -get_opaque_type_data!(Option, z_owned_session_t); +get_opaque_type_data!(Option, z_owned_session_t); /// A loaned Zenoh session. -get_opaque_type_data!(Session, z_loaned_session_t); +get_opaque_type_data!(CSession, z_loaned_session_t); #[cfg(feature = "unstable")] /// An owned Close handle @@ -259,24 +286,38 @@ get_opaque_type_data!(ZenohId, z_id_t); /// It consists of a time generated by a Hybrid Logical Clock (HLC) in NPT64 format and a unique zenoh identifier. get_opaque_type_data!(Timestamp, z_timestamp_t); +struct CPublisher { + publisher: Publisher<'static>, + sg: SyncGroup, +} + /// An owned Zenoh publisher . -get_opaque_type_data!(Option>, z_owned_publisher_t); +get_opaque_type_data!(Option, z_owned_publisher_t); /// A loaned Zenoh publisher. -get_opaque_type_data!(Publisher<'static>, z_loaned_publisher_t); +get_opaque_type_data!(CPublisher, z_loaned_publisher_t); + +struct CMatchingListener { + listener: MatchingListener<()>, + sg: SyncGroup, +} /// @brief An owned Zenoh matching listener. /// /// A listener that sends notifications when the [`MatchingStatus`] of a publisher or querier changes. /// Dropping the corresponding publisher, also drops matching listener. -get_opaque_type_data!(Option>, z_owned_matching_listener_t); +get_opaque_type_data!(Option, z_owned_matching_listener_t); +struct CSubscriber { + subscriber: Subscriber<()>, + sg: SyncGroup, +} /// An owned Zenoh subscriber . /// /// Receives data from publication on intersecting key expressions. /// Destroying the subscriber cancels the subscription. -get_opaque_type_data!(Option>, z_owned_subscriber_t); +get_opaque_type_data!(Option, z_owned_subscriber_t); /// A loaned Zenoh subscriber. -get_opaque_type_data!(Subscriber<()>, z_loaned_subscriber_t); +get_opaque_type_data!(CSubscriber, z_loaned_subscriber_t); /// @brief A liveliness token that can be used to provide the network with information about connectivity to its /// declarer: when constructed, a PUT sample will be received by liveliness subscribers on intersecting key @@ -362,7 +403,6 @@ get_opaque_type_data!(MemoryLayout, z_loaned_memory_layout_t); /// @brief An owned ChunkAllocResult. get_opaque_type_data!(Option, z_owned_chunk_alloc_result_t); #[cfg(all(feature = "shared-memory", feature = "unstable"))] -#[cfg(all(feature = "shared-memory", feature = "unstable"))] /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. /// @brief An owned ZShm slice. get_opaque_type_data!(Option, z_owned_shm_t); diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 95dbeb04d..cdd522015 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -1142,16 +1142,6 @@ typedef struct ze_moved_closure_miss_t { struct ze_owned_closure_miss_t _this; } ze_moved_closure_miss_t; #endif -/** - * @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. - * @brief An owned Zenoh sample miss listener. Missed samples can only be detected from advanced publishers, enabling sample miss detection. - * - * A listener that sends notification when the advanced subscriber misses a sample . - * Dropping the corresponding subscriber, also drops the listener. - */ -typedef struct ALIGN(8) ze_owned_sample_miss_listener_t { - uint8_t _0[24]; -} ze_owned_sample_miss_listener_t; typedef struct ze_moved_advanced_subscriber_t { struct ze_owned_advanced_subscriber_t _this; } ze_moved_advanced_subscriber_t; diff --git a/splitguide.yaml b/splitguide.yaml index 7c99b5d86..1c006f33e 100644 --- a/splitguide.yaml +++ b/splitguide.yaml @@ -43,6 +43,7 @@ zenoh_opaque.h: - ze_loaned_advanced_subscriber_t!#unstable - ze_owned_advanced_publisher_t!#unstable - ze_loaned_advanced_publisher_t!#unstable + - ze_owned_sample_miss_listener_t!#unstable - z_owned_keyexpr_t! - z_view_keyexpr_t! - z_loaned_keyexpr_t! diff --git a/src/advanced_publisher.rs b/src/advanced_publisher.rs index fe69bc975..91bbae0f7 100644 --- a/src/advanced_publisher.rs +++ b/src/advanced_publisher.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh team, // -use std::{mem::MaybeUninit, time::Duration}; +use std::{mem::MaybeUninit, ops::Deref, time::Duration}; use zenoh::{ handlers::Callback, @@ -30,7 +30,8 @@ use crate::{ z_closure_matching_status_call, z_closure_matching_status_loan, z_congestion_control_t, z_entity_global_id_t, z_loaned_keyexpr_t, z_loaned_session_t, z_matching_status_t, z_moved_bytes_t, z_moved_closure_matching_status_t, z_owned_matching_listener_t, z_priority_t, - z_publisher_delete_options_t, z_publisher_options_t, z_publisher_put_options_t, + z_publisher_delete_options_t, z_publisher_options_t, z_publisher_put_options_t, SgNotifier, + SyncGroup, }; /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. @@ -189,8 +190,22 @@ pub extern "C" fn ze_advanced_publisher_options_default( pub use crate::opaque_types::{ ze_loaned_advanced_publisher_t, ze_moved_advanced_publisher_t, ze_owned_advanced_publisher_t, }; + +pub(crate) struct CAdvancedPublisher { + publisher: zenoh_ext::AdvancedPublisher<'static>, + sg: SyncGroup, +} + +impl Deref for CAdvancedPublisher { + type Target = zenoh_ext::AdvancedPublisher<'static>; + + fn deref(&self) -> &Self::Target { + &self.publisher + } +} + decl_c_type!( - owned(ze_owned_advanced_publisher_t, option zenoh_ext::AdvancedPublisher<'static>), + owned(ze_owned_advanced_publisher_t, option CAdvancedPublisher), loaned(ze_loaned_advanced_publisher_t), ); @@ -242,7 +257,10 @@ pub extern "C" fn ze_declare_advanced_publisher( result::Z_EGENERIC } Ok(publisher) => { - this.write(Some(publisher)); + this.write(Some(CAdvancedPublisher { + publisher, + sg: SyncGroup::new(), + })); result::Z_OK } } @@ -417,16 +435,20 @@ pub extern "C" fn ze_advanced_publisher_keyexpr( fn _advanced_publisher_matching_listener_declare_inner<'a>( publisher: &'a ze_loaned_advanced_publisher_t, callback: &mut z_moved_closure_matching_status_t, + notifier: SgNotifier, ) -> zenoh::matching::MatchingListenerBuilder<'a, Callback> { + use crate::SyncObj; + let publisher = publisher.as_rust_type_ref(); let callback = callback.take_rust_type(); + let sync_callback = SyncObj::new(callback, notifier); let listener = publisher .matching_listener() .callback_mut(move |matching_status| { let status = z_matching_status_t { matching: matching_status.matching(), }; - z_closure_matching_status_call(z_closure_matching_status_loan(&callback), &status); + z_closure_matching_status_call(z_closure_matching_status_loan(&sync_callback), &status); }); listener } @@ -446,11 +468,15 @@ pub extern "C" fn ze_advanced_publisher_declare_matching_listener( matching_listener: &mut MaybeUninit, callback: &mut z_moved_closure_matching_status_t, ) -> result::z_result_t { + use crate::{CMatchingListener, SyncGroup}; + let this = matching_listener.as_rust_type_mut_uninit(); - let listener = _advanced_publisher_matching_listener_declare_inner(publisher, callback); + let sg = SyncGroup::new(); + let listener = + _advanced_publisher_matching_listener_declare_inner(publisher, callback, sg.notifier()); match listener.wait() { Ok(listener) => { - this.write(Some(listener)); + this.write(Some(CMatchingListener { listener, _sg: sg })); result::Z_OK } Err(e) => { @@ -475,7 +501,11 @@ pub extern "C" fn ze_advanced_publisher_declare_background_matching_listener( publisher: &'static ze_loaned_advanced_publisher_t, callback: &mut z_moved_closure_matching_status_t, ) -> result::z_result_t { - let listener = _advanced_publisher_matching_listener_declare_inner(publisher, callback); + let listener = _advanced_publisher_matching_listener_declare_inner( + publisher, + callback, + publisher.as_rust_type_ref().sg.notifier(), + ); match listener.background().wait() { Ok(_) => result::Z_OK, Err(e) => { @@ -528,7 +558,7 @@ pub extern "C" fn ze_undeclare_advanced_publisher( this_: &mut ze_moved_advanced_publisher_t, ) -> result::z_result_t { if let Some(p) = this_.take_rust_type() { - if let Err(e) = p.undeclare().wait() { + if let Err(e) = p.publisher.undeclare().wait() { crate::report_error!("{}", e); return result::Z_ENETWORK; } diff --git a/src/advanced_subscriber.rs b/src/advanced_subscriber.rs index 45c16b8e2..b0e143b24 100644 --- a/src/advanced_subscriber.rs +++ b/src/advanced_subscriber.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh team, // -use std::{mem::MaybeUninit, time::Duration}; +use std::{mem::MaybeUninit, ops::Deref, time::Duration}; use zenoh::{handlers::Callback, liveliness::LivelinessSubscriberBuilder, sample::Sample, Wait}; use zenoh_ext::{AdvancedSubscriberBuilderExt, HistoryConfig, RecoveryConfig, SampleMissListener}; @@ -25,9 +25,28 @@ use crate::{ z_moved_closure_sample_t, z_owned_subscriber_t, z_subscriber_options_t, ze_closure_miss_call, ze_closure_miss_loan, ze_loaned_advanced_subscriber_t, ze_moved_advanced_subscriber_t, ze_moved_closure_miss_t, ze_moved_sample_miss_listener_t, ze_owned_advanced_subscriber_t, - ze_owned_sample_miss_listener_t, + ze_owned_sample_miss_listener_t, CSubscriber, SgNotifier, SyncGroup, SyncObj, }; +pub(crate) struct CAdvancedSubscriber { + subscriber: zenoh_ext::AdvancedSubscriber<()>, + sg: SyncGroup, +} + +impl CAdvancedSubscriber { + pub(crate) fn notifier(&self) -> SgNotifier { + self.sg.notifier() + } +} + +impl Deref for CAdvancedSubscriber { + type Target = zenoh_ext::AdvancedSubscriber<()>; + + fn deref(&self) -> &Self::Target { + &self.subscriber + } +} + /// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. /// @brief Settings for retrievieng historical data for Advanced Subscriber. #[repr(C)] @@ -196,12 +215,14 @@ fn _declare_advanced_subscriber_inner( session: &'static z_loaned_session_t, key_expr: &'static z_loaned_keyexpr_t, callback: &mut z_moved_closure_sample_t, + notifier: SgNotifier, mut options: Option<&'static mut ze_advanced_subscriber_options_t>, ) -> zenoh_ext::AdvancedSubscriberBuilder<'static, 'static, 'static, Callback> { let sub = _declare_subscriber_inner( session, key_expr, callback, + notifier, options.as_mut().map(|o| &mut o.subscriber_options), ); let mut sub = sub.advanced(); @@ -239,7 +260,7 @@ fn _declare_advanced_subscriber_inner( } decl_c_type!( - owned(ze_owned_advanced_subscriber_t, option zenoh_ext::AdvancedSubscriber<()>), + owned(ze_owned_advanced_subscriber_t, option CAdvancedSubscriber), loaned(ze_loaned_advanced_subscriber_t), ); @@ -297,10 +318,14 @@ pub extern "C" fn ze_declare_advanced_subscriber( options: Option<&'static mut ze_advanced_subscriber_options_t>, ) -> result::z_result_t { let this = subscriber.as_rust_type_mut_uninit(); - let s = _declare_advanced_subscriber_inner(session, key_expr, callback, options); + let sg = SyncGroup::new(); + let s = _declare_advanced_subscriber_inner(session, key_expr, callback, sg.notifier(), options); match s.wait() { Ok(sub) => { - this.write(Some(sub)); + this.write(Some(CAdvancedSubscriber { + subscriber: sub, + sg, + })); result::Z_OK } Err(e) => { @@ -327,7 +352,13 @@ pub extern "C" fn ze_declare_background_advanced_subscriber( callback: &'static mut z_moved_closure_sample_t, options: Option<&'static mut ze_advanced_subscriber_options_t>, ) -> result::z_result_t { - let subscriber = _declare_advanced_subscriber_inner(session, key_expr, callback, options); + let subscriber = _declare_advanced_subscriber_inner( + session, + key_expr, + callback, + session.as_rust_type_ref().notifier(), + options, + ); match subscriber.background().wait() { Ok(_) => result::Z_OK, Err(e) => { @@ -346,7 +377,7 @@ pub extern "C" fn ze_undeclare_advanced_subscriber( this_: &mut ze_moved_advanced_subscriber_t, ) -> result::z_result_t { if let Some(s) = this_.take_rust_type() { - if let Err(e) = s.undeclare().wait() { + if let Err(e) = s.subscriber.undeclare().wait() { crate::report_error!("{}", e); return result::Z_EGENERIC; } @@ -364,8 +395,13 @@ pub struct ze_miss_t { pub nb: u32, } +pub(crate) struct CSampleMissListener { + listener: SampleMissListener<()>, + _sg: SyncGroup, +} + decl_c_type!( - owned(ze_owned_sample_miss_listener_t, option SampleMissListener<()>), + owned(ze_owned_sample_miss_listener_t, option CSampleMissListener), ); #[no_mangle] @@ -403,7 +439,7 @@ pub extern "C" fn ze_undeclare_sample_miss_listener( this: &mut ze_moved_sample_miss_listener_t, ) -> result::z_result_t { if let Some(m) = this.take_rust_type() { - if let Err(e) = m.undeclare().wait() { + if let Err(e) = m.listener.undeclare().wait() { crate::report_error!("{}", e); return result::Z_ENETWORK; } @@ -414,15 +450,17 @@ pub extern "C" fn ze_undeclare_sample_miss_listener( fn _advanced_subscriber_sample_miss_listener_declare_inner<'a>( subscriber: &'a ze_loaned_advanced_subscriber_t, callback: &mut ze_moved_closure_miss_t, + notifier: SgNotifier, ) -> zenoh_ext::SampleMissListenerBuilder<'a, Callback> { let subscriber = subscriber.as_rust_type_ref(); let callback = callback.take_rust_type(); + let sync_callback = SyncObj::new(callback, notifier); let listener = subscriber.sample_miss_listener().callback_mut(move |miss| { let miss = ze_miss_t { source: miss.source().into_c_type(), nb: miss.nb(), }; - ze_closure_miss_call(ze_closure_miss_loan(&callback), &miss); + ze_closure_miss_call(ze_closure_miss_loan(&sync_callback), &miss); }); listener } @@ -442,10 +480,15 @@ pub extern "C" fn ze_advanced_subscriber_declare_sample_miss_listener( callback: &mut ze_moved_closure_miss_t, ) -> result::z_result_t { let this = sample_miss_listener.as_rust_type_mut_uninit(); - let listener = _advanced_subscriber_sample_miss_listener_declare_inner(subscriber, callback); + let sg = SyncGroup::new(); + let listener = _advanced_subscriber_sample_miss_listener_declare_inner( + subscriber, + callback, + sg.notifier(), + ); match listener.wait() { Ok(listener) => { - this.write(Some(listener)); + this.write(Some(CSampleMissListener { listener, _sg: sg })); result::Z_OK } Err(e) => { @@ -469,7 +512,11 @@ pub extern "C" fn ze_advanced_subscriber_declare_background_sample_miss_listener subscriber: &'static ze_loaned_advanced_subscriber_t, callback: &mut ze_moved_closure_miss_t, ) -> result::z_result_t { - let listener = _advanced_subscriber_sample_miss_listener_declare_inner(subscriber, callback); + let listener = _advanced_subscriber_sample_miss_listener_declare_inner( + subscriber, + callback, + subscriber.as_rust_type_ref().notifier(), + ); match listener.background().wait() { Ok(_) => result::Z_OK, Err(e) => { @@ -482,16 +529,18 @@ pub extern "C" fn ze_advanced_subscriber_declare_background_sample_miss_listener fn _advanced_subscriber_detect_publishers_inner( subscriber: &'static ze_loaned_advanced_subscriber_t, callback: &'static mut z_moved_closure_sample_t, + notifier: SgNotifier, options: Option<&'static mut z_liveliness_subscriber_options_t>, ) -> LivelinessSubscriberBuilder<'static, 'static, Callback> { let subscriber = subscriber.as_rust_type_ref(); let callback = callback.take_rust_type(); + let sync_callback = SyncObj::new(callback, notifier); let sub = subscriber .detect_publishers() .history(options.is_some_and(|o| o.history)) .callback(move |sample| { let mut owned_sample = Some(sample); - z_closure_sample_call(z_closure_sample_loan(&callback), unsafe { + z_closure_sample_call(z_closure_sample_loan(&sync_callback), unsafe { owned_sample .as_mut() .unwrap_unchecked() @@ -518,10 +567,15 @@ pub extern "C" fn ze_advanced_subscriber_detect_publishers( options: Option<&'static mut z_liveliness_subscriber_options_t>, ) -> result::z_result_t { let liveliness_subscriber = liveliness_subscriber.as_rust_type_mut_uninit(); - let builder = _advanced_subscriber_detect_publishers_inner(subscriber, callback, options); + let sg = SyncGroup::new(); + let builder = + _advanced_subscriber_detect_publishers_inner(subscriber, callback, sg.notifier(), options); match builder.wait() { Ok(s) => { - liveliness_subscriber.write(Some(s)); + liveliness_subscriber.write(Some(CSubscriber { + subscriber: s, + _sg: sg, + })); result::Z_OK } Err(e) => { @@ -546,7 +600,12 @@ pub extern "C" fn ze_advanced_subscriber_detect_publishers_background( callback: &'static mut z_moved_closure_sample_t, options: Option<&'static mut z_liveliness_subscriber_options_t>, ) -> result::z_result_t { - let builder = _advanced_subscriber_detect_publishers_inner(subscriber, callback, options); + let builder = _advanced_subscriber_detect_publishers_inner( + subscriber, + callback, + subscriber.as_rust_type_ref().sg.notifier(), + options, + ); match builder.background().wait() { Ok(_) => result::Z_OK, Err(e) => { diff --git a/src/closures/mod.rs b/src/closures/mod.rs index 95d460c06..38fa0eecf 100644 --- a/src/closures/mod.rs +++ b/src/closures/mod.rs @@ -1,3 +1,5 @@ +use std::ops::Deref; + // // Copyright (c) 2017, 2022 ZettaScale Technology. // @@ -45,3 +47,55 @@ mod matching_status_closure; pub use miss_closure::*; #[cfg(feature = "unstable")] mod miss_closure; + +use flume::{Receiver, Sender}; + +pub type SgNotifier = Sender<()>; + +pub(crate) struct SyncObj { + value: T, + _notifier: SgNotifier, +} + +impl Deref for SyncObj { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.value + } +} + +impl SyncObj { + pub(crate) fn new(value: T, notifier: SgNotifier) -> Self { + Self { + value, + _notifier: notifier, + } + } +} + +pub(crate) struct SyncGroup { + waiter: Receiver<()>, + notifier: Option, +} + +impl SyncGroup { + pub(crate) fn new() -> SyncGroup { + let (notifier, waiter) = flume::bounded(0); + SyncGroup { + waiter, + notifier: Some(notifier), + } + } + + pub(crate) fn notifier(&self) -> SgNotifier { + self.notifier.as_ref().unwrap().clone() + } +} + +impl Drop for SyncGroup { + fn drop(&mut self) { + self.notifier.take(); + self.waiter.recv().unwrap_err(); + } +} diff --git a/src/get.rs b/src/get.rs index db1a20c6d..879bda561 100644 --- a/src/get.rs +++ b/src/get.rs @@ -33,7 +33,7 @@ use crate::{ z_closure_reply_call, z_closure_reply_loan, z_congestion_control_t, z_consolidation_mode_t, z_loaned_bytes_t, z_loaned_encoding_t, z_loaned_keyexpr_t, z_loaned_sample_t, z_loaned_session_t, z_moved_bytes_t, z_moved_closure_reply_t, z_moved_encoding_t, z_priority_t, - z_query_target_t, zc_locality_default, zc_locality_t, CStringView, + z_query_target_t, zc_locality_default, zc_locality_t, CStringView, SyncObj, }; #[cfg(feature = "unstable")] use crate::{ @@ -387,11 +387,12 @@ pub unsafe extern "C" fn z_get_with_parameters_substr( get = get.timeout(std::time::Duration::from_millis(options.timeout_ms)); } } + let sync_callback = SyncObj::new(callback, session.notifier()); match get .callback(move |response| { let mut owned_response = Some(response); z_closure_reply_call( - z_closure_reply_loan(&callback), + z_closure_reply_loan(&sync_callback), owned_response .as_mut() .unwrap_unchecked() diff --git a/src/liveliness.rs b/src/liveliness.rs index 10d0efd28..94d06141f 100644 --- a/src/liveliness.rs +++ b/src/liveliness.rs @@ -27,7 +27,7 @@ use crate::{ transmute::{LoanedCTypeRef, RustTypeRef, RustTypeRefUninit, TakeRustType}, z_closure_reply_call, z_closure_reply_loan, z_closure_sample_call, z_closure_sample_loan, z_loaned_keyexpr_t, z_loaned_session_t, z_moved_closure_reply_t, z_moved_closure_sample_t, - z_moved_liveliness_token_t, z_owned_subscriber_t, + z_moved_liveliness_token_t, z_owned_subscriber_t, CSubscriber, SgNotifier, SyncGroup, SyncObj, }; decl_c_type!( owned(z_owned_liveliness_token_t, option LivelinessToken), @@ -145,18 +145,20 @@ fn _liveliness_declare_subscriber_inner<'a, 'b>( session: &'a z_loaned_session_t, key_expr: &'b z_loaned_keyexpr_t, callback: &mut z_moved_closure_sample_t, + notifier: SgNotifier, options: Option<&mut z_liveliness_subscriber_options_t>, ) -> LivelinessSubscriberBuilder<'a, 'b, Callback> { let session = session.as_rust_type_ref(); let key_expr = key_expr.as_rust_type_ref(); let callback = callback.take_rust_type(); + let sync_callback = SyncObj::new(callback, notifier); let sub = session .liveliness() .declare_subscriber(key_expr) .history(options.is_some_and(|o| o.history)) .callback(move |sample| { let mut owned_sample = Some(sample); - z_closure_sample_call(z_closure_sample_loan(&callback), unsafe { + z_closure_sample_call(z_closure_sample_loan(&sync_callback), unsafe { owned_sample .as_mut() .unwrap_unchecked() @@ -183,10 +185,15 @@ pub extern "C" fn z_liveliness_declare_subscriber( options: Option<&mut z_liveliness_subscriber_options_t>, ) -> result::z_result_t { let this = subscriber.as_rust_type_mut_uninit(); - let subscriber = _liveliness_declare_subscriber_inner(session, key_expr, callback, options); + let sg = SyncGroup::new(); + let subscriber = + _liveliness_declare_subscriber_inner(session, key_expr, callback, sg.notifier(), options); match subscriber.wait() { Ok(subscriber) => { - this.write(Some(subscriber)); + this.write(Some(CSubscriber { + subscriber, + _sg: sg, + })); result::Z_OK } Err(e) => { @@ -212,7 +219,13 @@ pub extern "C" fn z_liveliness_declare_background_subscriber( callback: &mut z_moved_closure_sample_t, options: Option<&mut z_liveliness_subscriber_options_t>, ) -> result::z_result_t { - let subscriber = _liveliness_declare_subscriber_inner(session, key_expr, callback, options); + let subscriber = _liveliness_declare_subscriber_inner( + session, + key_expr, + callback, + session.as_rust_type_ref().notifier(), + options, + ); match subscriber.background().wait() { Ok(_) => result::Z_OK, Err(e) => { diff --git a/src/matching.rs b/src/matching.rs index 2b56baee0..83fb3a1d3 100644 --- a/src/matching.rs +++ b/src/matching.rs @@ -20,9 +20,16 @@ pub use crate::opaque_types::{z_moved_matching_listener_t, z_owned_matching_list use crate::{ result, transmute::{RustTypeRef, RustTypeRefUninit, TakeRustType}, + SyncGroup, }; + +pub(crate) struct CMatchingListener { + pub(crate) listener: MatchingListener<()>, + pub(crate) _sg: SyncGroup, +} + decl_c_type!( - owned(z_owned_matching_listener_t, option MatchingListener<()>), + owned(z_owned_matching_listener_t, option CMatchingListener), ); #[no_mangle] @@ -62,7 +69,7 @@ pub extern "C" fn z_undeclare_matching_listener( this: &mut z_moved_matching_listener_t, ) -> result::z_result_t { if let Some(m) = this.take_rust_type() { - if let Err(e) = m.undeclare().wait() { + if let Err(e) = m.listener.undeclare().wait() { crate::report_error!("{}", e); return result::Z_ENETWORK; } diff --git a/src/publisher.rs b/src/publisher.rs index fc77e2bef..2c0c70ad2 100644 --- a/src/publisher.rs +++ b/src/publisher.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh team, // -use std::mem::MaybeUninit; +use std::{mem::MaybeUninit, ops::Deref}; use zenoh::{ handlers::Callback, @@ -30,7 +30,8 @@ use crate::{ z_closure_matching_status_call, z_closure_matching_status_loan, z_congestion_control_t, z_loaned_keyexpr_t, z_loaned_session_t, z_matching_status_t, z_moved_bytes_t, z_moved_closure_matching_status_t, z_moved_encoding_t, z_owned_matching_listener_t, - z_priority_t, z_timestamp_t, zc_locality_default, zc_locality_t, + z_priority_t, z_timestamp_t, zc_locality_default, zc_locality_t, CMatchingListener, SgNotifier, + SyncGroup, }; #[cfg(feature = "unstable")] use crate::{ @@ -77,9 +78,22 @@ pub extern "C" fn z_publisher_options_default(this_: &mut MaybeUninit, + sg: SyncGroup, +} + +impl Deref for CPublisher { + type Target = Publisher<'static>; + + fn deref(&self) -> &Self::Target { + &self.publisher + } +} + pub use crate::opaque_types::{z_loaned_publisher_t, z_moved_publisher_t, z_owned_publisher_t}; decl_c_type!( - owned(z_owned_publisher_t, option Publisher<'static>), + owned(z_owned_publisher_t, option CPublisher), loaned(z_loaned_publisher_t), ); @@ -135,7 +149,10 @@ pub extern "C" fn z_declare_publisher( result::Z_EGENERIC } Ok(publisher) => { - this.write(Some(publisher)); + this.write(Some(CPublisher { + publisher, + sg: SyncGroup::new(), + })); result::Z_OK } } @@ -334,17 +351,21 @@ pub extern "C" fn z_publisher_keyexpr(publisher: &z_loaned_publisher_t) -> &z_lo fn _publisher_matching_listener_declare_inner<'a>( publisher: &'a z_loaned_publisher_t, + notifier: SgNotifier, callback: &mut z_moved_closure_matching_status_t, ) -> zenoh::matching::MatchingListenerBuilder<'a, Callback> { + use crate::SyncObj; + let publisher = publisher.as_rust_type_ref(); let callback = callback.take_rust_type(); + let sync_callback = SyncObj::new(callback, notifier); let listener = publisher .matching_listener() .callback_mut(move |matching_status| { let status = z_matching_status_t { matching: matching_status.matching(), }; - z_closure_matching_status_call(z_closure_matching_status_loan(&callback), &status); + z_closure_matching_status_call(z_closure_matching_status_loan(&sync_callback), &status); }); listener } @@ -363,10 +384,11 @@ pub extern "C" fn z_publisher_declare_matching_listener( callback: &mut z_moved_closure_matching_status_t, ) -> result::z_result_t { let this = matching_listener.as_rust_type_mut_uninit(); - let listener = _publisher_matching_listener_declare_inner(publisher, callback); + let sg = SyncGroup::new(); + let listener = _publisher_matching_listener_declare_inner(publisher, sg.notifier(), callback); match listener.wait() { Ok(listener) => { - this.write(Some(listener)); + this.write(Some(CMatchingListener { listener, _sg: sg })); result::Z_OK } Err(e) => { @@ -389,7 +411,11 @@ pub extern "C" fn z_publisher_declare_background_matching_listener( publisher: &'static z_loaned_publisher_t, callback: &mut z_moved_closure_matching_status_t, ) -> result::z_result_t { - let listener = _publisher_matching_listener_declare_inner(publisher, callback); + let listener = _publisher_matching_listener_declare_inner( + publisher, + publisher.as_rust_type_ref().sg.notifier(), + callback, + ); match listener.background().wait() { Ok(_) => result::Z_OK, Err(e) => { @@ -436,7 +462,7 @@ pub extern "C" fn z_publisher_drop(this: &mut z_moved_publisher_t) { /// @return 0 in case of success, negative error code otherwise. pub extern "C" fn z_undeclare_publisher(this_: &mut z_moved_publisher_t) -> result::z_result_t { if let Some(p) = this_.take_rust_type() { - if let Err(e) = p.undeclare().wait() { + if let Err(e) = p.publisher.undeclare().wait() { crate::report_error!("{}", e); return result::Z_ENETWORK; } diff --git a/src/querier.rs b/src/querier.rs index d1766cadb..c0395621d 100644 --- a/src/querier.rs +++ b/src/querier.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh team, // -use std::mem::MaybeUninit; +use std::{mem::MaybeUninit, ops::Deref}; use libc::c_char; use zenoh::{ @@ -32,7 +32,7 @@ use crate::{ z_loaned_session_t, z_matching_status_t, z_moved_bytes_t, z_moved_closure_matching_status_t, z_moved_closure_reply_t, z_moved_encoding_t, z_moved_querier_t, z_owned_matching_listener_t, z_owned_querier_t, z_priority_t, z_query_consolidation_t, z_query_target_t, - zc_locality_default, zc_locality_t, + zc_locality_default, zc_locality_t, SgNotifier, SyncGroup, SyncObj, }; #[cfg(feature = "unstable")] use crate::{ @@ -80,8 +80,21 @@ pub extern "C" fn z_querier_options_default(this_: &mut MaybeUninit, + sg: SyncGroup, +} + +impl Deref for CQuerier { + type Target = Querier<'static>; + + fn deref(&self) -> &Self::Target { + &self.querier + } +} + decl_c_type!( - owned(z_owned_querier_t, option Querier<'static>), + owned(z_owned_querier_t, option CQuerier), loaned(z_loaned_querier_t), ); @@ -130,7 +143,10 @@ pub extern "C" fn z_declare_querier( result::Z_EGENERIC } Ok(querier) => { - this.write(Some(querier)); + this.write(Some(CQuerier { + querier, + sg: SyncGroup::new(), + })); result::Z_OK } } @@ -308,11 +324,12 @@ pub unsafe extern "C" fn z_querier_get_with_parameters_substr( if !p.is_empty() { get = get.parameters(p); } + let sync_callback = SyncObj::new(callback, querier.sg.notifier()); match get .callback(move |response| { let mut owned_response = Some(response); z_closure_reply_call( - z_closure_reply_loan(&callback), + z_closure_reply_loan(&sync_callback), owned_response .as_mut() .unwrap_unchecked() @@ -347,16 +364,18 @@ pub extern "C" fn z_querier_keyexpr(querier: &z_loaned_querier_t) -> &z_loaned_k fn _querier_matching_listener_declare_inner<'a>( querier: &'a z_loaned_querier_t, callback: &mut z_moved_closure_matching_status_t, + notifier: SgNotifier, ) -> zenoh::matching::MatchingListenerBuilder<'a, Callback> { let querier = querier.as_rust_type_ref(); let callback = callback.take_rust_type(); + let sync_callback = SyncObj::new(callback, notifier); let listener = querier .matching_listener() .callback_mut(move |matching_status| { let status = z_matching_status_t { matching: matching_status.matching(), }; - z_closure_matching_status_call(z_closure_matching_status_loan(&callback), &status); + z_closure_matching_status_call(z_closure_matching_status_loan(&sync_callback), &status); }); listener } @@ -374,11 +393,14 @@ pub extern "C" fn z_querier_declare_matching_listener( matching_listener: &mut MaybeUninit, callback: &mut z_moved_closure_matching_status_t, ) -> result::z_result_t { + use crate::CMatchingListener; + let this = matching_listener.as_rust_type_mut_uninit(); - let listener = _querier_matching_listener_declare_inner(querier, callback); + let sg = SyncGroup::new(); + let listener = _querier_matching_listener_declare_inner(querier, callback, sg.notifier()); match listener.wait() { Ok(listener) => { - this.write(Some(listener)); + this.write(Some(CMatchingListener { listener, _sg: sg })); result::Z_OK } Err(e) => { @@ -401,7 +423,11 @@ pub extern "C" fn z_querier_declare_background_matching_listener( querier: &'static z_loaned_querier_t, callback: &mut z_moved_closure_matching_status_t, ) -> result::z_result_t { - let listener = _querier_matching_listener_declare_inner(querier, callback); + let listener = _querier_matching_listener_declare_inner( + querier, + callback, + querier.as_rust_type_ref().sg.notifier(), + ); match listener.background().wait() { Ok(_) => result::Z_OK, Err(e) => { @@ -448,7 +474,7 @@ pub extern "C" fn z_querier_drop(this: &mut z_moved_querier_t) { #[no_mangle] pub extern "C" fn z_undeclare_querier(this_: &mut z_moved_querier_t) -> result::z_result_t { if let Some(q) = this_.take_rust_type() { - if let Err(e) = q.undeclare().wait() { + if let Err(e) = q.querier.undeclare().wait() { crate::report_error!("{}", e); return result::Z_ENETWORK; } diff --git a/src/queryable.rs b/src/queryable.rs index 51e0ce13a..436b70fbc 100644 --- a/src/queryable.rs +++ b/src/queryable.rs @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh team, // -use std::mem::MaybeUninit; +use std::{mem::MaybeUninit, ops::Deref}; use zenoh::{ bytes::Encoding, @@ -28,12 +28,27 @@ use crate::{ z_closure_query_call, z_closure_query_loan, z_congestion_control_t, z_loaned_bytes_t, z_loaned_encoding_t, z_loaned_keyexpr_t, z_loaned_session_t, z_moved_bytes_t, z_moved_closure_query_t, z_moved_encoding_t, z_moved_queryable_t, z_priority_t, z_timestamp_t, - z_view_string_from_substr, z_view_string_t, zc_locality_default, zc_locality_t, + z_view_string_from_substr, z_view_string_t, zc_locality_default, zc_locality_t, SgNotifier, + SyncGroup, SyncObj, }; #[cfg(feature = "unstable")] use crate::{transmute::IntoCType, z_entity_global_id_t, z_moved_source_info_t}; + +pub(crate) struct CQueryable { + pub(crate) queryable: Queryable<()>, + _sg: SyncGroup, +} + +impl Deref for CQueryable { + type Target = Queryable<()>; + + fn deref(&self) -> &Self::Target { + &self.queryable + } +} + decl_c_type!( - owned(z_owned_queryable_t, option Queryable<()>), + owned(z_owned_queryable_t, option CQueryable), loaned(z_loaned_queryable_t), ); @@ -243,11 +258,13 @@ fn _declare_queryable_inner<'a, 'b>( session: &'a z_loaned_session_t, key_expr: &'b z_loaned_keyexpr_t, callback: &mut z_moved_closure_query_t, + notifier: SgNotifier, options: Option<&mut z_queryable_options_t>, ) -> QueryableBuilder<'a, 'b, Callback> { let session = session.as_rust_type_ref(); let keyexpr = key_expr.as_rust_type_ref(); let callback = callback.take_rust_type(); + let sync_callback = SyncObj::new(callback, notifier); let mut builder = session.declare_queryable(keyexpr); if let Some(options) = options { builder = builder @@ -256,7 +273,7 @@ fn _declare_queryable_inner<'a, 'b>( } let queryable = builder.callback(move |query| { let mut owned_query = Some(query); - z_closure_query_call(z_closure_query_loan(&callback), unsafe { + z_closure_query_call(z_closure_query_loan(&sync_callback), unsafe { owned_query .as_mut() .unwrap_unchecked() @@ -284,10 +301,14 @@ pub extern "C" fn z_declare_queryable( options: Option<&mut z_queryable_options_t>, ) -> result::z_result_t { let this = queryable.as_rust_type_mut_uninit(); - let queryable = _declare_queryable_inner(session, key_expr, callback, options); + let sg = SyncGroup::new(); + let queryable = _declare_queryable_inner(session, key_expr, callback, sg.notifier(), options); match queryable.wait() { Ok(q) => { - this.write(Some(q)); + this.write(Some(CQueryable { + queryable: q, + _sg: sg, + })); result::Z_OK } Err(e) => { @@ -314,7 +335,13 @@ pub extern "C" fn z_declare_background_queryable( callback: &mut z_moved_closure_query_t, options: Option<&mut z_queryable_options_t>, ) -> result::z_result_t { - let queryable = _declare_queryable_inner(session, key_expr, callback, options); + let queryable = _declare_queryable_inner( + session, + key_expr, + callback, + session.as_rust_type_ref().notifier(), + options, + ); match queryable.background().wait() { Ok(_) => result::Z_OK, Err(e) => { @@ -561,7 +588,7 @@ pub extern "C" fn z_query_attachment_mut( #[no_mangle] pub extern "C" fn z_undeclare_queryable(this_: &mut z_moved_queryable_t) -> result::z_result_t { if let Some(qable) = this_.take_rust_type() { - if let Err(e) = qable.undeclare().wait() { + if let Err(e) = qable.queryable.undeclare().wait() { crate::report_error!("{}", e); return result::Z_EGENERIC; } diff --git a/src/scouting.rs b/src/scouting.rs index 7e3e9ed6f..84e038b47 100644 --- a/src/scouting.rs +++ b/src/scouting.rs @@ -24,7 +24,8 @@ use crate::{ result, transmute::{IntoCType, LoanedCTypeRef, RustTypeRef, RustTypeRefUninit, TakeRustType}, z_closure_hello_call, z_closure_hello_loan, z_id_t, z_moved_closure_hello_t, z_moved_config_t, - z_owned_string_array_t, z_view_string_t, CStringInner, CStringView, ZVector, + z_owned_string_array_t, z_view_string_t, CStringInner, CStringView, SyncGroup, SyncObj, + ZVector, }; decl_c_type!( owned(z_owned_hello_t, option Hello ), @@ -203,11 +204,13 @@ pub extern "C" fn z_scout( return result::Z_EINVAL; }; + let sg = SyncGroup::new(); + let sync_callback = SyncObj::new(callback, sg.notifier()); ZRuntime::Application.block_in_place(async move { let res = zenoh::scout(what, config) .callback(move |h| { let mut owned_h = Some(h); - z_closure_hello_call(z_closure_hello_loan(&callback), unsafe { + z_closure_hello_call(z_closure_hello_loan(&sync_callback), unsafe { owned_h.as_mut().unwrap_unchecked().as_loaned_c_type_mut() }) }) diff --git a/src/session.rs b/src/session.rs index 69b4b71ae..e74c65503 100644 --- a/src/session.rs +++ b/src/session.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh team, // -use std::mem::MaybeUninit; +use std::{mem::MaybeUninit, ops::Deref}; use zenoh::{Session, Wait}; @@ -24,10 +24,30 @@ use crate::{ opaque_types::{z_loaned_session_t, z_owned_session_t}, result, transmute::{LoanedCTypeRef, RustTypeRef, RustTypeRefUninit, TakeRustType}, - z_moved_config_t, z_moved_session_t, + z_moved_config_t, z_moved_session_t, SgNotifier, SyncGroup, }; + +pub(crate) struct CSession { + session: Session, + sg: SyncGroup, +} + +impl CSession { + pub(crate) fn notifier(&self) -> SgNotifier { + self.sg.notifier() + } +} + +impl Deref for CSession { + type Target = Session; + + fn deref(&self) -> &Self::Target { + &self.session + } +} + decl_c_type!( - owned(z_owned_session_t, option Session), + owned(z_owned_session_t, option CSession), loaned(z_loaned_session_t), ); @@ -92,7 +112,10 @@ pub extern "C" fn z_open( }; match zenoh::open(config).wait() { Ok(s) => { - this.write(Some(s)); + this.write(Some(CSession { + session: s, + sg: SyncGroup::new(), + })); result::Z_OK } Err(e) => { @@ -126,7 +149,10 @@ pub extern "C" fn z_open_with_custom_shm_clients( .wait() { Ok(s) => { - this.write(Some(s)); + this.write(Some(CSession { + session: s, + sg: SyncGroup::new(), + })); result::Z_OK } Err(e) => { @@ -187,7 +213,7 @@ pub extern "C" fn z_close( #[allow(unused)] options: Option<&mut z_close_options_t>, ) -> result::z_result_t { #[allow(unused_mut)] - let mut close_builder = session.as_rust_type_mut().close(); + let mut close_builder = session.as_rust_type_mut().session.close(); #[cfg(feature = "unstable")] if let Some(options) = options { @@ -219,7 +245,7 @@ pub extern "C" fn z_close( #[no_mangle] pub extern "C" fn z_session_is_closed(session: &z_loaned_session_t) -> bool { let s = session.as_rust_type_ref(); - s.is_closed() + s.session.is_closed() } /// Closes and invalidates the session. diff --git a/src/subscriber.rs b/src/subscriber.rs index 173f3f016..fe9920672 100644 --- a/src/subscriber.rs +++ b/src/subscriber.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh team, // -use std::mem::MaybeUninit; +use std::{mem::MaybeUninit, ops::Deref}; use zenoh::{ handlers::Callback, @@ -27,13 +27,26 @@ use crate::{ result, transmute::{LoanedCTypeRef, RustTypeRef, RustTypeRefUninit, TakeRustType}, z_closure_sample_call, z_closure_sample_loan, z_loaned_session_t, z_moved_closure_sample_t, - zc_locality_default, zc_locality_t, + zc_locality_default, zc_locality_t, SgNotifier, SyncGroup, SyncObj, }; #[cfg(feature = "unstable")] use crate::{transmute::IntoCType, z_entity_global_id_t}; +pub(crate) struct CSubscriber { + pub(crate) subscriber: Subscriber<()>, + pub(crate) _sg: SyncGroup, +} + +impl Deref for CSubscriber { + type Target = Subscriber<()>; + + fn deref(&self) -> &Self::Target { + &self.subscriber + } +} + decl_c_type!( - owned(z_owned_subscriber_t, option Subscriber<()>), + owned(z_owned_subscriber_t, option CSubscriber), loaned(z_loaned_subscriber_t), ); @@ -82,16 +95,18 @@ pub(crate) fn _declare_subscriber_inner<'a, 'b>( session: &'a z_loaned_session_t, key_expr: &'b z_loaned_keyexpr_t, callback: &mut z_moved_closure_sample_t, + notifier: SgNotifier, options: Option<&mut z_subscriber_options_t>, ) -> SubscriberBuilder<'a, 'b, Callback> { let session = session.as_rust_type_ref(); let key_expr = key_expr.as_rust_type_ref(); let callback = callback.take_rust_type(); + let sync_callback = SyncObj::new(callback, notifier); let mut subscriber = session .declare_subscriber(key_expr) .callback(move |sample| { let mut owned_sample = Some(sample); - z_closure_sample_call(z_closure_sample_loan(&callback), unsafe { + z_closure_sample_call(z_closure_sample_loan(&sync_callback), unsafe { owned_sample .as_mut() .unwrap_unchecked() @@ -122,10 +137,14 @@ pub extern "C" fn z_declare_subscriber( options: Option<&mut z_subscriber_options_t>, ) -> result::z_result_t { let this = subscriber.as_rust_type_mut_uninit(); - let s = _declare_subscriber_inner(session, key_expr, callback, options); + let sg = SyncGroup::new(); + let s = _declare_subscriber_inner(session, key_expr, callback, sg.notifier(), options); match s.wait() { Ok(sub) => { - this.write(Some(sub)); + this.write(Some(CSubscriber { + subscriber: sub, + _sg: sg, + })); result::Z_OK } Err(e) => { @@ -152,7 +171,13 @@ pub extern "C" fn z_declare_background_subscriber( callback: &mut z_moved_closure_sample_t, options: Option<&mut z_subscriber_options_t>, ) -> result::z_result_t { - let subscriber = _declare_subscriber_inner(session, key_expr, callback, options); + let subscriber = _declare_subscriber_inner( + session, + key_expr, + callback, + session.as_rust_type_ref().notifier(), + options, + ); match subscriber.background().wait() { Ok(_) => result::Z_OK, Err(e) => { @@ -191,7 +216,7 @@ pub extern "C" fn z_internal_subscriber_check(this_: &z_owned_subscriber_t) -> b #[no_mangle] pub extern "C" fn z_undeclare_subscriber(this_: &mut z_moved_subscriber_t) -> result::z_result_t { if let Some(s) = this_.take_rust_type() { - if let Err(e) = s.undeclare().wait() { + if let Err(e) = s.subscriber.undeclare().wait() { crate::report_error!("{}", e); return result::Z_EGENERIC; } diff --git a/tests/z_int_callback_drop.c b/tests/z_int_callback_drop.c new file mode 100644 index 000000000..92c521c04 --- /dev/null +++ b/tests/z_int_callback_drop.c @@ -0,0 +1,159 @@ +// +// Copyright (c) 2025 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#include + +#include "zenoh.h" + +#undef NDEBUG +#include +z_owned_mutex_t mu; + +typedef struct z_val_t { + int val; + bool dropped; +} z_val_t; + +void drop(void *arg) { + z_mutex_lock(z_loan_mut(mu)); + z_val_t *val = (z_val_t *)arg; + val->dropped = true; + z_mutex_unlock(z_loan_mut(mu)); +} + +void data_handler(z_loaned_sample_t *sample, void *arg) { + z_val_t *val = (z_val_t *)arg; + z_mutex_lock(z_loan_mut(mu)); + (val->val)++; + z_mutex_unlock(z_loan_mut(mu)); + z_sleep_s(5); + z_mutex_lock(z_loan_mut(mu)); + (val->val)++; + z_mutex_unlock(z_loan_mut(mu)); +} + +void query_handler(z_loaned_query_t *query, void *arg) { + z_val_t *val = (z_val_t *)arg; + z_mutex_lock(z_loan_mut(mu)); + (val->val)++; + z_mutex_unlock(z_loan_mut(mu)); + z_sleep_s(5); + z_mutex_lock(z_loan_mut(mu)); + (val->val)++; + z_mutex_unlock(z_loan_mut(mu)); + + z_view_keyexpr_t ke; + z_view_keyexpr_from_str(&ke, "test/query_callbacks_drop"); + z_owned_bytes_t p; + z_bytes_copy_from_str(&p, "reply"); + z_query_reply(query, z_loan(ke), z_move(p), NULL); +} + +void reply_handler(z_loaned_reply_t *reply, void *arg) { + z_val_t *val = (z_val_t *)arg; + z_mutex_lock(z_loan_mut(mu)); + (val->val)++; + z_mutex_unlock(z_loan_mut(mu)); + z_sleep_s(5); + z_mutex_lock(z_loan_mut(mu)); + (val->val)++; + z_mutex_unlock(z_loan_mut(mu)); +} + +void test_pub_sub() { + z_mutex_init(&mu); + z_view_keyexpr_t ke; + z_view_keyexpr_from_str(&ke, "test/callbacks_drop"); + + z_owned_config_t c1, c2; + z_owned_session_t s1, s2; + z_config_default(&c1); + z_config_default(&c2); + + assert(z_open(&s1, z_move(c1), NULL) == Z_OK); + assert(z_open(&s2, z_move(c2), NULL) == Z_OK); + + z_val_t val = {0, false}; + z_owned_subscriber_t sub; + z_owned_closure_sample_t callback; + z_closure(&callback, data_handler, drop, (void *)&val); + assert(z_declare_subscriber(z_loan(s1), &sub, z_loan(ke), z_move(callback), NULL) == Z_OK); + z_sleep_s(1); + z_owned_bytes_t p; + z_bytes_copy_from_str(&p, "data"); + assert(z_put(z_loan(s2), z_loan(ke), z_move(p), NULL) == Z_OK); + + z_sleep_s(1); + z_drop(z_move(sub)); + z_mutex_lock(z_loan_mut(mu)); + assert(val.dropped); + assert(val.val == 2); + z_mutex_unlock(z_loan_mut(mu)); + + z_drop(z_move(s1)); + z_drop(z_move(s2)); + z_drop(z_move(mu)); +} + +void test_query_reply() { + z_mutex_init(&mu); + z_view_keyexpr_t ke; + z_view_keyexpr_from_str(&ke, "test/query_callbacks_drop"); + + z_owned_config_t c1, c2; + z_owned_session_t s1, s2; + z_config_default(&c1); + z_config_default(&c2); + + assert(z_open(&s1, z_move(c1), NULL) == Z_OK); + assert(z_open(&s2, z_move(c2), NULL) == Z_OK); + + z_val_t query_val = {0, false}; + z_val_t reply_val = {0, false}; + + z_owned_queryable_t q; + z_owned_closure_query_t q_callback; + z_closure(&q_callback, query_handler, drop, (void *)&query_val); + + assert(z_declare_queryable(z_loan(s1), &q, z_loan(ke), z_move(q_callback), NULL) == Z_OK); + z_sleep_s(1); + + z_owned_closure_reply_t r_callback; + z_closure(&r_callback, reply_handler, drop, (void *)&reply_val); + assert(z_get(z_loan(s2), z_loan(ke), "", z_move(r_callback), NULL) == Z_OK); + + z_sleep_s(1); + z_drop(z_move(q)); + z_mutex_lock(z_loan_mut(mu)); + assert(query_val.dropped); + assert(query_val.val == 2); + z_mutex_unlock(z_loan_mut(mu)); + + z_sleep_s(1); + z_drop(z_move(s2)); + z_mutex_lock(z_loan_mut(mu)); + assert(reply_val.dropped); + assert(reply_val.val == 2); + z_mutex_unlock(z_loan_mut(mu)); + + z_drop(z_move(s1)); + z_drop(z_move(mu)); +} + +int main(int argc, char **argv) { + test_pub_sub(); + test_query_reply(); +}