-
Notifications
You must be signed in to change notification settings - Fork 88
Guard concurrent sends with exclusive DB lock and URI/RK checks #1376
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,11 +15,16 @@ pub(crate) fn now() -> i64 { | |
|
|
||
| pub(crate) const DB_PATH: &str = "payjoin.sqlite"; | ||
|
|
||
| #[derive(Debug)] | ||
| pub(crate) struct Database(Pool<SqliteConnectionManager>); | ||
|
|
||
| impl Database { | ||
| pub(crate) fn create(path: impl AsRef<Path>) -> Result<Self> { | ||
| let manager = SqliteConnectionManager::file(path.as_ref()); | ||
| // locking_mode is a per-connection PRAGMA, so it must be set via | ||
| // with_init to apply to every connection the pool creates, not only | ||
| // the first one used during init_schema. | ||
| let manager = SqliteConnectionManager::file(path.as_ref()) | ||
| .with_init(|conn| conn.execute_batch("PRAGMA locking_mode = EXCLUSIVE;")); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. isn't it more appropriate to put this in
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks for the clarification, i thought it was a schema level pragma |
||
| let pool = Pool::new(manager)?; | ||
|
|
||
| // Initialize database schema | ||
|
|
@@ -36,6 +41,7 @@ impl Database { | |
| conn.execute( | ||
| "CREATE TABLE IF NOT EXISTS send_sessions ( | ||
| session_id INTEGER PRIMARY KEY AUTOINCREMENT, | ||
| pj_uri TEXT NOT NULL, | ||
| receiver_pubkey BLOB NOT NULL, | ||
| completed_at INTEGER | ||
| )", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,20 +20,40 @@ impl std::fmt::Display for SessionId { | |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.0) } | ||
| } | ||
|
|
||
| #[derive(Clone)] | ||
| #[derive(Clone, Debug)] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is the debug still necessary?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. still needed for test, when removed test error |
||
| pub(crate) struct SenderPersister { | ||
| db: Arc<Database>, | ||
| session_id: SessionId, | ||
| } | ||
|
|
||
| impl SenderPersister { | ||
| pub fn new(db: Arc<Database>, receiver_pubkey: HpkePublicKey) -> crate::db::Result<Self> { | ||
| pub fn new( | ||
| db: Arc<Database>, | ||
| pj_uri: &str, | ||
| receiver_pubkey: &HpkePublicKey, | ||
| ) -> crate::db::Result<Self> { | ||
| let conn = db.get_connection()?; | ||
| let receiver_pubkey_bytes = receiver_pubkey.to_compressed_bytes(); | ||
|
|
||
| let (duplicate_uri, duplicate_rk): (bool, bool) = conn.query_row( | ||
| "SELECT \ | ||
| EXISTS(SELECT 1 FROM send_sessions WHERE pj_uri = ?1), \ | ||
| EXISTS(SELECT 1 FROM send_sessions WHERE receiver_pubkey = ?2)", | ||
| params![pj_uri, &receiver_pubkey_bytes], | ||
| |row| Ok((row.get(0)?, row.get(1)?)), | ||
| )?; | ||
|
|
||
| if duplicate_uri { | ||
| return Err(Error::DuplicateSendSession(DuplicateKind::Uri)); | ||
| } | ||
| if duplicate_rk { | ||
| return Err(Error::DuplicateSendSession(DuplicateKind::ReceiverPubkey)); | ||
| } | ||
zealsham marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // Create a new session in send_sessions and get its ID | ||
| let session_id: i64 = conn.query_row( | ||
| "INSERT INTO send_sessions (session_id, receiver_pubkey) VALUES (NULL, ?1) RETURNING session_id", | ||
| params![receiver_pubkey.to_compressed_bytes()], | ||
| "INSERT INTO send_sessions (pj_uri, receiver_pubkey) VALUES (?1, ?2) RETURNING session_id", | ||
| params![pj_uri, &receiver_pubkey_bytes], | ||
| |row| row.get(0), | ||
| )?; | ||
|
|
||
|
|
@@ -42,7 +62,6 @@ impl SenderPersister { | |
|
|
||
| pub fn from_id(db: Arc<Database>, id: SessionId) -> Self { Self { db, session_id: id } } | ||
| } | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think this needs rustfmt
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems according to rustfmt rules this is the right way to write it |
||
| impl SessionPersister for SenderPersister { | ||
| type SessionEvent = SenderSessionEvent; | ||
| type InternalStorageError = crate::db::error::Error; | ||
|
|
@@ -268,3 +287,83 @@ impl Database { | |
| Ok(session_ids) | ||
| } | ||
| } | ||
|
|
||
| #[cfg(all(test, feature = "v2"))] | ||
| mod tests { | ||
| use std::sync::Arc; | ||
|
|
||
| use payjoin::HpkeKeyPair; | ||
|
|
||
| use super::*; | ||
|
|
||
| fn create_test_db() -> Arc<Database> { | ||
| // Use an in-memory database for tests | ||
| let manager = r2d2_sqlite::SqliteConnectionManager::memory() | ||
| .with_init(|conn| conn.execute_batch("PRAGMA locking_mode = EXCLUSIVE;")); | ||
| let pool = r2d2::Pool::new(manager).expect("pool creation should succeed"); | ||
| let conn = pool.get().expect("connection should succeed"); | ||
| Database::init_schema(&conn).expect("schema init should succeed"); | ||
| Arc::new(Database(pool)) | ||
| } | ||
|
|
||
| fn make_receiver_pubkey() -> payjoin::HpkePublicKey { HpkeKeyPair::gen_keypair().1 } | ||
|
|
||
| // Second call with the same URI (same active session) should return DuplicateSendSession(Uri). | ||
| #[test] | ||
| fn test_duplicate_uri_returns_error() { | ||
| let db = create_test_db(); | ||
| let rk1 = make_receiver_pubkey(); | ||
| let rk2 = make_receiver_pubkey(); | ||
| let uri = "bitcoin:addr1?pj=https://example.com/BBBBBBBB"; | ||
|
|
||
| SenderPersister::new(db.clone(), uri, &rk1).expect("first session should succeed"); | ||
|
|
||
| let err = SenderPersister::new(db, uri, &rk2).expect_err("duplicate URI should fail"); | ||
| assert!( | ||
| matches!(err, Error::DuplicateSendSession(DuplicateKind::Uri)), | ||
| "expected DuplicateSendSession(Uri), got: {err:?}" | ||
| ); | ||
| } | ||
|
|
||
| // Same receiver pubkey under a different URI should return DuplicateSendSession(ReceiverPubkey). | ||
| #[test] | ||
| fn test_duplicate_rk_returns_error() { | ||
| let db = create_test_db(); | ||
| let rk = make_receiver_pubkey(); | ||
| let uri1 = "bitcoin:addr1?pj=https://example.com/CCCCCCCC"; | ||
| let uri2 = "bitcoin:addr1?pj=https://example.com/DDDDDDDD"; | ||
|
|
||
| SenderPersister::new(db.clone(), uri1, &rk).expect("first session should succeed"); | ||
|
|
||
| let err = SenderPersister::new(db, uri2, &rk).expect_err("duplicate RK should fail"); | ||
| assert!( | ||
| matches!(err, Error::DuplicateSendSession(DuplicateKind::ReceiverPubkey)), | ||
| "expected DuplicateSendSession(ReceiverPubkey), got: {err:?}" | ||
| ); | ||
| } | ||
|
|
||
| // After a session is marked completed, a new session with the same URI must still be rejected | ||
| // to prevent address reuse, HPKE receiver-key reuse | ||
| #[test] | ||
| fn test_completed_session_blocks_reuse() { | ||
| let db = create_test_db(); | ||
| let rk1 = make_receiver_pubkey(); | ||
| let rk2 = make_receiver_pubkey(); | ||
| let uri = "bitcoin:addr1?pj=https://example.com/EEEEEEEE"; | ||
|
|
||
| let persister = | ||
| SenderPersister::new(db.clone(), uri, &rk1).expect("first session should succeed"); | ||
|
|
||
| // Mark the session as completed | ||
| use payjoin::persist::SessionPersister; | ||
| persister.close().expect("close should succeed"); | ||
|
|
||
| // A new session with the same URI must be rejected even after completion | ||
| let err = SenderPersister::new(db, uri, &rk2) | ||
| .expect_err("reuse of a completed session URI must be rejected"); | ||
| assert!( | ||
| matches!(err, Error::DuplicateSendSession(DuplicateKind::Uri)), | ||
| "expected DuplicateSendSession(Uri), got: {err:?}" | ||
| ); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in this case it doesn't matter but as i'm lookinmg at this change for the 3rd time and remembering it's just a typo fix that's unrelated, it'd be helpful if in the future this kind of change was in a separate commit