Skip to content
Open
104 changes: 72 additions & 32 deletions build-resources/opaque-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
thread::JoinHandle,
};

use flume::{Receiver, Sender};
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
use zenoh::shm::{
zshm, zshmmut, AllocLayout, ChunkAllocResult, ChunkDescriptor, MemoryLayout,
Expand Down Expand Up @@ -50,6 +51,13 @@ macro_rules! get_opaque_type_data {
};
}

type SgNotifier = Sender<()>;

pub(crate) struct SyncGroup {
waiter: Receiver<()>,
notifier: Option<SgNotifier>,
}

/// A Zenoh data.
///
/// To minimize copies and reallocations, Zenoh may provide data in several separate buffers.
Expand Down Expand Up @@ -124,19 +132,29 @@ get_opaque_type_data!(Option<Query>, z_owned_query_t);
/// A loaned Zenoh query.
get_opaque_type_data!(Query, z_loaned_query_t);

struct CQueryable {
queryable: Queryable<()>,
sg: SyncGroup,
}

/// An owned Zenoh <a href="https://zenoh.io/docs/manual/abstractions/#queryable"> queryable </a>.
///
/// Responds to queries sent via `z_get()` with intersecting key expression.
get_opaque_type_data!(Option<Queryable<()>>, z_owned_queryable_t);
get_opaque_type_data!(Option<CQueryable>, z_owned_queryable_t);
/// A loaned Zenoh queryable.
get_opaque_type_data!(Queryable<()>, z_loaned_queryable_t);
get_opaque_type_data!(CQueryable, z_loaned_queryable_t);

struct CQuerier {
querier: Querier<'static>,
sg: SyncGroup,
}

/// An owned Zenoh querier.
///
/// Sends queries to matching queryables.
get_opaque_type_data!(Option<Querier>, z_owned_querier_t);
get_opaque_type_data!(Option<CQuerier>, z_owned_querier_t);
/// A loaned Zenoh queryable.
get_opaque_type_data!(Querier, z_loaned_querier_t);
get_opaque_type_data!(CQuerier, z_loaned_querier_t);

#[cfg(feature = "unstable")]
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
Expand All @@ -156,51 +174,55 @@ get_opaque_type_data!(
ze_loaned_querying_subscriber_t
);

#[cfg(feature = "unstable")]
struct CAdvancedSubscriber {
subscriber: zenoh_ext::AdvancedSubscriber<()>,
sg: SyncGroup,
}

#[cfg(feature = "unstable")]
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief An owned Zenoh advanced subscriber.
///
/// In addition to receiving the data it is subscribed to,
/// it is also able to receive notifications regarding missed samples and/or automatically recover them.
get_opaque_type_data!(
Option<zenoh_ext::AdvancedSubscriber<()>>,
ze_owned_advanced_subscriber_t
);
get_opaque_type_data!(Option<CAdvancedSubscriber>, ze_owned_advanced_subscriber_t);
#[cfg(feature = "unstable")]
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief A loaned Zenoh advanced subscriber.
get_opaque_type_data!(
zenoh_ext::AdvancedSubscriber<()>,
ze_loaned_advanced_subscriber_t
);
get_opaque_type_data!(CAdvancedSubscriber, ze_loaned_advanced_subscriber_t);

#[cfg(feature = "unstable")]
struct CSampleMissListener {
listener: zenoh_ext::SampleMissListener<()>,
sg: SyncGroup,
}

#[cfg(feature = "unstable")]
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief An owned Zenoh sample miss listener. Missed samples can only be detected from advanced publishers, enabling sample miss detection.
///
/// A listener that sends notification when the advanced subscriber misses a sample .
/// Dropping the corresponding subscriber, also drops the listener.
get_opaque_type_data!(
Option<zenoh_ext::SampleMissListener<()>>,
ze_owned_sample_miss_listener_t
);
get_opaque_type_data!(Option<CSampleMissListener>, ze_owned_sample_miss_listener_t);

#[cfg(feature = "unstable")]
struct CAdvancedPublisher {
publisher: zenoh_ext::AdvancedPublisher<'static>,
sg: SyncGroup,
}

