Conversation

Member

@evanh evanh commented Jan 13, 2026

This adds a Postgres storage adapter for the taskbroker, along with a configuration option for choosing between the adapters. This adapter will also work with AlloyDB.
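
As a rough illustration of choosing an adapter from configuration, here is a minimal sketch. The names `DatabaseAdapter`, `Store`, `build_store`, and the accepted strings are assumptions made for this example, not the identifiers used in the PR, and the store types are empty stand-ins.

```rust
use std::str::FromStr;

/// Hypothetical config value naming the storage backend.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatabaseAdapter {
    Sqlite,
    Postgres,
}

impl FromStr for DatabaseAdapter {
    type Err = String;

    fn from_str(value: &str) -> Result<Self, Self::Err> {
        match value.to_ascii_lowercase().as_str() {
            "sqlite" => Ok(Self::Sqlite),
            // AlloyDB would go through the Postgres adapter as well.
            "postgres" | "postgresql" => Ok(Self::Postgres),
            other => Err(format!("unknown database adapter: {other}")),
        }
    }
}

/// Stand-in for the store trait; the real trait has many more methods.
pub trait Store {
    fn backend_name(&self) -> &'static str;
}

struct SqliteStore;
struct PostgresStore;

impl Store for SqliteStore {
    fn backend_name(&self) -> &'static str {
        "sqlite"
    }
}

impl Store for PostgresStore {
    fn backend_name(&self) -> &'static str {
        "postgres"
    }
}

/// Build the store selected by the configuration value.
pub fn build_store(adapter: DatabaseAdapter) -> Box<dyn Store> {
    match adapter {
        DatabaseAdapter::Sqlite => Box::new(SqliteStore),
        DatabaseAdapter::Postgres => Box::new(PostgresStore),
    }
}
```

Parsing the setting once at startup and passing a `Box<dyn Store>` around keeps the rest of the code unaware of which backend is active.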

In Postgres, `offset` is a reserved keyword, so that column is called `kafka_offset` in the PG tables and converted to `offset`.
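
The column rename could be handled with a pair of conversions like the following sketch. The struct names `PgActivationRow` and `Activation` and the two-field shape are hypothetical, chosen only to show the `kafka_offset` to `offset` mapping; the PR may do the conversion differently.

```rust
/// Row shape as stored in Postgres (hypothetical, reduced to two columns).
#[derive(Debug, Clone, PartialEq)]
pub struct PgActivationRow {
    pub id: String,
    /// Named `kafka_offset` because `offset` is reserved in Postgres.
    pub kafka_offset: i64,
}

/// In-memory shape used by the rest of the broker (also reduced).
#[derive(Debug, Clone, PartialEq)]
pub struct Activation {
    pub id: String,
    pub offset: i64,
}

impl From<PgActivationRow> for Activation {
    fn from(row: PgActivationRow) -> Self {
        Activation {
            id: row.id,
            offset: row.kafka_offset,
        }
    }
}

impl From<Activation> for PgActivationRow {
    fn from(activation: Activation) -> Self {
        PgActivationRow {
            id: activation.id,
            kafka_offset: activation.offset,
        }
    }
}
```

Keeping the rename inside the conversion means only the Postgres adapter ever sees the `kafka_offset` name.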

The tests were updated to run against both the SQLite and Postgres adapters using the rstest crate. The `create_test_store` function is now the standard way to create a store in all tests, and it allows choosing between a SQLite and a Postgres DB.

A `remove_db` function was added to the trait and the existing adapters, since the tests create a unique PG database on every run that should be cleaned up.
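
To make the cleanup requirement concrete, here is a small sketch of per-run database naming and removal. `unique_test_db_name` and `FakeServer` are invented for this example; `FakeServer` is an in-memory stand-in for a Postgres server, and the naming scheme is an assumption rather than what the PR uses.

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};

static NEXT_DB_ID: AtomicU64 = AtomicU64::new(0);

/// Produce a name that is unique within this process (hypothetical scheme).
pub fn unique_test_db_name() -> String {
    let n = NEXT_DB_ID.fetch_add(1, Ordering::Relaxed);
    format!("taskbroker_test_{}_{}", std::process::id(), n)
}

/// Stand-in for a Postgres server: just the set of database names that exist.
#[derive(Default)]
pub struct FakeServer {
    databases: HashSet<String>,
}

impl FakeServer {
    pub fn create_db(&mut self, name: &str) {
        self.databases.insert(name.to_string());
    }

    /// Plays the role of `remove_db`; returns whether a database was dropped.
    pub fn remove_db(&mut self, name: &str) -> bool {
        self.databases.remove(name)
    }

    pub fn db_count(&self) -> usize {
        self.databases.len()
    }
}
```

Without the removal step, every test run would leave one more database behind on the shared server.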

@evanh evanh requested a review from a team as a code owner January 13, 2026 20:48
@evanh evanh force-pushed the evanh/feat/use-postgresql-interface branch from 6fe9265 to dcf8130 Compare January 13, 2026 21:35
@evanh evanh requested a review from a team January 13, 2026 21:35
@evanh evanh force-pushed the evanh/feat/use-postgresql-interface branch from dcf8130 to f70bfda Compare January 13, 2026 21:39
@fpacifici fpacifici left a comment

Should we start a Postgres instance as part of devservices?

@fpacifici fpacifici left a comment

I'd recommend reducing the duplication in the ActivationStore between the two stores. I don't know whether we will remove SQLite from the implementation for good; in any case, it will take some time before we get there, and having so much copy-paste is quite dangerous.

Comment on lines 404 to 405
/// Remove the database, used only in tests
async fn remove_db(&self) -> Result<(), Error>;

We should create an MCP in front of production taskbroker and we absolutely need to expose this, which will certainly be used only in tests.

Member

@markstory markstory Jan 14, 2026

what could go wrong? 😄

Member Author

The tests can't fail if there's no data to run the tests on.

@markstory
Member

> I'd recommend reducing the duplication in the ActivationStore between the two stores. I don't know whether we will remove SQLite from the implementation for good; in any case, it will take some time before we get there, and having so much copy-paste is quite dangerous.

Having the duplication is also useful because it lets the implementations diverge: the SQL dialects of Postgres and SQLite are different enough that code that builds queries is challenging to share. For non-query logic, pulling the common logic out makes sense.

Comment on lines +136 to +143
let row_result: (i64,) = sqlx::query_as("SELECT pg_database_size($1) as size")
    .bind(&self.config.pg_database_name)
    .fetch_one(&self.read_pool)
    .await?;
if row_result.0 < 0 {
    return Ok(0);
}
Ok(row_result.0 as u64)
Member

> Still, I doubt we will want to accumulate data indefinitely in the DB. At some point we should trigger backpressure.

👍 I don't think we want an unbounded table size as the consumer component of taskbroker can process much faster than workers can complete tasks. Perhaps we start with a high limit in the 20k-80k+ range? 20k leaves capacity for all pending tasks in small regions, and large regions have ~80k pending across all pools.
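
A bounded-table check along the lines discussed here might look like the sketch below. `BackpressureConfig`, `Admission`, and `admit` are hypothetical names, and comparing a pending-row count against a single threshold is an assumption about how the limit would be applied; the 20k figure is only the lower bound floated in this thread.

```rust
/// Hypothetical limit setting for the number of pending activations.
pub struct BackpressureConfig {
    pub max_pending: u64,
}

/// Outcome of the admission check for a newly consumed task.
#[derive(Debug, PartialEq, Eq)]
pub enum Admission {
    /// There is room; the consumer may write the task to the store.
    Accept,
    /// The table is at or above the limit; the consumer should pause.
    Backpressure,
}

/// Decide whether the consumer may insert another activation.
pub fn admit(pending_count: u64, config: &BackpressureConfig) -> Admission {
    if pending_count >= config.max_pending {
        Admission::Backpressure
    } else {
        Admission::Accept
    }
}
```

With `max_pending` set to 20,000, a store holding 19,999 pending rows would still accept work, and one holding 20,000 would signal backpressure.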

Comment on lines 246 to 269
#[instrument(skip_all)]
async fn get_pending_activation(
    &self,
    application: Option<&str>,
    namespace: Option<&str>,
) -> Result<Option<InflightActivation>, Error> {
    // Convert single namespace to vector for internal use
    let namespaces = namespace.map(|ns| vec![ns.to_string()]);

    // If a namespace filter is used, an application must also be used.
    if namespaces.is_some() && application.is_none() {
        warn!(
            "Received request for namespaced task without application. namespaces = {namespaces:?}"
        );
        return Ok(None);
    }
    let result = self
        .get_pending_activations_from_namespaces(application, namespaces.as_deref(), Some(1))
        .await?;
    if result.is_empty() {
        return Ok(None);
    }
    Ok(Some(result[0].clone()))
}
Member

The InflightActivationStore trait could provide a default implementation for methods like this 🤷
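
For illustration, a default method on the trait could look roughly like this. The sketch is synchronous and uses `String` as the error type to stay self-contained, whereas the code under review is async and returns the crate's `Error`; `FakeStore` and the reduced `InflightActivation` struct are invented for the example.

```rust
/// Reduced stand-in for the real activation type.
#[derive(Debug, Clone, PartialEq)]
pub struct InflightActivation {
    pub id: String,
}

pub trait InflightActivationStore {
    /// Backend-specific query; each adapter writes its own SQL for this.
    fn get_pending_activations_from_namespaces(
        &self,
        application: Option<&str>,
        namespaces: Option<&[String]>,
        limit: Option<usize>,
    ) -> Result<Vec<InflightActivation>, String>;

    /// Shared logic with no SQL in it, so it can live on the trait.
    fn get_pending_activation(
        &self,
        application: Option<&str>,
        namespace: Option<&str>,
    ) -> Result<Option<InflightActivation>, String> {
        let namespaces = namespace.map(|ns| vec![ns.to_string()]);

        // A namespace filter without an application is rejected.
        if namespaces.is_some() && application.is_none() {
            return Ok(None);
        }
        let result = self.get_pending_activations_from_namespaces(
            application,
            namespaces.as_deref(),
            Some(1),
        )?;
        Ok(result.into_iter().next())
    }
}

/// Toy in-memory backend used only to exercise the default method.
pub struct FakeStore {
    pub pending: Vec<InflightActivation>,
}

impl InflightActivationStore for FakeStore {
    fn get_pending_activations_from_namespaces(
        &self,
        _application: Option<&str>,
        _namespaces: Option<&[String]>,
        limit: Option<usize>,
    ) -> Result<Vec<InflightActivation>, String> {
        let take = limit.unwrap_or(self.pending.len());
        Ok(self.pending.iter().take(take).cloned().collect())
    }
}
```

Each adapter then implements only the query method, and both inherit the same namespace validation.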

Member Author

evanh commented Jan 15, 2026

> I'd recommend reducing the duplication in the ActivationStore between the two stores. I don't know whether we will remove SQLite from the implementation for good; in any case, it will take some time before we get there, and having so much copy-paste is quite dangerous.

I opened a separate PR to do this, but ran into a lot of typing issues. https://github.com/getsentry/taskbroker/pull/533/changes#r2696022042

For this PR, I only created default implementations for the functions that don't directly query the DB. I'm not going to remove any other duplication in this PR.

@evanh evanh requested a review from fpacifici January 15, 2026 21:44