Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions rust/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
services:
postgres:
image: postgres:15
environment:
POSTGRES_DB: postgres
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
volumes:
- postgres-data:/var/lib/postgresql/data
ports:
- "5432:5432"
networks:
- app-network

volumes:
postgres-data:

networks:
app-network:
driver: bridge
71 changes: 71 additions & 0 deletions rust/impls/src/postgres_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,10 +660,15 @@ where
}

#[cfg(test)]

mod tests {
use super::{drop_database, DUMMY_MIGRATION, MIGRATIONS};
use crate::postgres_store::PostgresPlaintextBackend;
use api::define_kv_store_tests;
use api::kv_store::KvStore;
use api::types::{DeleteObjectRequest, GetObjectRequest, KeyValue, PutObjectRequest};

use bytes::Bytes;
use tokio::sync::OnceCell;
use tokio_postgres::NoTls;

Expand Down Expand Up @@ -779,4 +784,70 @@ mod tests {

drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await.unwrap();
}

#[tokio::test]
async fn supports_objects_up_to_non_large_object_threshold() {
let vss_db = "supports_objects_up_to_non_large_object_threshold";
let _ = drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await;

const MAXIMUM_SUPPORTED_VALUE_SIZE: usize = 1024 * 1024 * 1024;
const PROTOCOL_OVERHEAD_MARGIN: usize = 150;

// Construct entry that's for a field that's the maximum size of a non-"large_object" object
let large_value = vec![0u8; MAXIMUM_SUPPORTED_VALUE_SIZE - PROTOCOL_OVERHEAD_MARGIN];
let kv = KeyValue { key: "k1".into(), version: 0, value: Bytes::from(large_value) };

{
let store =
PostgresPlaintextBackend::new(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db).await.unwrap();
let (start, end) = store.migrate_vss_database(MIGRATIONS).await.unwrap();
assert_eq!(start, MIGRATIONS_START);
assert_eq!(end, MIGRATIONS_END);
assert_eq!(store.get_upgrades_list().await, [MIGRATIONS_START]);
assert_eq!(store.get_schema_version().await, MIGRATIONS_END);

// Round trip with non-large_object of threshold size

store
.put(
"token".to_string(),
PutObjectRequest {
store_id: "store_id".to_string(),
global_version: None,
transaction_items: vec![kv],
delete_items: vec![],
},
)
.await
.unwrap();

let resp_kv = store
.get(
"token".to_string(),
GetObjectRequest { store_id: "store_id".to_string(), key: "k1".to_string() },
)
.await
.unwrap()
.value
.unwrap();
assert_eq!(
resp_kv.value.len(),
MAXIMUM_SUPPORTED_VALUE_SIZE - PROTOCOL_OVERHEAD_MARGIN
);
assert!(resp_kv.value.iter().all(|&b| b == 0));

store
.delete(
"token".to_string(),
DeleteObjectRequest {
store_id: "store_id".to_string(),
key_value: Some(resp_kv),
},
)
.await
.unwrap();
};

drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await.unwrap();
}
}
14 changes: 12 additions & 2 deletions rust/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use auth_impls::jwt::JWTAuthorizer;
#[cfg(feature = "sigs")]
use auth_impls::signature::SignatureValidatingAuthorizer;
use impls::postgres_store::{PostgresPlaintextBackend, PostgresTlsBackend};
use vss_service::VssService;
use vss_service::{VssService, VssServiceConfig};

mod util;
mod vss_service;
Expand All @@ -37,6 +37,16 @@ fn main() {
eprintln!("Failed to load configuration: {}", e);
std::process::exit(-1);
});
let vss_service_config = match &config.maximum_request_body_size {
Some(size) => match VssServiceConfig::new(*size) {
Ok(config) => Arc::new(config),
Err(e) => {
eprintln!("Configuration validation error: {}", e);
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more thing, let's do an std::process:exit(-1) here like we do elsewhere

},
},
None => Arc::new(VssServiceConfig::default()),
};

