Skip to content

Commit 0600923

Browse files
committed
Inital commit of cxxrocksdb.
Signed-off-by: subains <[email protected]>
1 parent 9baee54 commit 0600923

File tree

16 files changed

+714
-7
lines changed

16 files changed

+714
-7
lines changed

.cargo/config.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[env]
2+
CXX="g++"
3+
CXXFLAGS="-std=c++20 -DCXXASYNC_HAVE_COROUTINE_HEADER"

.gitmodules

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
[submodule "rocksdb"]
22
path = librocksdb_sys/rocksdb
3-
url = https://github.com/tikv/rocksdb.git
4-
branch = 6.29.tikv
3+
url = https://github.com/subains/rocksdb.git
4+
branch = coroutines+iouring
55

66
[submodule "titan"]
77
path = librocksdb_sys/libtitan_sys/titan
8-
url = https://github.com/tikv/titan.git
9-
branch = master
8+
url = https://github.com/subains/titan.git
9+
branch = coroutine+iouring

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ valgrind = []
2626
[dependencies]
2727
libc = "0.2.11"
2828
librocksdb_sys = { path = "librocksdb_sys" }
29+
#cxxrocksdb = { path = "cxxrocksdb" }
2930

3031
[dev-dependencies]
3132
crc = "1.8"

cxxrocksdb/Cargo.toml

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
[package]
2+
edition = "2018"
3+
version = "0.1.0"
4+
name = "cxxrocksdb"
5+
authors = ["Sunny Bains <[email protected]>"]
6+
7+
[lib]
8+
name = "cxxrocksdb"
9+
path = "src/lib.rs"
10+
crate-type = ["staticlib"]
11+
12+
[dependencies]
13+
once_cell = "1"
14+
async-recursion = "0.3"
15+
tokio = { version = "1", features = ["full"] }
16+
rocksdb = { path = ".." }
17+
#iou = "0.3.3"
18+
19+
[dependencies.cxx]
20+
version = "1"
21+
cxx-build = "1"
22+
features = ["c++20"]
23+
24+
[dependencies.futures]
25+
version = "0.3"
26+
features = ["thread-pool"]
27+
28+
[build-dependencies]
29+
cc = "1.0.3"
30+
cmake = "0.1"
31+
cxx-build = "1"
32+
pkg-config = "0.3"
33+
34+
[dependencies.snappy-sys]
35+
git = "https://github.com/busyjay/rust-snappy.git"
36+
branch = "static-link"

cxxrocksdb/build.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
fn main() {
2+
println!("cargo:rerun-if-changed=build.rs");
3+
println!("cargo:rerun-if-changed=include/cxxrocksdb.h");
4+
println!("cargo:rerun-if-changed=src/cxxrocksdb.cc");
5+
println!("cargo:rustc-link-arg=-lcxxbridge1");
6+
7+
// FIXME: Use linklib() from top level build.rs
8+
println!("cargo:rustc-link-arg=-lstdc++");
9+
println!("cargo:rerun-if-changed=src/cxxrocksdb.cc");
10+
11+
println!("cargo:rustc-link-lib=static=cxxrocksdb");
12+
13+
cxx_build::bridge("src/lib.rs")
14+
.file("src/cxxrocksdb.cc")
15+
.flag("-DCXXASYNC_HAVE_COROUTINE_HEADER")
16+
.flag("-fcoroutines")
17+
.flag("-std=c++20")
18+
.flag_if_supported("-Wall")
19+
.include("include")
20+
.include("../librocksdb_sys/rocksdb/include")
21+
.include("../librocksdb_sys/crocksdb")
22+
.compile("cxxrocksdb");
23+
}

cxxrocksdb/include/cxxrocksdb.h

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
#pragma once
2+
3+
#include "rocksdb/db.h"
4+
#include "crocksdb/c.h"
5+
#include "rust/cxx.h"
6+
#include "rocksdb/async_future.h"
7+
8+
#include <memory>
9+
#include <tuple>
10+
#include <unordered_map>
11+
#include <set>
12+
13+
#include <sys/uio.h>
14+
15+
struct CRocksDB;
16+
struct RustStatus;
17+
struct Async_result;
18+
19+
using ROCKSDB_NAMESPACE::Async_future;
20+
using ROCKSDB_NAMESPACE::ReadOptions;
21+
using ROCKSDB_NAMESPACE::PinnableSlice;
22+
using ReadTier = ROCKSDB_NAMESPACE::ReadTier;
23+
using Submit_queue = Async_future::Submit_queue;
24+
using Return_type = Async_future::Promise_type::Return_type;
25+
26+
struct Async_reader {
27+
Async_reader() = delete;
28+
Async_reader(Async_reader&&) = delete;
29+
Async_reader(const Async_reader&) = delete;
30+
Async_reader& operator=(Async_reader&&) = delete;
31+
Async_reader& operator=(const Async_reader&) = delete;
32+
33+
Async_reader(rocksdb::DB *db, size_t io_uring_size);
34+
35+
~Async_reader() noexcept;
36+
37+
/** Reap entries from the io_uring completion queue (CQ).
38+
@return Number of processed CQEs */
39+
uint32_t io_uring_reap_cq() const;
40+
41+
/** Peek and check if there are any CQEs to process.
42+
@return true if there are CQEs in the CQ. */
43+
bool io_uring_peek_cq() const;
44+
45+
Async_result get(const ReadOptions *ropts, rust::String k) const;
46+
47+
void setup_io_uring_sq_handler(ReadOptions *ropts) const;
48+
49+
uint32_t pending_io_uring_sqe_count() const {
50+
return m_n_pending_sqe.load();
51+
}
52+
53+
static RustStatus get_result(Async_result async_result, rust::String &v);
54+
55+
private:
56+
using Promise = Async_future::promise_type;
57+
58+
static void schedule_task(Promise* promise) noexcept;
59+
60+
private:
61+
struct IO_key {
62+
bool operator==(const IO_key& rhs) const {
63+
return m_fd == rhs.m_fd && m_off == rhs.m_off;
64+
}
65+
66+
int m_fd{-1};
67+
off_t m_off{};
68+
};
69+
70+
struct IO_key_hash {
71+
size_t operator()(const IO_key &io_key) const noexcept {
72+
return io_key.m_fd ^ io_key.m_off;
73+
}
74+
};
75+
76+
using IO_value = std::unordered_set<size_t>;
77+
78+
/** All data members are mutable so that we can use const functions.
79+
This allows us to use std::shared_ptr from Rust with an immutable
80+
reference. */
81+
mutable rocksdb::DB *m_db{};
82+
mutable std::atomic<int> m_n_pending_sqe{};
83+
mutable std::shared_ptr<io_uring> m_io_uring{};
84+
mutable std::shared_ptr<Submit_queue> m_submit_queue{};
85+
mutable std::unordered_map<IO_key, IO_value, IO_key_hash> m_pending_io{};
86+
};
87+
88+
std::shared_ptr<Async_reader> new_async_reader(CRocksDB* db, uint32_t io_uring_size);
89+
90+
RustStatus get_async_result(Async_result async_result, rust::String &v);

0 commit comments

Comments
 (0)