#[cfg(feature = "unstable")]
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief An owned Zenoh advanced publisher.
///
/// In addition to publishing the data,
/// it also maintains the storage, allowing matching subscribers to retrive missed samples.
get_opaque_type_data!(
Option<zenoh_ext::AdvancedPublisher<'static>>,
ze_owned_advanced_publisher_t
);
get_opaque_type_data!(Option<CAdvancedPublisher>, ze_owned_advanced_publisher_t);
#[cfg(feature = "unstable")]
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief A loaned Zenoh advanced publisher.
get_opaque_type_data!(
zenoh_ext::AdvancedPublisher<'static>,
ze_loaned_advanced_publisher_t
);
get_opaque_type_data!(CAdvancedPublisher, ze_loaned_advanced_publisher_t);
/// A Zenoh-allocated <a href="https://zenoh.io/docs/manual/abstractions/#key-expression"> key expression </a>.
///
/// Key expressions can identify a single key or a set of keys.
Expand Down Expand Up @@ -232,10 +254,15 @@ get_opaque_type_data!(Option<KeyExpr<'static>>, z_view_keyexpr_t);
/// both for local processing and network-wise.
get_opaque_type_data!(KeyExpr<'static>, z_loaned_keyexpr_t);

struct CSession {
session: Session,
sg: SyncGroup,
}

/// An owned Zenoh session.
get_opaque_type_data!(Option<Session>, z_owned_session_t);
get_opaque_type_data!(Option<CSession>, z_owned_session_t);
/// A loaned Zenoh session.
get_opaque_type_data!(Session, z_loaned_session_t);
get_opaque_type_data!(CSession, z_loaned_session_t);

#[cfg(feature = "unstable")]
/// An owned Close handle
Expand All @@ -259,24 +286,38 @@ get_opaque_type_data!(ZenohId, z_id_t);
/// It consists of a time generated by a Hybrid Logical Clock (HLC) in NPT64 format and a unique zenoh identifier.
get_opaque_type_data!(Timestamp, z_timestamp_t);

struct CPublisher {
publisher: Publisher<'static>,
sg: SyncGroup,
}

/// An owned Zenoh <a href="https://zenoh.io/docs/manual/abstractions/#publisher"> publisher </a>.
get_opaque_type_data!(Option<Publisher<'static>>, z_owned_publisher_t);
get_opaque_type_data!(Option<CPublisher>, z_owned_publisher_t);
/// A loaned Zenoh publisher.
get_opaque_type_data!(Publisher<'static>, z_loaned_publisher_t);
get_opaque_type_data!(CPublisher, z_loaned_publisher_t);

struct CMatchingListener {
listener: MatchingListener<()>,
sg: SyncGroup,
}

/// @brief An owned Zenoh matching listener.
///
/// A listener that sends notifications when the [`MatchingStatus`] of a publisher or querier changes.
/// Dropping the corresponding publisher, also drops matching listener.
get_opaque_type_data!(Option<MatchingListener<()>>, z_owned_matching_listener_t);
get_opaque_type_data!(Option<CMatchingListener>, z_owned_matching_listener_t);

struct CSubscriber {
subscriber: Subscriber<()>,
sg: SyncGroup,
}
/// An owned Zenoh <a href="https://zenoh.io/docs/manual/abstractions/#subscriber"> subscriber </a>.
///
/// Receives data from publication on intersecting key expressions.
/// Destroying the subscriber cancels the subscription.
get_opaque_type_data!(Option<Subscriber<()>>, z_owned_subscriber_t);
get_opaque_type_data!(Option<CSubscriber>, z_owned_subscriber_t);
/// A loaned Zenoh subscriber.
get_opaque_type_data!(Subscriber<()>, z_loaned_subscriber_t);
get_opaque_type_data!(CSubscriber, z_loaned_subscriber_t);

/// @brief A liveliness token that can be used to provide the network with information about connectivity to its
/// declarer: when constructed, a PUT sample will be received by liveliness subscribers on intersecting key
Expand Down Expand Up @@ -362,7 +403,6 @@ get_opaque_type_data!(MemoryLayout, z_loaned_memory_layout_t);
/// @brief An owned ChunkAllocResult.
get_opaque_type_data!(Option<ChunkAllocResult>, z_owned_chunk_alloc_result_t);
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief An owned ZShm slice.
get_opaque_type_data!(Option<ZShm>, z_owned_shm_t);
Expand Down
10 changes: 0 additions & 10 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -1142,16 +1142,6 @@ typedef struct ze_moved_closure_miss_t {
struct ze_owned_closure_miss_t _this;
} ze_moved_closure_miss_t;
#endif
/**
* @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
* @brief An owned Zenoh sample miss listener. Missed samples can only be detected from advanced publishers, enabling sample miss detection.
*
* A listener that sends notification when the advanced subscriber misses a sample .
* Dropping the corresponding subscriber, also drops the listener.
*/
typedef struct ALIGN(8) ze_owned_sample_miss_listener_t {
uint8_t _0[24];
} ze_owned_sample_miss_listener_t;
typedef struct ze_moved_advanced_subscriber_t {
struct ze_owned_advanced_subscriber_t _this;
} ze_moved_advanced_subscriber_t;
Expand Down
1 change: 1 addition & 0 deletions splitguide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ zenoh_opaque.h:
- ze_loaned_advanced_subscriber_t!#unstable
- ze_owned_advanced_publisher_t!#unstable
- ze_loaned_advanced_publisher_t!#unstable
- ze_owned_sample_miss_listener_t!#unstable
- z_owned_keyexpr_t!
- z_view_keyexpr_t!
- z_loaned_keyexpr_t!
Expand Down
48 changes: 39 additions & 9 deletions src/advanced_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh team, <[email protected]>
//

use std::{mem::MaybeUninit, time::Duration};
use std::{mem::MaybeUninit, ops::Deref, time::Duration};

use zenoh::{
handlers::Callback,
Expand All @@ -30,7 +30,8 @@ use crate::{
z_closure_matching_status_call, z_closure_matching_status_loan, z_congestion_control_t,
z_entity_global_id_t, z_loaned_keyexpr_t, z_loaned_session_t, z_matching_status_t,
z_moved_bytes_t, z_moved_closure_matching_status_t, z_owned_matching_listener_t, z_priority_t,
z_publisher_delete_options_t, z_publisher_options_t, z_publisher_put_options_t,
z_publisher_delete_options_t, z_publisher_options_t, z_publisher_put_options_t, SgNotifier,
SyncGroup,
};

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
Expand Down Expand Up @@ -189,8 +190,22 @@ pub extern "C" fn ze_advanced_publisher_options_default(
pub use crate::opaque_types::{
ze_loaned_advanced_publisher_t, ze_moved_advanced_publisher_t, ze_owned_advanced_publisher_t,
};

pub(crate) struct CAdvancedPublisher {
publisher: zenoh_ext::AdvancedPublisher<'static>,
sg: SyncGroup,
}

impl Deref for CAdvancedPublisher {
type Target = zenoh_ext::AdvancedPublisher<'static>;

fn deref(&self) -> &Self::Target {
&self.publisher
}
}

decl_c_type!(
owned(ze_owned_advanced_publisher_t, option zenoh_ext::AdvancedPublisher<'static>),
owned(ze_owned_advanced_publisher_t, option CAdvancedPublisher),
loaned(ze_loaned_advanced_publisher_t),
);

Expand Down Expand Up @@ -242,7 +257,10 @@ pub extern "C" fn ze_declare_advanced_publisher(
result::Z_EGENERIC
}
Ok(publisher) => {
this.write(Some(publisher));
this.write(Some(CAdvancedPublisher {
publisher,
sg: SyncGroup::new(),
}));
result::Z_OK
}
}
Expand Down Expand Up @@ -417,16 +435,20 @@ pub extern "C" fn ze_advanced_publisher_keyexpr(
fn _advanced_publisher_matching_listener_declare_inner<'a>(
publisher: &'a ze_loaned_advanced_publisher_t,
callback: &mut z_moved_closure_matching_status_t,
notifier: SgNotifier,
) -> zenoh::matching::MatchingListenerBuilder<'a, Callback<MatchingStatus>> {
use crate::SyncObj;

let publisher = publisher.as_rust_type_ref();
let callback = callback.take_rust_type();
let sync_callback = SyncObj::new(callback, notifier);
let listener = publisher
.matching_listener()
.callback_mut(move |matching_status| {
let status = z_matching_status_t {
matching: matching_status.matching(),
};
z_closure_matching_status_call(z_closure_matching_status_loan(&callback), &status);
z_closure_matching_status_call(z_closure_matching_status_loan(&sync_callback), &status);
});
listener
}
Expand All @@ -446,11 +468,15 @@ pub extern "C" fn ze_advanced_publisher_declare_matching_listener(
matching_listener: &mut MaybeUninit<z_owned_matching_listener_t>,
callback: &mut z_moved_closure_matching_status_t,
) -> result::z_result_t {
use crate::{CMatchingListener, SyncGroup};

let this = matching_listener.as_rust_type_mut_uninit();
let listener = _advanced_publisher_matching_listener_declare_inner(publisher, callback);
let sg = SyncGroup::new();
let listener =
_advanced_publisher_matching_listener_declare_inner(publisher, callback, sg.notifier());
match listener.wait() {
Ok(listener) => {
this.write(Some(listener));
this.write(Some(CMatchingListener { listener, _sg: sg }));
result::Z_OK
}
Err(e) => {
Expand All @@ -475,7 +501,11 @@ pub extern "C" fn ze_advanced_publisher_declare_background_matching_listener(
publisher: &'static ze_loaned_advanced_publisher_t,
callback: &mut z_moved_closure_matching_status_t,
) -> result::z_result_t {
let listener = _advanced_publisher_matching_listener_declare_inner(publisher, callback);
let listener = _advanced_publisher_matching_listener_declare_inner(
publisher,
callback,
publisher.as_rust_type_ref().sg.notifier(),
);
match listener.background().wait() {
Ok(_) => result::Z_OK,
Err(e) => {
Expand Down Expand Up @@ -528,7 +558,7 @@ pub extern "C" fn ze_undeclare_advanced_publisher(
this_: &mut ze_moved_advanced_publisher_t,
) -> result::z_result_t {
if let Some(p) = this_.take_rust_type() {
if let Err(e) = p.undeclare().wait() {
if let Err(e) = p.publisher.undeclare().wait() {
crate::report_error!("{}", e);
return result::Z_ENETWORK;
}
Expand Down
Loading
Loading