Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
528 changes: 231 additions & 297 deletions Cargo.lock

Large diffs are not rendered by default.

28 changes: 18 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,37 @@ salvo = { version = "0.81.0", features = [
"serve-static",
] }
serde = "1.0.219"
socketioxide = { version = "0.17.1", features = [
socketioxide = { git = "https://github.com/Totodore/socketioxide.git", rev = "ba71aa5f07ca72a22ae0ecaf3e026ea0ec114963", features = [
"extensions",
"state",
"msgpack",
] }
socketioxide-redis = { version = "0.2.2", features = ["redis-cluster"] }
tokio = { version = "1.46.1", features = ["full"] }
socketioxide-redis = { git = "https://github.com/Totodore/socketioxide.git", rev = "ba71aa5f07ca72a22ae0ecaf3e026ea0ec114963", features = [
"redis-cluster",
] }
tokio = { version = "1.47.0", features = ["full"] }
tokio-util = "0.7.15"
tower = { version = "0.5.2", default-features = false }
tower-http = { version = "0.6.4", features = ["cors", "fs", "auth"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "fmt"] }
chrono = { version = "0.4.41", features = ["serde"] }
diesel = { version = "2.2.12", features = ["postgres", "r2d2", "chrono"] }
# diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
dotenvy = "0.15.7"
time = "0.3.41"
jsonwebtoken = "9.3.1"
anyhow = "1.0.98"
thiserror = "2.0.12"
validator = "0.20.0"
validator_derive = "0.20.0"
aws-sdk-s3 = { version = "1.98.0", features = ["rt-tokio"] }
aws-config = { version = "1.8.2", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.100.0", features = ["rt-tokio"] }
aws-config = { version = "1.8.3", features = ["behavior-version-latest"] }
aws-credential-types = "1.2.4"
rustls = { version = "0.23.27", features = ["ring"] }
rustls = { version = "0.23.30", features = ["ring"] }
nanoid = "0.4.0"
rand = "0.9.2"
serde_json = "1.0.141"
serde_repr = "0.1.20"
bcrypt = "0.17.0"
async-channel = "2.5.0"
rust-embed = "8.7.2"
Expand Down Expand Up @@ -88,14 +90,20 @@ moq-gst = { git = "https://github.com/waterbustech/moq-gst.git", branch = "main"
gst-plugin-fmp4 = "0.14.0"
prost = "0.13.5"
tonic = "0.13.1"
etcd-client = "0.15.0"
etcd-client = "0.16.1"
sysinfo = "0.36.1"
futures-util = "0.3.31"
redis = { version = "0.32.4", features = ["cluster"] }
redis = { version = "0.32.4", features = [
"cluster",
"tls-native-tls",
"tokio-native-tls-comp",
] }
futures = "0.3.31"
crossbeam = "0.8.4"
crossbeam-channel = "0.5.15"
mimalloc = "0.1.46"
bytes = "1.10.1"
reqwest = { version = "0.12.22", features = ["json"] }
kanal = "0.1.1"

# Local crates
waterbus-proto = { path = "./crates/waterbus-proto" }
Expand Down
5 changes: 5 additions & 0 deletions crates/dispatcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,8 @@ serde_json = { workspace = true }
serde = { workspace = true }
futures-util = { workspace = true }
redis = { workspace = true }

[features]
default = []

redis-cluster = []
18 changes: 17 additions & 1 deletion crates/dispatcher/src/application/sfu_grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use waterbus_proto::{
LeaveRoomRequest, LeaveRoomResponse, MigratePublisherRequest, MigratePublisherResponse,
PublisherRenegotiationRequest, PublisherRenegotiationResponse, SetCameraType,
SetEnabledRequest, SetScreenSharingRequest, SetSubscriberSdpRequest, StatusResponse,
SubscribeRequest, SubscribeResponse, sfu_service_client::SfuServiceClient,
SubscribeHlsLiveStreamRequest, SubscribeHlsLiveStreamResponse, SubscribeRequest,
SubscribeResponse, sfu_service_client::SfuServiceClient,
};

#[derive(Debug, Clone, Default)]
Expand Down Expand Up @@ -45,6 +46,21 @@ impl SfuGrpcClient {
Ok(response)
}

pub async fn subscribe_hls_live_stream(
&self,
server_address: String,
request: SubscribeHlsLiveStreamRequest,
) -> Result<tonic::Response<SubscribeHlsLiveStreamResponse>, tonic::Status> {
let mut client = self
.get_client(server_address)
.await
.map_err(|e| Status::unavailable(format!("Failed to connect to SFU: {e}")))?;
let response = client
.subscribe_hls_live_stream(Request::new(request))
.await?;
Ok(response)
}

pub async fn set_subscriber_sdp(
&self,
server_address: String,
Expand Down
38 changes: 37 additions & 1 deletion crates/dispatcher/src/dispatcher_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use waterbus_proto::{
AddPublisherCandidateRequest, AddSubscriberCandidateRequest, JoinRoomRequest, JoinRoomResponse,
LeaveRoomRequest, MigratePublisherRequest, MigratePublisherResponse,
PublisherRenegotiationRequest, PublisherRenegotiationResponse, SetCameraType,
SetEnabledRequest, SetScreenSharingRequest, SetSubscriberSdpRequest, SubscribeRequest,
SetEnabledRequest, SetScreenSharingRequest, SetSubscriberSdpRequest,
SubscribeHlsLiveStreamRequest, SubscribeHlsLiveStreamResponse, SubscribeRequest,
SubscribeResponse,
};

Expand Down Expand Up @@ -130,6 +131,41 @@ impl DispatcherManager {
}
}

pub async fn subscribe_hls_live_stream(
&self,
req: SubscribeHlsLiveStreamRequest,
) -> Result<SubscribeHlsLiveStreamResponse, anyhow::Error> {
let client = self.cache_manager.get_by_participant_id(&req.target_id);

match client {
Ok(client) => {
if let Some(client) = client {
let node_id = client.sfu_node_id;
let node_addr = client.node_addr;

let server_addr = format!("{}:{}", node_addr, self.sfu_port);

let response = self
.sfu_grpc_client
.subscribe_hls_live_stream(server_addr, req)
.await;

match response {
Ok(resp) => Ok(resp.into_inner()),
Err(e) => Err(anyhow::anyhow!(
"Failed to join room on node {}: {}",
node_id,
e
)),
}
} else {
Err(anyhow::anyhow!("Client not found!"))
}
}
Err(_) => Err(anyhow::anyhow!("Client not found!")),
}
}

pub async fn set_subscribe_sdp(
&self,
req: SetSubscriberSdpRequest,
Expand Down
28 changes: 25 additions & 3 deletions crates/dispatcher/src/infrastructure/cache/cache_manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
use redis::{Commands, cluster::ClusterClient};
use redis::Commands;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};

#[cfg(not(feature = "redis-cluster"))]
use redis::Client;
#[cfg(feature = "redis-cluster")]
use redis::cluster::ClusterClient;

#[cfg(feature = "redis-cluster")]
type DefaultClient = ClusterClient;

#[cfg(not(feature = "redis-cluster"))]
type DefaultClient = Client;

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ClientMetadata {
pub room_id: String,
Expand All @@ -23,12 +34,23 @@ impl CacheKey {

#[derive(Clone)]
pub struct CacheManager {
client: Arc<Mutex<ClusterClient>>,
client: Arc<Mutex<DefaultClient>>,
}

impl CacheManager {
pub fn new(urls: Vec<String>) -> Self {
let client = ClusterClient::new(urls).unwrap();
let client: DefaultClient = {
#[cfg(feature = "redis-cluster")]
{
ClusterClient::new(urls).expect("Failed to create ClusterClient")
}

#[cfg(not(feature = "redis-cluster"))]
{
Client::open(urls.first().unwrap().as_str()).expect("Failed to create Redis Client")
}
};

Self {
client: Arc::new(Mutex::new(client)),
}
Expand Down
21 changes: 14 additions & 7 deletions crates/egress-manager/src/egress/hls_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,22 @@ pub struct HlsWriter {
start_time: Instant,
video_offset: Arc<Mutex<u64>>,
audio_offset: Arc<Mutex<u64>>,
pub hls_url: String,
}

impl HlsWriter {
pub async fn new(dir: &str, prefix_path: String) -> Result<Self, anyhow::Error> {
pub fn new(dir: &str, prefix_path: &str) -> Result<Self, anyhow::Error> {
init()?;

let path = PathBuf::from(dir);
let pipeline = gst::Pipeline::default();
std::fs::create_dir_all(&path).expect("failed to create directory");

let r2_config: Option<R2Config> = Self::_get_r2_config(prefix_path);
let r2_config: Option<R2Config> = Self::_get_r2_config(prefix_path.to_string());

let (r2_storage, master_state) = if let Some(config) = r2_config {
let (r2_storage, master_state, cloud_url_base) = if let Some(config) = r2_config {
// Use new_with_worker instead of new
let (r2_storage, upload_receiver) = R2Storage::new_with_worker(config.clone()).await?;
let (r2_storage, upload_receiver) = R2Storage::new_with_worker(config.clone())?;
let r2_storage = Arc::new(r2_storage);

// Start the upload worker
Expand All @@ -65,9 +66,9 @@ impl HlsWriter {
cloud_url_base.clone(),
)));

(Some(r2_storage), Some(master_state))
(Some(r2_storage), Some(master_state), cloud_url_base)
} else {
(None, None)
(None, None, None)
};

let mut manifest_path = path.clone();
Expand Down Expand Up @@ -128,6 +129,12 @@ impl HlsWriter {
start_time: Instant::now(),
video_offset: Arc::new(Mutex::new(0)),
audio_offset: Arc::new(Mutex::new(0)),
hls_url: format!(
"{}/{}/{}",
cloud_url_base.unwrap(),
prefix_path,
"manifest.m3u8"
),
};

let hls_writer_arc = Arc::new(this.clone());
Expand Down Expand Up @@ -222,7 +229,7 @@ impl HlsWriter {
dotenvy::dotenv().ok();

let account_id = env::var("STORAGE_ACCOUNT_ID").ok()?;
let bucket_name = env::var("STORAGE_BUCKET_NAME").ok()?;
let bucket_name = env::var("STORAGE_BUCKET").ok()?;
let custom_domain = env::var("STORAGE_CUSTOM_DOMAIN").ok();

let r2_config = R2Config {
Expand Down
34 changes: 16 additions & 18 deletions crates/egress-manager/src/egress/utils/aws_utils.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use aws_config::meta::region::RegionProviderChain;
use aws_config::{BehaviorVersion, SdkConfig};
use aws_credential_types::Credentials;
use aws_sdk_s3::{Client, config::Region};
use aws_sdk_s3::{
Client,
config::{Region, SharedCredentialsProvider},
};
use std::env;

pub async fn get_storage_object_client() -> Client {
pub fn get_storage_object_client() -> Client {
dotenvy::dotenv().ok();

let access_key_id = env::var("STORAGE_ACCESS_KEY_ID").expect("STORAGE_ACCESS_KEY_ID not set");
let secret_access_key =
env::var("STORAGE_SECRET_ACCESS_KEY").expect("STORAGE_SECRET_ACCESS_KEY not set");
let region = env::var("STORAGE_REGION").ok();
let endpoint_url = env::var("STORAGE_ENDPOINT_URL").ok();
let access_key_id = env::var("STORAGE_ACCESS_KEY").expect("STORAGE_ACCESS_KEY not set");
let secret_access_key = env::var("STORAGE_SECRET_KEY").expect("STORAGE_SECRET_KEY not set");
let region = env::var("STORAGE_REGION").unwrap_or_else(|_| "auto".to_string());
let endpoint_url = env::var("STORAGE_ENDPOINT").ok();

let credentials = Credentials::new(
access_key_id,
Expand All @@ -20,16 +22,12 @@ pub async fn get_storage_object_client() -> Client {
"waterbus_provider",
);

let region_provider = RegionProviderChain::first_try(region.map(Region::new))
.or_default_provider()
.or_else(Region::new("us-west-2"));

let shared_config = aws_config::from_env()
.region(region_provider)
let config = SdkConfig::builder()
.behavior_version(BehaviorVersion::latest())
.endpoint_url(endpoint_url.unwrap_or_default())
.credentials_provider(credentials)
.load()
.await;
.region(Region::new(region))
.credentials_provider(SharedCredentialsProvider::new(credentials))
.build();

Client::new(&shared_config)
Client::new(&config)
}
8 changes: 4 additions & 4 deletions crates/egress-manager/src/egress/utils/cloud_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ pub struct R2Storage {

impl R2Storage {
/// Create a new R2Storage instance
pub async fn new(config: R2Config) -> Result<Self> {
let client = get_storage_object_client().await;
pub fn new(config: R2Config) -> Result<Self> {
let client = get_storage_object_client();

Ok(Self {
client,
Expand All @@ -48,10 +48,10 @@ impl R2Storage {
}

/// Create a new R2Storage instance with background upload worker
pub async fn new_with_worker(
pub fn new_with_worker(
config: R2Config,
) -> Result<(Self, mpsc::UnboundedReceiver<UploadTask>)> {
let client = get_storage_object_client().await;
let client = get_storage_object_client();
let (tx, rx) = mpsc::unbounded_channel();

let storage = Self {
Expand Down
15 changes: 15 additions & 0 deletions crates/waterbus-proto/proto/sfu.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,23 @@ message JoinRoomRequest {
bool isE2eeEnabled = 7;
int32 totalTracks = 8;
int32 connectionType = 9;
int32 streamingProtocol = 10;
bool isIpv6Supported = 11;
}

message SubscribeRequest {
string clientId = 1;
string targetId = 2;
string participantId = 3;
string roomId = 4;
bool isIpv6Supported = 5;
}

message SubscribeHlsLiveStreamRequest {
string clientId = 1;
string targetId = 2;
string roomId = 3;
string participantId = 4;
}

message SetSubscriberSdpRequest {
Expand Down Expand Up @@ -92,6 +102,10 @@ message SubscribeResponse {
optional string screenTrackId = 9;
}

message SubscribeHlsLiveStreamResponse {
repeated string hlsUrls = 1;
}

message PublisherRenegotiationResponse {
string sdp = 1;
}
Expand All @@ -112,6 +126,7 @@ message StatusResponse {
service SfuService {
rpc joinRoom(JoinRoomRequest) returns (JoinRoomResponse) {}
rpc subscribe(SubscribeRequest) returns (SubscribeResponse) {}
rpc subscribeHlsLiveStream(SubscribeHlsLiveStreamRequest) returns (SubscribeHlsLiveStreamResponse) {}
rpc setSubscriberSdp(SetSubscriberSdpRequest) returns (StatusResponse) {}
rpc publisherRenegotiation(PublisherRenegotiationRequest) returns (PublisherRenegotiationResponse) {}
rpc addPublisherCandidate(AddPublisherCandidateRequest) returns (StatusResponse) {}
Expand Down
Loading
Loading