Skip to content

Commit 5e99c92

Browse files
authored
Upgrade pravega client
Signed-off-by: Luis Liu <[email protected]>
2 parents f712f8e + 1374ee1 commit 5e99c92

File tree

18 files changed

+246
-168
lines changed

18 files changed

+246
-168
lines changed

Cargo.lock

Lines changed: 90 additions & 64 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ gst-rtsp-server = { package = "gstreamer-rtsp-server", git = "https://gitlab.fre
3333
gst-sdp = { package = "gstreamer-sdp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
3434
gstreamer-video = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
3535
gtk = { git = "https://github.com/gtk-rs/gtk-rs" }
36-
pravega-client = { git = "https://github.com/pravega/pravega-client-rust", rev = "08338688fbea790f9bc947cd8cfb9a90f13b6498" }
37-
pravega-client-config = { git = "https://github.com/pravega/pravega-client-rust", package = "pravega-client-config", rev = "08338688fbea790f9bc947cd8cfb9a90f13b6498" }
38-
pravega-client-shared = { git = "https://github.com/pravega/pravega-client-rust", package = "pravega-client-shared", rev = "08338688fbea790f9bc947cd8cfb9a90f13b6498" }
36+
pravega-client = { git = "https://github.com/pravega/pravega-client-rust", rev = "17deb48bbdb9b0180e93942d5e0e9218b553f77b" }
37+
pravega-client-config = { git = "https://github.com/pravega/pravega-client-rust", package = "pravega-client-config", rev = "17deb48bbdb9b0180e93942d5e0e9218b553f77b" }
38+
pravega-client-shared = { git = "https://github.com/pravega/pravega-client-rust", package = "pravega-client-shared", rev = "17deb48bbdb9b0180e93942d5e0e9218b553f77b" }
3939
pravega-video = { path = "../pravega-video" }
4040
log = "0.4"
4141
serde = "1"

apps/src/bin/pravega-tools.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ use pravega_client::client_factory::ClientFactory;
1717
use pravega_client_config::ClientConfigBuilder;
1818
use pravega_client_shared::{Scope, Stream, ScopedStream};
1919
use pravega_video::index::{IndexSearcher, SearchMethod, get_index_stream_name};
20-
use pravega_video::utils::parse_controller_uri;
2120
use pravega_video::timestamp::PravegaTimestamp;
21+
use pravega_video::utils::{parse_controller_uri, SyncByteReader};
2222

2323
/// Tools to manage Pravega streams.
2424
#[derive(Clap)]
@@ -80,14 +80,14 @@ fn truncate_stream(controller: String, scope_name: String, stream_name: String,
8080
scope: scope.clone(),
8181
stream: stream.clone(),
8282
};
83-
let writer = client_factory.create_byte_writer(scoped_stream);
83+
let writer = runtime.block_on(client_factory.create_byte_writer(scoped_stream));
8484
let index_scoped_stream = ScopedStream {
8585
scope: scope.clone(),
8686
stream: index_stream.clone(),
8787
};
88-
let index_writer = client_factory.create_byte_writer(index_scoped_stream.clone());
89-
let index_reader = client_factory.create_byte_reader(index_scoped_stream.clone());
90-
let mut index_searcher = IndexSearcher::new(index_reader);
88+
let index_writer = runtime.block_on(client_factory.create_byte_writer(index_scoped_stream.clone()));
89+
let index_reader = runtime.block_on(client_factory.create_byte_reader(index_scoped_stream.clone()));
90+
let mut index_searcher = IndexSearcher::new(SyncByteReader::new(index_reader, client_factory.runtime_handle()));
9191
let index_record = index_searcher.search_timestamp_and_return_index_offset(
9292
truncate_at_timestamp, SearchMethod::Before).unwrap();
9393
println!("Truncating prior to {:?}", index_record);

apps/src/bin/pravega_event_test1.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ use clap::Clap;
1212
use log::info;
1313

1414
use std::convert::TryInto;
15-
use std::io::{Write};
1615
use uuid::Uuid;
1716

1817
use pravega_client::client_factory::ClientFactory;
@@ -79,10 +78,9 @@ fn main() {
7978
let num_events: u64 = 3;
8079

8180
if opts.use_byte_stream_writer {
82-
let client_factory = client_factory.clone();
8381
let scoped_stream = scoped_stream.clone();
84-
runtime.spawn_blocking(move || {
85-
let mut writer = client_factory.create_byte_writer(scoped_stream);
82+
let mut writer = client_factory.create_byte_writer(scoped_stream).await;
83+
runtime.spawn(async move{
8684
for i in 0..num_events {
8785
let payload = format!("event {}", i).into_bytes();
8886
let payload_length = payload.len();
@@ -92,7 +90,7 @@ fn main() {
9290
bytes_to_write[4..8].copy_from_slice(&event_length.to_be_bytes()[..]);
9391
bytes_to_write[8..8+payload_length].copy_from_slice(&payload[..]);
9492
info!("bytes_to_write={:?}", bytes_to_write);
95-
writer.write_all(&bytes_to_write).unwrap();
93+
writer.write(&bytes_to_write).await.unwrap();
9694
}
9795
});
9896
} else {
@@ -108,11 +106,11 @@ fn main() {
108106

109107
// create event stream reader
110108
let reader_group_name = format!("rg{}", uuid::Uuid::new_v4()).to_string();
111-
let rg = client_factory.create_reader_group(scope, reader_group_name, scoped_stream).await;
109+
let rg = client_factory.create_reader_group(reader_group_name, scoped_stream).await;
112110
let mut reader = rg.create_reader("r1".to_string()).await;
113111

114112
// read from segment
115-
let mut slice = reader.acquire_segment().await.expect("acquire segment");
113+
let mut slice = reader.acquire_segment().await.expect("acquire segment").unwrap();
116114
for i in 0..num_events {
117115
let read_event = slice.next();
118116
info!("read_event={:?}", read_event);

apps/src/bin/pravega_event_test2.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ struct Opts {
3333
stream: String,
3434
}
3535

36-
/// Demonstrate ability to write using the byte stream writer and read using the event reader.
36+
/// Demonstrate ability to write using the event stream writer and read using the event reader.
3737
fn main() {
3838
env_logger::init();
3939
let opts: Opts = Opts::parse();
@@ -96,11 +96,11 @@ fn main() {
9696
runtime.block_on(async {
9797
// create event stream reader
9898
let reader_group_name = format!("rg{}", uuid::Uuid::new_v4()).to_string();
99-
let rg = client_factory.create_reader_group(scope, reader_group_name, scoped_stream).await;
99+
let rg = client_factory.create_reader_group(reader_group_name, scoped_stream).await;
100100
let mut reader = rg.create_reader("r1".to_string()).await;
101101

102102
// read from segment
103-
let mut slice = reader.acquire_segment().await.expect("acquire segment");
103+
let mut slice = reader.acquire_segment().await.expect("acquire segment").unwrap();
104104
for _ in 0..num_events {
105105
let read_event = slice.next();
106106
info!("read_event={:?}", read_event);

apps/src/bin/pravega_retention_checker.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ use std::{thread, time};
1515
use pravega_client::client_factory::ClientFactory;
1616
use pravega_client_shared::{Scope, Stream, ScopedStream};
1717

18+
use pravega_video::index::IndexSearcher;
1819
use pravega_video::utils;
19-
use pravega_video::index::{IndexSearcher};
20+
use pravega_video::utils::SyncByteReader;
2021

2122
#[derive(Clap)]
2223
struct Opts {
@@ -56,8 +57,9 @@ fn main() {
5657
scope: scope,
5758
stream: stream,
5859
};
59-
let index_reader = client_factory.create_byte_reader(index_scoped_stream);
60-
let mut index_searcher = IndexSearcher::new(index_reader);
60+
let runtime = client_factory.runtime();
61+
let index_reader = runtime.block_on(client_factory.create_byte_reader(index_scoped_stream));
62+
let mut index_searcher = IndexSearcher::new(SyncByteReader::new(index_reader, client_factory.runtime_handle()));
6163
let check_period = time::Duration::from_secs(opts.check_period);
6264

6365
info!("Checking period is {} seconds", opts.check_period);

deepstream/pravega_protocol_adapter/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ edition = "2018"
1818

1919
[dependencies]
2020
anyhow = "1"
21-
pravega-client = { git = "https://github.com/pravega/pravega-client-rust", rev = "08338688fbea790f9bc947cd8cfb9a90f13b6498" }
22-
pravega-client-config = { git = "https://github.com/pravega/pravega-client-rust", package = "pravega-client-config", rev = "08338688fbea790f9bc947cd8cfb9a90f13b6498" }
23-
pravega-client-shared = { git = "https://github.com/pravega/pravega-client-rust", package = "pravega-client-shared", rev = "08338688fbea790f9bc947cd8cfb9a90f13b6498" }
21+
pravega-client = { git = "https://github.com/pravega/pravega-client-rust", rev = "17deb48bbdb9b0180e93942d5e0e9218b553f77b" }
22+
pravega-client-config = { git = "https://github.com/pravega/pravega-client-rust", package = "pravega-client-config", rev = "17deb48bbdb9b0180e93942d5e0e9218b553f77b" }
23+
pravega-client-shared = { git = "https://github.com/pravega/pravega-client-rust", package = "pravega-client-shared", rev = "17deb48bbdb9b0180e93942d5e0e9218b553f77b" }
2424
pravega-video = { path = "../../pravega-video" }
2525
tracing = "0.1"
2626
tracing-subscriber = "0.2"

deepstream/pravega_protocol_adapter/src/lib.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::fmt;
2121
use std::os::raw::c_char;
2222
use std::ptr;
2323
use std::sync::{Arc, Once};
24+
use tokio::runtime::Handle;
2425
use tokio::sync::Mutex;
2526
use tracing::{debug, error, info, trace};
2627
use tracing_subscriber::fmt::format::FmtSpan;
@@ -134,16 +135,16 @@ impl fmt::Debug for EventWriterPool {
134135
}
135136

136137
pub struct NvDsPravegaClientHandle {
137-
pub client_factory: ClientFactory,
138+
pub runtime_handle: Handle,
138139
pub writer_pool: EventWriterPool,
139140
pub routing_key_method: RoutingKeyMethod,
140141
}
141142

142143
impl NvDsPravegaClientHandle {
143144
pub fn new(client_factory: ClientFactory, routing_key_method: RoutingKeyMethod) -> Self {
144145
NvDsPravegaClientHandle {
145-
client_factory: client_factory.clone(),
146-
writer_pool: EventWriterPool::new(client_factory.clone()),
146+
runtime_handle: client_factory.runtime_handle(),
147+
writer_pool: EventWriterPool::new(client_factory),
147148
routing_key_method,
148149
}
149150
}
@@ -255,13 +256,12 @@ pub extern "C" fn nvds_msgapi_send(h_ptr: *mut NvDsPravegaClientHandle, topic: *
255256
let payload_string = String::from_utf8_lossy(payload);
256257
trace!("nvds_msgapi_send: payload_string={}", payload_string);
257258
let scoped_stream = client_handle.resolve_topic(topic).unwrap();
258-
let runtime = client_handle.client_factory.runtime();
259259
let routing_key_method = client_handle.routing_key_method.clone();
260260
let routing_key = match routing_key_method {
261261
RoutingKeyMethod::Fixed { routing_key } => routing_key,
262262
};
263263
debug!("nvds_msgapi_send: routing_key={:?}", routing_key);
264-
let result = runtime.block_on(async {
264+
let result = client_handle.runtime_handle.block_on(async {
265265
// Get a reference to the writer for this topic from the writer pool.
266266
let writer = client_handle.writer_pool.get_or_create(scoped_stream).await;
267267
// Get the mutex for this writer so we can use it.
@@ -312,15 +312,14 @@ pub extern "C" fn nvds_msgapi_send_async(
312312
// Convert unsafe payload to a vector. This also copies the payload which is critical to avoid memory corruption.
313313
let event = payload.to_vec();
314314
let scoped_stream = client_handle.resolve_topic(topic).unwrap();
315-
let runtime = client_handle.client_factory.runtime();
316315
let routing_key_method = client_handle.routing_key_method.clone();
317316
let routing_key = match routing_key_method {
318317
RoutingKeyMethod::Fixed { routing_key } => routing_key,
319318
};
320319
debug!("nvds_msgapi_send_async: routing_key={:?}", routing_key);
321320
// Spawn a task in the Tokio runtime that will write the event, wait for it to be durably persisted,
322321
// and then call the callback function.
323-
runtime.spawn(async move {
322+
client_handle.runtime_handle.spawn(async move {
324323
// Get a reference to the writer for this topic from the writer pool.
325324
let writer = client_handle.writer_pool.get_or_create(scoped_stream).await;
326325
// Get the mutex for this writer so we can use it.

gst-plugin-pravega/Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@ glib = { git = "https://github.com/gtk-rs/gtk-rs" }
2323
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
2424
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
2525
once_cell = "1"
26-
pravega-client = { git = "https://github.com/pravega/pravega-client-rust", rev = "08338688fbea790f9bc947cd8cfb9a90f13b6498" }
27-
pravega-client-config = { git = "https://github.com/pravega/pravega-client-rust", package = "pravega-client-config", rev = "08338688fbea790f9bc947cd8cfb9a90f13b6498" }
28-
pravega-client-shared = { git = "https://github.com/pravega/pravega-client-rust", package = "pravega-client-shared", rev = "08338688fbea790f9bc947cd8cfb9a90f13b6498" }
26+
pravega-client = { git = "https://github.com/pravega/pravega-client-rust", rev = "17deb48bbdb9b0180e93942d5e0e9218b553f77b" }
27+
pravega-client-config = { git = "https://github.com/pravega/pravega-client-rust", package = "pravega-client-config", rev = "17deb48bbdb9b0180e93942d5e0e9218b553f77b" }
28+
pravega-client-shared = { git = "https://github.com/pravega/pravega-client-rust", package = "pravega-client-shared", rev = "17deb48bbdb9b0180e93942d5e0e9218b553f77b" }
2929
pravega-video = { path = "../pravega-video" }
3030
serde = "1"
3131
serde_json = "1"
32+
tokio = { version = "1", features = ["full"] }
3233

3334
[lib]
3435
name = "gstpravega"

0 commit comments

Comments
 (0)