Skip to content

Commit 3556654

Browse files
committed
change tcp to http
1 parent 8950c80 commit 3556654

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1189
-5058
lines changed
File renamed without changes.

router-api/src/api/settings/auto_config.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
//! It allows for bulk operations through a single API call, making it easier to set up and manage
55
//! gateway configurations.
66
7+
use std::sync::{Arc, Mutex};
8+
79
use actix_web::{post, get, web, HttpResponse, Responder, HttpRequest};
810
use serde::{Deserialize, Serialize};
911
use uuid::Uuid;
10-
use crate::api::users::helper::{ClaimsFromRequest, is_staff_or_admin};
12+
use crate::{api::users::helper::{is_staff_or_admin, ClaimsFromRequest}, module::httpc::HttpC};
1113
use super::{
1214
Proxy, ProxyDomain, GatewayNode, Gateway,
1315
proxy_queries, proxydomain_queries, gwnode_queries, gateway_queries
@@ -112,7 +114,9 @@ pub struct YamlConfig {
112114
pub async fn upload_config(
113115
req: HttpRequest,
114116
body: web::Bytes,
117+
client: web::Data<Arc<Mutex<HttpC>>>
115118
) -> impl Responder {
119+
let client = client.as_ref();
116120
// Extract authenticated user's claims
117121
let claims = match req.get_claims() {
118122
Some(claims) => claims,
@@ -308,19 +312,19 @@ pub async fn upload_config(
308312
}
309313

310314
// Add sync calls after successful configuration
311-
match sync::proxy_node_tcp::sync_proxy_nodes_to_registry().await {
315+
match sync::gateway_node_tcp::sync_gateway_paths_to_registry(client).await {
316+
Ok(_) => log::info!("Successfully synced gateway paths to registry"),
317+
Err(e) => log::warn!("Failed to sync gateway paths to registry: {:?}. Continuing anyway.", e),
318+
}
319+
320+
match sync::proxy_node_tcp::sync_proxy_nodes_to_registry(client).await {
312321
Ok(_) => log::info!("Successfully synced proxy nodes to registry"),
313-
Err(e) => log::warn!("Failed to sync proxy nodes to registry: {}. Continuing anyway.", e),
322+
Err(e) => log::warn!("Failed to sync proxy nodes to registry: {:?}. Continuing anyway.", e),
314323
}
315324

316-
match sync::gateway_node_tcp::sync_gateway_nodes_to_registry().await {
325+
match sync::gateway_node_tcp::sync_gateway_nodes_to_registry(client).await {
317326
Ok(_) => log::info!("Successfully synced gateway nodes to registry"),
318-
Err(e) => log::warn!("Failed to sync gateway nodes to registry: {}. Continuing anyway.", e),
319-
}
320-
321-
match sync::gateway_node_tcp::sync_gateway_paths_to_registry().await {
322-
Ok(_) => log::info!("Successfully synced gateway paths to registry"),
323-
Err(e) => log::warn!("Failed to sync gateway paths to registry: {}. Continuing anyway.", e),
327+
Err(e) => log::warn!("Failed to sync gateway nodes to registry: {:?}. Continuing anyway.", e),
324328
}
325329

326330
HttpResponse::Ok().json(serde_json::json!({

router-api/src/api/sync/gateway_node.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
use actix_web::{post, HttpResponse};
1+
use std::sync::{Arc, Mutex};
2+
use actix_web::{post, web, HttpResponse};
23
use serde::{Deserialize, Serialize};
34

4-
use crate::api::sync::gateway_node_tcp::{sync_gateway_nodes_to_registry, sync_gateway_paths_to_registry};
5+
use crate::{api::sync::gateway_node_tcp::{sync_gateway_nodes_to_registry, sync_gateway_paths_to_registry}, module::httpc::HttpC};
56

67
#[derive(Debug, Clone, Deserialize, Serialize)]
78
pub struct GatewayNode {
@@ -22,25 +23,25 @@ pub struct GatewayNode {
2223
}
2324

2425
#[post("/gateway")]
25-
pub async fn gateway() -> HttpResponse {
26+
pub async fn gateway(client: web::Data<Arc<Mutex<HttpC>>>) -> HttpResponse {
2627

27-
let result = match sync_gateway_nodes_to_registry().await {
28+
let result = match sync_gateway_nodes_to_registry(client.as_ref()).await {
2829
Ok(data) => {
2930
log::info!("Successfully synced gateway nodes to registry");
30-
let path_result = sync_gateway_paths_to_registry().await;
31+
let path_result = sync_gateway_paths_to_registry(client.as_ref()).await;
3132
match path_result {
3233
Ok(_paths) => {
3334
log::info!("Successfully synced gateway paths to registry");
3435
data
3536
}
3637
Err(e) => {
37-
log::error!("Failed to sync gateway paths: {}", e);
38+
log::error!("Failed to sync gateway paths: {:?}", e);
3839
return HttpResponse::BadRequest().body("Failed to sync gateway paths");
3940
}
4041
}
4142
}
4243
Err(e) => {
43-
log::error!("Failed to sync gateway nodes: {}", e);
44+
log::error!("Failed to sync gateway nodes: {:?}", e);
4445
return HttpResponse::BadRequest().body("Failed to sync gateway nodes");
4546
}
4647
};
Lines changed: 63 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,24 @@
11

2+
use std::sync::{Arc, Mutex};
3+
24
use super::gateway_node_queries;
35
use crate::{
4-
api::sync::TCPDefaultResponse,
5-
client::{Client, ClientError, Result as TCPResult},
6-
config,
6+
api::sync::HTTPCResponse,
7+
config, module::httpc::HttpC,
78
};
89
use log::{error, info, warn};
910

10-
/// Syncs all gateway nodes to the registry server
11-
///
12-
/// This function retrieves all gateway nodes by joining data from multiple tables,
13-
/// then sends the collected data to the gateway registry server via TCP.
14-
///
15-
/// # Returns
16-
///
17-
/// * `Ok(())` - If the data was successfully sent to the registry server
18-
/// * `Err(ClientError)` - If there was an error retrieving or sending the data
19-
///
20-
/// # Errors
21-
///
22-
/// This function will return an error if:
23-
/// - Database queries fail
24-
/// - Connection to the registry server cannot be established
25-
/// - Data transmission fails
26-
pub async fn sync_gateway_nodes_to_registry() -> TCPResult<TCPDefaultResponse> {
11+
pub async fn sync_gateway_nodes_to_registry(client: &Arc<Mutex<HttpC>>) -> Result<HTTPCResponse, HTTPCResponse> {
2712
log::info!("Syncing gateway nodes to registry...");
2813

2914
let gateway_nodes = match gateway_node_queries::get_all_gateway_nodes() {
3015
Ok(nodes) => nodes,
3116
Err(e) => {
3217
error!("Failed to retrieve gateway nodes from database: {}", e);
33-
return Err(ClientError::ProtocolError(format!("Database error: {}", e)));
18+
return Err(HTTPCResponse{
19+
status: "error".to_string(),
20+
message: format!("Database error: {}", e),
21+
});
3422
}
3523
};
3624

@@ -42,54 +30,46 @@ pub async fn sync_gateway_nodes_to_registry() -> TCPResult<TCPDefaultResponse> {
4230

4331
// Create the payload with the nodes
4432
let payload = gateway_nodes.clone();
45-
46-
// Create a new client instance
47-
let mut client = Client::new();
48-
49-
let server_address = config::Api::TCPAddress.get_str();
50-
51-
// Connect to the server without timeout
52-
match client.connect(server_address).await {
53-
Ok(_) => info!("Connected to registry server at {}", server_address),
33+
let payload_str = match serde_json::to_string(&payload) {
34+
Ok(json) => json,
5435
Err(e) => {
55-
error!("Failed to connect to registry server: {}", e);
56-
return Err(e);
36+
error!("Failed to serialize proxy nodes to JSON: {}", e);
37+
return Err(HTTPCResponse{
38+
status: "error".to_string(),
39+
message: format!("Serialization error: {}", e),
40+
});
5741
}
58-
}
59-
60-
// Create a new client with the service set using builder pattern
61-
let mut client = client.service("registry");
42+
};
6243

63-
// Send the payload to the "gateway" endpoint without timeout
64-
match client.action::<_, TCPDefaultResponse>("gwnode", &payload).await {
65-
Ok(data) => {
66-
info!(
67-
"Successfully sent {} gateway nodes to registry",
68-
gateway_nodes.len()
69-
);
70-
// Close the current client
71-
if let Err(e) = client.close().await {
72-
warn!("Error closing client connection: {}", e);
73-
}
74-
Ok(data)
75-
}
76-
Err(e) => {
77-
error!("Failed to send gateway nodes to registry: {}", e);
78-
if let Err(close_err) = client.close().await {
79-
warn!("Error closing client connection: {}", close_err);
80-
}
81-
Err(e)
44+
let _ = match client.lock() {
45+
Ok(client)=>{
46+
let _ = client.post_text("/gateway/node", &payload_str);
47+
info!("Successfully sent proxy nodes to registry");
48+
},
49+
Err(e)=>{
50+
error!("Failed to lock HTTP client: {}", e);
51+
return Err(HTTPCResponse{
52+
status: "error".to_string(),
53+
message: format!("Client lock error: {}", e),
54+
});
8255
}
83-
}
56+
};
57+
Ok(HTTPCResponse {
58+
status: "success".to_string(),
59+
message: format!("Successfully synced gateway nodes"),
60+
})
8461
}
8562

86-
pub async fn sync_gateway_paths_to_registry() -> TCPResult<TCPDefaultResponse> {
63+
pub async fn sync_gateway_paths_to_registry(client: &Arc<Mutex<HttpC>>) -> Result<HTTPCResponse, HTTPCResponse> {
8764
// Get the gateway nodes from the database using our JOIN query
8865
let gateway_path = match gateway_node_queries::get_all_gateway_paths() {
8966
Ok(nodes) => nodes,
9067
Err(e) => {
9168
error!("Failed to retrieve gateway paths from database: {}", e);
92-
return Err(ClientError::ProtocolError(format!("Database error: {}", e)));
69+
return Err(HTTPCResponse{
70+
status: "error".to_string(),
71+
message: format!("Database error: {}", e),
72+
});
9373
}
9474
};
9575

@@ -101,40 +81,33 @@ pub async fn sync_gateway_paths_to_registry() -> TCPResult<TCPDefaultResponse> {
10181

10282
// Create the payload with the nodes for the second action
10383
let payload = gateway_path.clone();
104-
105-
// Create a new client for the second action without timeout
106-
let mut new_client = Client::new();
107-
let server_address = config::Api::TCPAddress.get_str();
108-
109-
match new_client.connect(server_address).await {
110-
Ok(_) => info!("Connected to registry server at {}", server_address),
84+
let payload_str = match serde_json::to_string(&payload) {
85+
Ok(json) => json,
11186
Err(e) => {
112-
error!("Failed to connect to registry server for second action: {}", e);
113-
return Err(e);
87+
error!("Failed to serialize proxy nodes to JSON: {}", e);
88+
return Err(HTTPCResponse{
89+
status: "error".to_string(),
90+
message: format!("Serialization error: {}", e),
91+
});
11492
}
115-
}
116-
117-
// Set service for new client
118-
let mut new_client = new_client.service("registry");
93+
};
11994

120-
// Send the second payload without timeout
121-
match new_client.action::<_, TCPDefaultResponse>("gateway", &payload).await {
122-
Ok(data) => {
123-
info!(
124-
"Successfully sent {} gateway paths to registry",
125-
gateway_path.len()
126-
);
127-
if let Err(e) = new_client.close().await {
128-
warn!("Error closing client connection: {}", e);
129-
}
130-
Ok(data)
95+
let _ = match client.lock() {
96+
Ok(client)=>{
97+
let _ = client.post_text("/gateway/path", &payload_str);
98+
info!("Successfully sent proxy nodes to registry");
99+
},
100+
Err(e)=>{
101+
error!("Failed to lock HTTP client: {}", e);
102+
return Err(HTTPCResponse{
103+
status: "error".to_string(),
104+
message: format!("Client lock error: {}", e),
105+
});
131106
}
132-
Err(e) => {
133-
error!("Failed to send gateway paths to registry: {}", e);
134-
if let Err(close_err) = new_client.close().await {
135-
warn!("Error closing client connection: {}", close_err);
136-
}
137-
Err(e)
138-
}
139-
}
107+
};
108+
109+
Ok(HTTPCResponse {
110+
status: "success".to_string(),
111+
message: format!("Successfully synced gateway paths"),
112+
})
140113
}

router-api/src/api/sync/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ use serde::{Deserialize, Serialize};
5151
use super::users::{JwtAuth, RoleAuth};
5252

5353
#[derive(Debug, Serialize, Deserialize)]
54-
pub struct TCPDefaultResponse {
54+
pub struct HTTPCResponse {
5555
pub status: String,
5656
pub message: String,
5757
}

router-api/src/api/sync/proxy_node.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@
55
//! Proxy Nodes are serialized and sent to the registry service to sync
66
//! configuration across the distributed system.
77
8-
use actix_web::{post, HttpResponse};
8+
use std::sync::{Arc, Mutex};
9+
10+
use actix_web::{post, web, HttpResponse};
911
use serde::{Deserialize, Serialize};
1012

13+
use crate::module::httpc::HttpC;
14+
1115
use super::proxy_node_tcp::sync_proxy_nodes_to_registry;
1216

1317
/// Represents a proxy node configuration
@@ -71,14 +75,13 @@ pub struct ProxyDomain {
7175
}
7276

7377
#[post("/proxy")]
74-
pub async fn gateway() -> HttpResponse {
75-
76-
let result = sync_proxy_nodes_to_registry().await;
78+
pub async fn gateway(client: web::Data<Arc<Mutex<HttpC>>>) -> HttpResponse {
79+
let result = sync_proxy_nodes_to_registry(client.as_ref()).await;
7780

7881
match result {
7982
Ok(data)=> HttpResponse::Ok().json(data),
8083
Err(e) => {
81-
log::error!("Failed to sync gateway nodes: {}", e);
84+
log::error!("Failed to sync gateway nodes: {:?}", e);
8285
HttpResponse::BadRequest().body("Failed to sync gateway nodes")
8386
}
8487
}

0 commit comments

Comments
 (0)