-
-
Notifications
You must be signed in to change notification settings - Fork 3
feat(v2): Postgres storage adapter #529
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
6fe9265 to
dcf8130
Compare
This adds a postgres storage adapter for the taskbroker, as well as providing a way to choose between the adapters in the configuration. This adapter will also work with AlloyDB. In postgres, the keyword `offset` is reserved, so that column is called `kafka_offset` in the PG tables and converted to `offset`. The tests were updated to run with both the SQLite and Postgres adapter using the rstest crate. The `create_test_store` function was updated to be the standard for all tests, and to allow choosing between a SQLite and Postgres DB. A `remove_db` function was added to the trait and the existing adapters, since the tests create a unique PG database on every run that should be cleaned up. The `create_test_store` function was updated to be the standard for all tests, and to allow choosing between an SQLite and Postgres DB.
dcf8130 to
f70bfda
Compare
fpacifici
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we start a postgres as part of devservices ?
fpacifici
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd recommend reducing the duplication in the ActivationStore between the two stores. I don't know whether we will remove SQLite from the implementation for good, anyway it will take some time before we get there, having so much copy paste is quite dangerous.
src/store/inflight_activation.rs
Outdated
| /// Remove the database, used only in tests | ||
| async fn remove_db(&self) -> Result<(), Error>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WE should create an MCP in front of production taskbroker and we absolutely need to expose this, which will certainly be used only in tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what could go wrong? 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests can't fail if there's no data to run the tests on.
Having the duplication is also useful to allow the implementations to diverge as the SQL dialects of postgres and sqlite are different enough to make code that creates queries challenging to share. For non-query logic having common logic pulled out makes sense. |
src/store/inflight_activation.rs
Outdated
| /// Remove the database, used only in tests | ||
| async fn remove_db(&self) -> Result<(), Error>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what could go wrong? 😄
| let row_result: (i64,) = sqlx::query_as("SELECT pg_database_size($1) as size") | ||
| .bind(&self.config.pg_database_name) | ||
| .fetch_one(&self.read_pool) | ||
| .await?; | ||
| if row_result.0 < 0 { | ||
| return Ok(0); | ||
| } | ||
| Ok(row_result.0 as u64) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still I doubt we will want to accumulate data indefinitely in the DB. At a point we should trigger backpressure.
👍 I don't think we want an unbounded table size as the consumer component of taskbroker can process much faster than workers can complete tasks. Perhaps we start with a high limit in the 20k-80k+ range? 20k leaves capacity for all pending tasks in small regions, and large regions have ~80k pending across all pools.
| #[instrument(skip_all)] | ||
| async fn get_pending_activation( | ||
| &self, | ||
| application: Option<&str>, | ||
| namespace: Option<&str>, | ||
| ) -> Result<Option<InflightActivation>, Error> { | ||
| // Convert single namespace to vector for internal use | ||
| let namespaces = namespace.map(|ns| vec![ns.to_string()]); | ||
|
|
||
| // If a namespace filter is used, an application must also be used. | ||
| if namespaces.is_some() && application.is_none() { | ||
| warn!( | ||
| "Received request for namespaced task without application. namespaces = {namespaces:?}" | ||
| ); | ||
| return Ok(None); | ||
| } | ||
| let result = self | ||
| .get_pending_activations_from_namespaces(application, namespaces.as_deref(), Some(1)) | ||
| .await?; | ||
| if result.is_empty() { | ||
| return Ok(None); | ||
| } | ||
| Ok(Some(result[0].clone())) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The InflightActivationStore trait could provide a default implementation for methods like this 🤷
I opened a separate PR to do this, but ran into a lot of typing issues. https://github.com/getsentry/taskbroker/pull/533/changes#r2696022042 For this PR, I only created default implementations for the functions that don't directly query the DB. I'm not going to remove any other duplication in this PR. |
This adds a postgres storage adapter for the taskbroker, as well as providing a way to choose between the adapters in the configuration. This adapter will also work with AlloyDB.
In postgres, the keyword
offsetis reserved, so that column is calledkafka_offsetin the PG tables and converted tooffset.The tests were updated to run with both the SQLite and Postgres adapter using the rstest crate. The
create_test_storefunction was updated to be the standard for all tests, and to allow choosing between a SQLite and Postgres DB.A
remove_dbfunction was added to the trait and the existing adapters, since the tests create a unique PG database on every run that should be cleaned up.The
create_test_storefunction was updated to be the standard for all tests, and to allow choosing between an SQLite and Postgres DB.