let runtime = match tokio::runtime::Builder::new_multi_thread().enable_all().build() {
Ok(runtime) => Arc::new(runtime),
Expand Down Expand Up @@ -132,7 +142,7 @@ fn main() {
match res {
Ok((stream, _)) => {
let io_stream = TokioIo::new(stream);
let vss_service = VssService::new(Arc::clone(&store), Arc::clone(&authorizer));
let vss_service = VssService::new(Arc::clone(&store), Arc::clone(&authorizer), Arc::clone(&vss_service_config));
runtime.spawn(async move {
if let Err(err) = http1::Builder::new().serve_connection(io_stream, vss_service).await {
eprintln!("Failed to serve connection: {}", err);
Expand Down
34 changes: 32 additions & 2 deletions rust/server/src/util/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use serde::Deserialize;
use std::net::SocketAddr;

const BIND_ADDR_VAR: &str = "VSS_BIND_ADDRESS";
const MAX_REQUEST_BODY_SIZE: &str = "VSS_MAX_REQUEST_BODY_SIZE";
const JWT_RSA_PEM_VAR: &str = "VSS_JWT_RSA_PEM";
const PSQL_USER_VAR: &str = "VSS_PSQL_USERNAME";
const PSQL_PASS_VAR: &str = "VSS_PSQL_PASSWORD";
Expand All @@ -23,6 +24,7 @@ struct TomlConfig {
#[derive(Deserialize)]
struct ServerConfig {
bind_address: Option<SocketAddr>,
maximum_request_body_size: Option<usize>,
}

#[derive(Deserialize)]
Expand All @@ -48,6 +50,7 @@ struct TlsConfig {
// Encapsulates the result of reading both the environment variables and the config file.
pub(crate) struct Configuration {
pub(crate) bind_address: SocketAddr,
pub(crate) maximum_request_body_size: Option<usize>,
pub(crate) rsa_pem: Option<String>,
pub(crate) postgresql_prefix: String,
pub(crate) default_db: String,
Expand Down Expand Up @@ -85,6 +88,11 @@ pub(crate) fn load_configuration(config_file_path: Option<&str>) -> Result<Confi
None => TomlConfig::default(), // All fields are set to `None`
};

let (bind_address_config, max_request_body_size_config) = match server_config {
Some(c) => (c.bind_address, c.maximum_request_body_size),
None => (None, None),
};

let bind_address_env = read_env(BIND_ADDR_VAR)?
.map(|addr| {
addr.parse().map_err(|e| {
Expand All @@ -94,11 +102,25 @@ pub(crate) fn load_configuration(config_file_path: Option<&str>) -> Result<Confi
.transpose()?;
let bind_address = read_config(
bind_address_env,
server_config.and_then(|c| c.bind_address),
bind_address_config,
"VSS server bind address",
BIND_ADDR_VAR,
)?;

let maximum_request_body_size_env = read_env(MAX_REQUEST_BODY_SIZE)?
.map(|mrbs| {
mrbs.parse::<usize>().map_err(|e| {
format!("Unable to parse the maximum request body size environment variable: {}", e)
})
})
.transpose()?;
let maximum_request_body_size = read_config(
maximum_request_body_size_env,
max_request_body_size_config,
"VSS server maximum request body size",
MAX_REQUEST_BODY_SIZE,
)?;
Comment on lines +117 to +122
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not make this parameter mandatory. In case it's not set, let's set it to None in the Configuration struct.


let rsa_pem_env = read_env(JWT_RSA_PEM_VAR)?;
let rsa_pem = rsa_pem_env.or(jwt_auth_config.and_then(|config| config.rsa_pem));

Expand Down Expand Up @@ -155,5 +177,13 @@ pub(crate) fn load_configuration(config_file_path: Option<&str>) -> Result<Confi

let postgresql_prefix = format!("postgresql://{}:{}@{}", username, password, address);

Ok(Configuration { bind_address, rsa_pem, postgresql_prefix, default_db, vss_db, tls_config })
Ok(Configuration {
bind_address,
maximum_request_body_size: Some(maximum_request_body_size),
rsa_pem,
postgresql_prefix,
default_db,
vss_db,
tls_config,
})
}
87 changes: 77 additions & 10 deletions rust/server/src/vss_service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use http_body_util::{BodyExt, Full};
use http_body_util::{BodyExt, Full, Limited};
use hyper::body::{Bytes, Incoming};
use hyper::service::Service;
use hyper::{Request, Response, StatusCode};
Expand All @@ -18,15 +18,44 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

const MAXIMUM_REQUEST_BODY_SIZE: usize = 1024 * 1024 * 1024;

#[derive(Clone)]
pub(crate) struct VssServiceConfig {
maximum_request_body_size: usize,
}

impl VssServiceConfig {
pub fn new(maximum_request_body_size: usize) -> Result<Self, String> {
if maximum_request_body_size > MAXIMUM_REQUEST_BODY_SIZE {
return Err(format!(
"Request body size {} exceeds maximum {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "Maximum request body size {} exceeds maximum {}"

maximum_request_body_size, MAXIMUM_REQUEST_BODY_SIZE
));
}

Ok(Self { maximum_request_body_size })
}
}

impl Default for VssServiceConfig {
fn default() -> Self {
Self { maximum_request_body_size: MAXIMUM_REQUEST_BODY_SIZE }
}
}

#[derive(Clone)]
pub struct VssService {
store: Arc<dyn KvStore>,
authorizer: Arc<dyn Authorizer>,
config: Arc<VssServiceConfig>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer we remove the Arc<> here, and do #[derive(Clone, Copy)] on VssServiceConfig

}

impl VssService {
pub(crate) fn new(store: Arc<dyn KvStore>, authorizer: Arc<dyn Authorizer>) -> Self {
Self { store, authorizer }
pub(crate) fn new(
store: Arc<dyn KvStore>, authorizer: Arc<dyn Authorizer>, config: Arc<VssServiceConfig>,
) -> Self {
Self { store, authorizer, config }
}
}

Expand All @@ -41,22 +70,51 @@ impl Service<Request<Incoming>> for VssService {
let store = Arc::clone(&self.store);
let authorizer = Arc::clone(&self.authorizer);
let path = req.uri().path().to_owned();
let maximum_request_body_size = self.config.maximum_request_body_size;

Box::pin(async move {
let prefix_stripped_path = path.strip_prefix(BASE_PATH_PREFIX).unwrap_or_default();

match prefix_stripped_path {
"/getObject" => {
handle_request(store, authorizer, req, handle_get_object_request).await
handle_request(
store,
authorizer,
req,
maximum_request_body_size,
handle_get_object_request,
)
.await
},
"/putObjects" => {
handle_request(store, authorizer, req, handle_put_object_request).await
handle_request(
store,
authorizer,
req,
maximum_request_body_size,
handle_put_object_request,
)
.await
},
"/deleteObject" => {
handle_request(store, authorizer, req, handle_delete_object_request).await
handle_request(
store,
authorizer,
req,
maximum_request_body_size,
handle_delete_object_request,
)
.await
},
"/listKeyVersions" => {
handle_request(store, authorizer, req, handle_list_object_request).await
handle_request(
store,
authorizer,
req,
maximum_request_body_size,
handle_list_object_request,
)
.await
},
_ => {
let error_msg = "Invalid request path.".as_bytes();
Expand Down Expand Up @@ -97,7 +155,7 @@ async fn handle_request<
Fut: Future<Output = Result<R, VssError>> + Send,
>(
store: Arc<dyn KvStore>, authorizer: Arc<dyn Authorizer>, request: Request<Incoming>,
handler: F,
maximum_request_body_size: usize, handler: F,
) -> Result<<VssService as Service<Request<Incoming>>>::Response, hyper::Error> {
let (parts, body) = request.into_parts();
let headers_map = parts
Expand All @@ -110,8 +168,17 @@ async fn handle_request<
Ok(auth_response) => auth_response.user_token,
Err(e) => return Ok(build_error_response(e)),
};
// TODO: we should bound the amount of data we read to avoid allocating too much memory.
let bytes = body.collect().await?.to_bytes();

let limited_body = Limited::new(body, maximum_request_body_size);
let bytes = match limited_body.collect().await {
Ok(body) => body.to_bytes(),
Err(_) => {
return Ok(Response::builder()
.status(StatusCode::PAYLOAD_TOO_LARGE)
.body(Full::new(Bytes::from("Request body too large")))
.unwrap());
},
};
match T::decode(bytes) {
Ok(request) => match handler(store.clone(), user_token, request).await {
Ok(response) => Ok(Response::builder()
Expand Down
1 change: 1 addition & 0 deletions rust/server/vss-server-config.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[server_config]
bind_address = "127.0.0.1:8080" # Optional in TOML, can be overridden by env var `VSS_BIND_ADDRESS`
maximum_request_body_size = 1073741824 # Optional in TOML: maximum request body size in bytes capped at 1 GB, can be overriden by env var 'VSS_MAX_REQUEST_BODY_SIZE'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep this commented out to highlight to people that this parameter is optional; we'll default to 1GiB in case it is not set anywhere.


# Uncomment the table below to verify JWT tokens in the HTTP Authorization header against the given RSA public key,
# can be overridden by env var `VSS_JWT_RSA_PEM`
Expand Down