diff --git a/damus.xcodeproj/project.pbxproj b/damus.xcodeproj/project.pbxproj index 5a6148285..73fbaeb48 100644 --- a/damus.xcodeproj/project.pbxproj +++ b/damus.xcodeproj/project.pbxproj @@ -1668,6 +1668,10 @@ D74EC8522E1856B70091DC51 /* NonCopyableLinkedList.swift in Sources */ = {isa = PBXBuildFile; fileRef = D74EC84E2E1856AF0091DC51 /* NonCopyableLinkedList.swift */; }; D74F430A2B23F0BE00425B75 /* DamusPurple.swift in Sources */ = {isa = PBXBuildFile; fileRef = D74F43092B23F0BE00425B75 /* DamusPurple.swift */; }; D74F430C2B23FB9B00425B75 /* StoreObserver.swift in Sources */ = {isa = PBXBuildFile; fileRef = D74F430B2B23FB9B00425B75 /* StoreObserver.swift */; }; + D75154BF2EC5910A00BF2CB2 /* NdbUseLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = D75154BE2EC5910600BF2CB2 /* NdbUseLock.swift */; }; + D75154C02EC5910A00BF2CB2 /* NdbUseLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = D75154BE2EC5910600BF2CB2 /* NdbUseLock.swift */; }; + D75154C12EC5910A00BF2CB2 /* NdbUseLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = D75154BE2EC5910600BF2CB2 /* NdbUseLock.swift */; }; + D75154C22EC5910A00BF2CB2 /* NdbUseLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = D75154BE2EC5910600BF2CB2 /* NdbUseLock.swift */; }; D753CEAA2BE9DE04001C3A5D /* MutingTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = D753CEA92BE9DE04001C3A5D /* MutingTests.swift */; }; D755B28D2D3E7D8800BBEEFA /* NIP37Draft.swift in Sources */ = {isa = PBXBuildFile; fileRef = D755B28C2D3E7D7D00BBEEFA /* NIP37Draft.swift */; }; D755B28E2D3E7D8800BBEEFA /* NIP37Draft.swift in Sources */ = {isa = PBXBuildFile; fileRef = D755B28C2D3E7D7D00BBEEFA /* NIP37Draft.swift */; }; @@ -1709,6 +1713,7 @@ D78F08182D7F7F7500FC6C75 /* NIP04.swift in Sources */ = {isa = PBXBuildFile; fileRef = D78F08162D7F7F6C00FC6C75 /* NIP04.swift */; }; D78F08192D7F7F7500FC6C75 /* NIP04.swift in Sources */ = {isa = PBXBuildFile; fileRef = D78F08162D7F7F6C00FC6C75 /* NIP04.swift */; }; D78F081A2D7F803100FC6C75 /* NIP04.swift in Sources */ = {isa = PBXBuildFile; fileRef = D78F08162D7F7F6C00FC6C75 /* NIP04.swift */; }; + D795356B2EBD28A800AACF98 /* AppLifecycleHandlingTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = D795356A2EBD289D00AACF98 /* AppLifecycleHandlingTests.swift */; }; D798D21A2B0856CC00234419 /* Mentions.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4C7FF7D42823313F009601DB /* Mentions.swift */; }; D798D21B2B0856F200234419 /* NdbTagsIterator.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4CDD1AE12A6B3074001CD4DF /* NdbTagsIterator.swift */; }; D798D21C2B0857E400234419 /* Bech32Object.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4CF0ABEF29857E9200D66079 /* Bech32Object.swift */; }; @@ -2766,6 +2771,7 @@ D74EC84E2E1856AF0091DC51 /* NonCopyableLinkedList.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = NonCopyableLinkedList.swift; sourceTree = ""; }; D74F43092B23F0BE00425B75 /* DamusPurple.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DamusPurple.swift; sourceTree = ""; }; D74F430B2B23FB9B00425B75 /* StoreObserver.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = StoreObserver.swift; sourceTree = ""; }; + D75154BE2EC5910600BF2CB2 /* NdbUseLock.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = NdbUseLock.swift; sourceTree = ""; }; D753CEA92BE9DE04001C3A5D /* MutingTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MutingTests.swift; sourceTree = ""; }; D755B28C2D3E7D7D00BBEEFA /* NIP37Draft.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = NIP37Draft.swift; sourceTree = ""; }; D76556D52B1E6C08001B0CCC /* DamusPurpleWelcomeView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DamusPurpleWelcomeView.swift; sourceTree = ""; }; @@ -2786,6 +2792,7 @@ D78F080B2D7F78EB00FC6C75 /* Request.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Request.swift; sourceTree = ""; }; D78F08102D7F78F600FC6C75 /* Response.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Response.swift; sourceTree = ""; }; D78F08162D7F7F6C00FC6C75 /* NIP04.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = NIP04.swift; sourceTree = ""; }; + D795356A2EBD289D00AACF98 /* AppLifecycleHandlingTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AppLifecycleHandlingTests.swift; sourceTree = ""; }; D798D21D2B0858BB00234419 /* MigratedTypes.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MigratedTypes.swift; sourceTree = ""; }; D798D2272B085CDA00234419 /* NdbNote+.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "NdbNote+.swift"; sourceTree = ""; }; D798D22B2B086C7400234419 /* NostrEvent+.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "NostrEvent+.swift"; sourceTree = ""; }; @@ -3297,6 +3304,7 @@ 4C9054862A6AEB4500811EEC /* nostrdb */ = { isa = PBXGroup; children = ( + D75154BE2EC5910600BF2CB2 /* NdbUseLock.swift */, D74EC84E2E1856AF0091DC51 /* NonCopyableLinkedList.swift */, D733F9E42D92C75C00317B11 /* UnownedNdbNote.swift */, D7F5630F2DEE71BB008509DE /* NdbFilter.swift */, @@ -5275,6 +5283,7 @@ D7EBF8BC2E5946F9004EAE29 /* NostrNetworkManagerTests */ = { isa = PBXGroup; children = ( + D795356A2EBD289D00AACF98 /* AppLifecycleHandlingTests.swift */, D7EBF8BD2E594708004EAE29 /* test_notes.jsonl */, D7EBF8BA2E5901F7004EAE29 /* NostrNetworkManagerTests.swift */, D7EBF8BF2E5D39D1004EAE29 /* ThreadModelTests.swift */, @@ -5737,6 +5746,7 @@ 4CC6AA792CAB688500989CEF /* sha256.c in Sources */, 4CC6AA7B2CAB688500989CEF /* likely.c in Sources */, 4CC6AA7F2CAB688500989CEF /* htable.c in Sources */, + D75154C02EC5910A00BF2CB2 /* NdbUseLock.swift in Sources */, 4CC6AA862CAB688500989CEF /* list.c in Sources */, 4CC6AA872CAB688500989CEF /* utf8.c in Sources */, 4CC6AA892CAB688500989CEF /* debug.c in Sources */, @@ -6259,6 +6269,7 @@ D72E127A2BEEEED000F4F781 /* NostrFilterTests.swift in Sources */, B5B4D1432B37D47600844320 /* NdbExtensions.swift in Sources */, 3ACBCB78295FE5C70037388A /* TimeAgoTests.swift in Sources */, + D795356B2EBD28A800AACF98 /* AppLifecycleHandlingTests.swift in Sources */, D72A2D072AD9C1FB002AFF62 /* MockProfiles.swift in Sources */, B5A75C2A2B546D94007AFBC0 /* MuteItemTests.swift in Sources */, D7DB1FEE2D5AC51B00CF06DA /* NIP44v2EncryptionTests.swift in Sources */, @@ -6777,6 +6788,7 @@ 82D6FC3A2CD99F7900C925F4 /* WideEventView.swift in Sources */, 82D6FC3B2CD99F7900C925F4 /* LongformView.swift in Sources */, 82D6FC3C2CD99F7900C925F4 /* LongformPreview.swift in Sources */, + D75154C22EC5910A00BF2CB2 /* NdbUseLock.swift in Sources */, 82D6FC3D2CD99F7900C925F4 /* EventShell.swift in Sources */, 82D6FC3E2CD99F7900C925F4 /* MentionView.swift in Sources */, 82D6FC3F2CD99F7900C925F4 /* EventLoaderView.swift in Sources */, @@ -7202,6 +7214,7 @@ D73E5F302C6A97F4007EB227 /* EventProfile.swift in Sources */, D73E5F312C6A97F4007EB227 /* EventMenu.swift in Sources */, D73E5F322C6A97F4007EB227 /* EventMutingContainerView.swift in Sources */, + D75154C12EC5910A00BF2CB2 /* NdbUseLock.swift in Sources */, D73E5F332C6A97F4007EB227 /* ZapEvent.swift in Sources */, 5C8F97362EB46145009399B1 /* LiveStreamView.swift in Sources */, D73E5F342C6A97F4007EB227 /* TextEvent.swift in Sources */, @@ -7466,6 +7479,7 @@ 4CC6AAC52CAB688500989CEF /* likely.c in Sources */, 4CC6AAC92CAB688500989CEF /* htable.c in Sources */, 4CC6AAD02CAB688500989CEF /* list.c in Sources */, + D75154BF2EC5910A00BF2CB2 /* NdbUseLock.swift in Sources */, 4CC6AAD12CAB688500989CEF /* utf8.c in Sources */, 4CC6AAD32CAB688500989CEF /* debug.c in Sources */, 4CC6AAD42CAB688500989CEF /* str.c in Sources */, diff --git a/damus/ContentView.swift b/damus/ContentView.swift index bc6025122..da6121ea3 100644 --- a/damus/ContentView.swift +++ b/damus/ContentView.swift @@ -538,9 +538,7 @@ struct ContentView: View { Log.debug("App background signal handling: App being backgrounded", for: .app_lifecycle) let startTime = CFAbsoluteTimeGetCurrent() await damus_state.nostrNetwork.handleAppBackgroundRequest() // Close ndb streaming tasks before closing ndb to avoid memory errors - Log.debug("App background signal handling: Nostr network closed after %.2f seconds", for: .app_lifecycle, CFAbsoluteTimeGetCurrent() - startTime) - damus_state.ndb.close() - Log.debug("App background signal handling: Ndb closed after %.2f seconds", for: .app_lifecycle, CFAbsoluteTimeGetCurrent() - startTime) + Log.debug("App background signal handling: Nostr network and Ndb closed after %.2f seconds", for: .app_lifecycle, CFAbsoluteTimeGetCurrent() - startTime) this_app.endBackgroundTask(bgTask) } break @@ -552,9 +550,7 @@ struct ContentView: View { Task { await damusClosingTask?.value // Wait for the closing task to finish before reopening things, to avoid race conditions damusClosingTask = nil - damus_state.ndb.reopen() - // Pinging the network will automatically reconnect any dead websocket connections - await damus_state.nostrNetwork.ping() + await damus_state.nostrNetwork.handleAppForegroundRequest() } @unknown default: break @@ -1144,7 +1140,6 @@ extension LossyLocalNotification { } } - func logout(_ state: DamusState?) { state?.close() diff --git a/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift b/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift index 6bc1a17fa..69b61fe88 100644 --- a/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift +++ b/damus/Core/Networking/NostrNetworkManager/NostrNetworkManager.swift @@ -59,9 +59,19 @@ class NostrNetworkManager { await self.pool.disconnect() } - func handleAppBackgroundRequest() async { + func handleAppBackgroundRequest(beforeClosingNdb operationBeforeClosingNdb: (() async -> Void)? = nil) async { + // Mark NDB as closed without actually closing it, to avoid new tasks from using NostrDB + // self.delegate.ndb.markClosed() await self.reader.cancelAllTasks() await self.pool.cleanQueuedRequestForSessionEnd() + await operationBeforeClosingNdb?() + self.delegate.ndb.close() + } + + func handleAppForegroundRequest() async { + self.delegate.ndb.reopen() + // Pinging the network will automatically reconnect any dead websocket connections + await self.ping() } func close() async { diff --git a/damusTests/NostrNetworkManagerTests/AppLifecycleHandlingTests.swift b/damusTests/NostrNetworkManagerTests/AppLifecycleHandlingTests.swift new file mode 100644 index 000000000..11aa5ca77 --- /dev/null +++ b/damusTests/NostrNetworkManagerTests/AppLifecycleHandlingTests.swift @@ -0,0 +1,73 @@ +// +// AppLifecycleHandlingTests.swift +// damus +// +// Created by Daniel D’Aquino on 2025-11-06. +// + +import XCTest +@testable import damus + + +class AppLifecycleHandlingTests: XCTestCase { + + func getTestNotesJSONL() -> String { + // Get the path for the test_notes.jsonl file in the same folder as this test file + let testBundle = Bundle(for: type(of: self)) + let fileURL = testBundle.url(forResource: "test_notes", withExtension: "jsonl")! + + // Load the contents of the file + return try! String(contentsOf: fileURL, encoding: .utf8) + } + + /// Tests for some race conditions between the app closing down and streams opening throughout the app + /// See https://github.com/damus-io/damus/issues/3245 for more context. + /// + /// **Note:** Time delays are intentionally added because we actually want to provoke possible race conditions, + /// so using proper waiting mechanisms would defeat the purpose of the test. + func testAppLifecycleRaceConditions() async throws { + let damusState = generate_test_damus_state(mock_profile_info: nil) + + let notesJSONL = getTestNotesJSONL() + for noteText in notesJSONL.split(separator: "\n") { + let _ = damusState.ndb.processEvent("[\"EVENT\",\"subid\",\(String(noteText))]") + } + + // Give some time ndb some time to fill up + try? await Task.sleep(for: .milliseconds(2000)) + + + + // Start measuring the time elapsed for debugging + let startTime = CFAbsoluteTimeGetCurrent() + func getElapsedTimeMiliseconds() -> String { + return "\((CFAbsoluteTimeGetCurrent() - startTime) * 1000) ms" + } + + + Task.detached { + for i in 0...10000 { + try await Task.sleep(for: .milliseconds(Int.random(in: 0...10))) + print("APP_LIFECYCLE_TEST \(i): About to close Ndb. Elapsed time: \(getElapsedTimeMiliseconds())") + damusState.ndb.close() + print("APP_LIFECYCLE_TEST \(i): Closed Ndb. Elapsed time: \(getElapsedTimeMiliseconds())") + print("APP_LIFECYCLE_TEST \(i): Reopening Ndb. Elapsed time: \(getElapsedTimeMiliseconds())") + _ = damusState.ndb.reopen() + print("APP_LIFECYCLE_TEST \(i): Reopened Ndb. Elapsed time: \(getElapsedTimeMiliseconds())") + + } + } + for i in 0...10000 { + do { + try await Task.sleep(for: .milliseconds(Int.random(in: 0...10))) + print("APP_LIFECYCLE_TEST \(i): Starting new query. Elapsed time: \(getElapsedTimeMiliseconds())") + guard let txn = NdbTxn(ndb: damusState.ndb) else { continue } + _ = try damusState.ndb.query(with: txn, filters: [try NdbFilter(from: NostrFilter(kinds: [.text], limit: 1000))], maxResults: 500) + } + catch { + print("APP_LIFECYCLE_TEST \(i): Query error: \(error). Elapsed time: \(getElapsedTimeMiliseconds())") + } + print("APP_LIFECYCLE_TEST \(i): Finished query. Elapsed time: \(getElapsedTimeMiliseconds())") + } + } +} diff --git a/nostrdb/Ndb+.swift b/nostrdb/Ndb+.swift index 79fc39c18..00b22beb4 100644 --- a/nostrdb/Ndb+.swift +++ b/nostrdb/Ndb+.swift @@ -18,12 +18,12 @@ extension Ndb { /// - maxSimultaneousResults: Maximum number of initial results to return /// - Returns: AsyncStream of StreamItem events /// - Throws: NdbStreamError if subscription fails - func subscribe(filters: [NostrFilter], maxSimultaneousResults: Int = 1000) throws(NdbStreamError) -> AsyncStream { + func subscribe(filters: [NostrFilter], maxSimultaneousResults: Int = 1000) throws -> AsyncStream { let ndbFilters: [NdbFilter] do { ndbFilters = try filters.toNdbFilters() } catch { - throw .cannotConvertFilter(error) + throw NdbStreamError.cannotConvertFilter(error) } return try self.subscribe(filters: ndbFilters, maxSimultaneousResults: maxSimultaneousResults) } diff --git a/nostrdb/Ndb.swift b/nostrdb/Ndb.swift index e16daaa37..feab9587f 100644 --- a/nostrdb/Ndb.swift +++ b/nostrdb/Ndb.swift @@ -7,6 +7,7 @@ import Foundation import OSLog +import Synchronization fileprivate let APPLICATION_GROUP_IDENTIFIER = "group.com.damus" @@ -34,6 +35,7 @@ class Ndb { var generation: Int private var closed: Bool private var callbackHandler: Ndb.CallbackHandler + let ndbAccessLock: Ndb.UseLockProtocol = initLock() private static let DEFAULT_WRITER_SCRATCH_SIZE: Int32 = 2097152; // 2mb scratch size for the writer thread, it should match with the one specified in nostrdb.c @@ -158,6 +160,7 @@ class Ndb { self.ndb = db self.closed = false self.callbackHandler = callbackHandler + self.ndbAccessLock.markNdbOpen() } private static func migrate_db_location_if_needed() throws { @@ -206,14 +209,23 @@ class Ndb { // This simple initialization will cause subscriptions not to be ever called. Probably fine because this initializer is used only for empty example ndb instances. self.callbackHandler = Ndb.CallbackHandler() } + + /// Mark NostrDB as "closed" without actually closing it. + /// Useful when shutting down tasks that use NostrDB while avoiding new tasks from using it. + func markClosed() { + self.closed = true + } func close() { guard !self.is_closed else { return } self.closed = true - print("txn: CLOSING NOSTRDB") - ndb_destroy(self.ndb.ndb) - self.generation += 1 - print("txn: NOSTRDB CLOSED") + try! self.ndbAccessLock.waitUntilNdbCanClose(thenClose: { + print("txn: CLOSING NOSTRDB") + ndb_destroy(self.ndb.ndb) + self.generation += 1 + print("txn: NOSTRDB CLOSED") + return false + }, maxTimeout: .milliseconds(2000)) } func reopen() -> Bool { @@ -223,9 +235,10 @@ class Ndb { } print("txn: NOSTRDB REOPENED (gen \(generation))") - + + self.ndb = db // Set the new DB before marking it as open to prevent access to the old DB self.closed = false - self.ndb = db + self.ndbAccessLock.markNdbOpen() return true } @@ -622,32 +635,37 @@ class Ndb { /// - maxResults: Maximum number of results to return /// - Returns: Array of note keys matching the filters /// - Throws: NdbStreamError if the query fails - func query(with txn: NdbTxn, filters: [NdbFilter], maxResults: Int) throws(NdbStreamError) -> [NoteKey] { - guard !self.is_closed else { throw .ndbClosed } - let filtersPointer = UnsafeMutablePointer.allocate(capacity: filters.count) - defer { filtersPointer.deallocate() } - - for (index, ndbFilter) in filters.enumerated() { - filtersPointer.advanced(by: index).pointee = ndbFilter.ndbFilter - } - - let count = UnsafeMutablePointer.allocate(capacity: 1) - defer { count.deallocate() } - - let results = UnsafeMutablePointer.allocate(capacity: maxResults) - defer { results.deallocate() } - - guard !self.is_closed else { throw .ndbClosed } - guard ndb_query(&txn.txn, filtersPointer, Int32(filters.count), results, Int32(maxResults), count) == 1 else { - throw NdbStreamError.initialQueryFailed - } - - var noteIds: [NoteKey] = [] - for i in 0..(with txn: NdbTxn, filters: [NdbFilter], maxResults: Int) throws -> [NoteKey] { + guard !self.is_closed else { throw NdbStreamError.ndbClosed } + return try self.ndbAccessLock.keepNdbOpen(during: { + // Double-check things to avoid TOCTOU race conditions + guard !self.is_closed else { throw NdbStreamError.ndbClosed } + guard txn.generation == self.generation else { throw NdbStreamError.cannotOpenTransaction } + + let filtersPointer = UnsafeMutablePointer.allocate(capacity: filters.count) + defer { filtersPointer.deallocate() } + + for (index, ndbFilter) in filters.enumerated() { + filtersPointer.advanced(by: index).pointee = ndbFilter.ndbFilter + } + + let count = UnsafeMutablePointer.allocate(capacity: 1) + defer { count.deallocate() } + + let results = UnsafeMutablePointer.allocate(capacity: maxResults) + defer { results.deallocate() } + + guard ndb_query(&txn.txn, filtersPointer, Int32(filters.count), results, Int32(maxResults), count) == 1 else { + throw NdbStreamError.initialQueryFailed + } + + var noteIds: [NoteKey] = [] + for i in 0.. AsyncStream { - guard !self.is_closed else { throw .ndbClosed } + func subscribe(filters: [NdbFilter], maxSimultaneousResults: Int = 1000) throws -> AsyncStream { + guard !self.is_closed else { throw NdbStreamError.ndbClosed } - do { try Task.checkCancellation() } catch { throw .cancelled } + do { try Task.checkCancellation() } catch { throw NdbStreamError.cancelled } // CRITICAL: Create the subscription FIRST before querying to avoid race condition // This ensures that any events indexed after subscription but before query won't be missed let newEventsStream = ndbSubscribe(filters: filters) // Now fetch initial results after subscription is registered - guard let txn = NdbTxn(ndb: self) else { throw .cannotOpenTransaction } + guard let txn = NdbTxn(ndb: self) else { throw NdbStreamError.cannotOpenTransaction } // Use our safe wrapper instead of direct C function call let noteIds = try query(with: txn, filters: filters, maxResults: maxSimultaneousResults) - do { try Task.checkCancellation() } catch { throw .cancelled } + do { try Task.checkCancellation() } catch { throw NdbStreamError.cancelled } // Create a cascading stream that combines initial results with new events return AsyncStream { continuation in @@ -996,4 +1014,3 @@ func getDebugCheckedRoot(byteBuffer: inout ByteBuffer) thro func remove_file_prefix(_ str: String) -> String { return str.replacingOccurrences(of: "file://", with: "") } - diff --git a/nostrdb/NdbTxn.swift b/nostrdb/NdbTxn.swift index e4130fe6d..efae249a2 100644 --- a/nostrdb/NdbTxn.swift +++ b/nostrdb/NdbTxn.swift @@ -43,16 +43,18 @@ class NdbTxn: RawNdbTxnAccessible { let new_ref_count = ref_count + 1 Thread.current.threadDictionary["ndb_txn_ref_count"] = new_ref_count } else { - self.txn = ndb_txn() guard !ndb.is_closed else { return nil } - self.generation = ndb.generation - #if TXNDEBUG - txn_count += 1 - #endif - let ok = ndb_begin_query(ndb.ndb.ndb, &self.txn) != 0 - if !ok { - return nil - } + let txn: ndb_txn? = try? ndb.ndbAccessLock.keepNdbOpen(during: { + var txn = ndb_txn() + #if TXNDEBUG + txn_count += 1 + #endif + let ok = ndb_begin_query(ndb.ndb.ndb, &txn) != 0 + guard ok else { return nil } + return txn + }, maxWaitTimeout: .milliseconds(200)) + guard let txn else { return nil } + self.txn = txn self.generation = ndb.generation Thread.current.threadDictionary["ndb_txn"] = self.txn Thread.current.threadDictionary["ndb_txn_ref_count"] = 1 @@ -97,7 +99,13 @@ class NdbTxn: RawNdbTxnAccessible { Thread.current.threadDictionary["ndb_txn_ref_count"] = new_ref_count assert(new_ref_count >= 0, "NdbTxn reference count should never be below zero") if new_ref_count <= 0 { - ndb_end_query(&self.txn) + _ = try? ndb.ndbAccessLock.keepNdbOpen(during: { + // Check again to avoid TOCTOU race conditions where the ndb may have upgraded generations in the meantime + if self.generation != self.ndb.generation { return } + if self.ndb.is_closed { return } + + ndb_end_query(&self.txn) + }, maxWaitTimeout: .milliseconds(200)) Thread.current.threadDictionary.removeObject(forKey: "ndb_txn") Thread.current.threadDictionary.removeObject(forKey: "ndb_txn_ref_count") } diff --git a/nostrdb/NdbUseLock.swift b/nostrdb/NdbUseLock.swift new file mode 100644 index 000000000..a0105ca29 --- /dev/null +++ b/nostrdb/NdbUseLock.swift @@ -0,0 +1,197 @@ +// +// NdbUseLock.swift +// damus +// +// Created by Daniel D’Aquino on 2025-11-12. +// + +import Dispatch +import Synchronization + +extension Ndb { + /// Creates a `sync` mechanism for coordinating usages of ndb (read or write) with the app's ability to close ndb. + /// + /// This prevents race condition between threads reading from `ndb` and the app trying to close `ndb` + /// + /// Implementation notes: + /// - This was made as a synchronous mechanism because using `async` solutions (e.g. isolating `Ndb` into an `NdbActor`) + /// creates a necessity to change way too much code around the codebase, the interface becomes more cumbersome and difficult to use, + /// and might create unnecessary async delays (e.g. it would prevent two tasks from reading Ndb data at once) + @available(iOS 18.0, *) + class UseLock: UseLockProtocol { + /// Number of functions using the `ndb` object (for reading or writing data) + private let ndbUserCount = Mutex(0) + /// Semaphore for general access to `ndb`. A closing task requires exclusive access. Users of `ndb` (read/write tasks) share the access + private let ndbAccessSemaphore: DispatchSemaphore = DispatchSemaphore(value: 0) + private let ndbIsOpen = Mutex(false) + /// How long a thread can block before throwing an error + private static let DEFAULT_TIMEOUT: DispatchTimeInterval = .milliseconds(500) + + /// Keeps the ndb open while performing some specified operation. + /// + /// **WARNING:** Ensure ndb is open _before_ calling this, otherwise the thread may block for the `maxTimeout` period. + /// **Implementation note:** NEVER change this to `async`! This is a blocking operation, so we want to minimize the time of the operation + /// + /// - Parameter operation: The operation to perform while `ndb` is open. Keep this as short as safely possible! + /// - Parameter maxTimeout: The maximum amount of time the function will wait for the lock before giving up. + /// - Returns: The return result for the given operation + func keepNdbOpen(during operation: @escaping () throws -> T, maxWaitTimeout: DispatchTimeInterval = DEFAULT_TIMEOUT) throws -> T { + try self.incrementUserCount(maxTimeout: maxWaitTimeout) + defer { self.decrementUserCount() } // Use defer to guarantee this will always be called no matter the outcome of the function + return try operation() + } + + /// Waits for ndb to be able to close, then closes it. + /// + /// - Parameter operation: The operation to close. Must return the final boolean value indicating if ndb was closed in the end + /// + /// Implementation note: NEVER change this to `async`! This is a blocking operation, so we want to minimize the time of the operation + func waitUntilNdbCanClose(thenClose operation: @escaping () -> Bool, maxTimeout: DispatchTimeInterval = DEFAULT_TIMEOUT) throws { + try ndbAccessSemaphore.waitOrThrow(timeout: .now() + maxTimeout) + ndbIsOpen.withLock { ndbIsOpen in + ndbIsOpen = operation() + if ndbIsOpen { + ndbAccessSemaphore.signal() + } + } + } + + func markNdbOpen() { + ndbIsOpen.withLock { ndbIsOpen in + if !ndbIsOpen { + ndbIsOpen = true + ndbAccessSemaphore.signal() + } + } + } + + private func incrementUserCount(maxTimeout: DispatchTimeInterval = .seconds(2)) throws { + try ndbUserCount.withLock { currentCount in + // Signal that ndb cannot close while we have at least one user using ndb + if currentCount == 0 { + try ndbAccessSemaphore.waitOrThrow(timeout: .now() + maxTimeout) + } + currentCount += 1 + } + } + + private func decrementUserCount() { + ndbUserCount.withLock { currentCount in + currentCount -= 1 + // Signal that ndb can close if we have zero users using ndb + if currentCount == 0 { + ndbAccessSemaphore.signal() + } + } + } + + enum LockError: Error { + case timeout + } + } + + /// A fallback implementation for `UseLock` that works in iOS older than iOS 18, with reduced syncing mechanisms + class FallbackUseLock: UseLockProtocol { + /// Number of functions using the `ndb` object (for reading or writing data) + private var ndbUserCount: UInt = 0 + /// Semaphore for general access to `ndb`. A closing task requires exclusive access. Users of `ndb` (read/write tasks) share the access + private let ndbAccessSemaphore: DispatchSemaphore = DispatchSemaphore(value: 0) + /// How long a thread can block before throwing an error + private static let DEFAULT_TIMEOUT: DispatchTimeInterval = .milliseconds(500) + + /// Keeps the ndb open while performing some specified operation. + /// + /// **WARNING:** Ensure ndb is open _before_ calling this, otherwise the thread may block for the `maxTimeout` period. + /// **Implementation note:** NEVER change this to `async`! This is a blocking operation, so we want to minimize the time of the operation + /// + /// - Parameter operation: The operation to perform while `ndb` is open. Keep this as short as safely possible! + /// - Parameter maxTimeout: The maximum amount of time the function will wait for the lock before giving up. + /// - Returns: The return result for the given operation + func keepNdbOpen(during operation: @escaping () throws -> T, maxWaitTimeout: DispatchTimeInterval = DEFAULT_TIMEOUT) throws -> T { + try self.incrementUserCount(maxTimeout: maxWaitTimeout) + defer { self.decrementUserCount() } // Use defer to guarantee this will always be called no matter the outcome of the function + return try operation() + } + + /// Waits for ndb to be able to close, then closes it. + /// + /// - Parameter operation: The operation to close. Must return the final boolean value indicating if ndb was closed in the end + /// + /// Implementation note: NEVER change this to `async`! This is a blocking operation, so we want to minimize the time of the operation + func waitUntilNdbCanClose(thenClose operation: @escaping () -> Bool, maxTimeout: DispatchTimeInterval = DEFAULT_TIMEOUT) throws { + try ndbAccessSemaphore.waitOrThrow(timeout: .now() + maxTimeout) + let ndbIsOpen = operation() + if ndbIsOpen { + ndbAccessSemaphore.signal() + } + } + + /// Marks `ndb` as open to allow other users to use it. Do not call this more than once + func markNdbOpen() { + ndbAccessSemaphore.signal() + } + + private func incrementUserCount(maxTimeout: DispatchTimeInterval = .seconds(2)) throws { + if ndbUserCount == 0 { + try ndbAccessSemaphore.waitOrThrow(timeout: .now() + maxTimeout) + } + ndbUserCount += 1 + } + + private func decrementUserCount() { + ndbUserCount -= 1 + // Signal that ndb can close if we have zero users using ndb + if ndbUserCount == 0 { + ndbAccessSemaphore.signal() + } + } + + enum LockError: Error { + case timeout + } + } + + protocol UseLockProtocol { + /// Keeps the ndb open while performing some specified operation. + /// + /// **WARNING:** Ensure ndb is open _before_ calling this, otherwise the thread may block for the `maxTimeout` period. + /// **Implementation note:** NEVER change this to `async`! This is a blocking operation, so we want to minimize the time of the operation + /// + /// - Parameter operation: The operation to perform while `ndb` is open. Keep this as short as safely possible! + /// - Parameter maxTimeout: The maximum amount of time the function will wait for the lock before giving up. + /// - Returns: The return result for the given operation + func keepNdbOpen(during operation: @escaping () throws -> T, maxWaitTimeout: DispatchTimeInterval) throws -> T + + /// Waits for ndb to be able to close, then closes it. + /// + /// - Parameter operation: The operation to close. Must return the final boolean value indicating if ndb was closed in the end + /// + /// Implementation note: NEVER change this to `async`! This is a blocking operation, so we want to minimize the time of the operation + func waitUntilNdbCanClose(thenClose operation: @escaping () -> Bool, maxTimeout: DispatchTimeInterval) throws + + /// Marks `ndb` as open to allow other users to use it. Do not call this more than once + func markNdbOpen() + } + + static func initLock() -> UseLockProtocol { + if #available(iOS 18.0, *) { + return UseLock() + } else { + return FallbackUseLock() + } + } +} + +fileprivate extension DispatchSemaphore { + func waitOrThrow(timeout: DispatchTime) throws(TimingError) { + let result = self.wait(timeout: timeout) + switch result { + case .success: return + case .timedOut: throw .timeout + } + } + + enum TimingError: Error { + case timeout + } +}