Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion integration_tests/tests/test_dbt_artifacts/test_artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def test_run_results_partitioned(dbt_project: DbtProject):
)
assert len(results) >= 1

# Verify the partition column is created_at in BigQuery
# Verify the partition column is created_at in BigQuery (uses get_partition_by default)
partition_cols = dbt_project.run_query(
"SELECT column_name "
"FROM `{{ ref('dbt_run_results').database }}.{{ ref('dbt_run_results').schema }}.INFORMATION_SCHEMA.COLUMNS` "
Expand Down Expand Up @@ -255,3 +255,39 @@ def test_dbt_invocations_partitioned(dbt_project: DbtProject):
assert [row["column_name"] for row in partition_cols] == [
"created_at"
], "dbt_invocations should be partitioned by created_at in BigQuery"


@pytest.mark.only_on_targets(["bigquery"])
def test_data_monitoring_metrics_partitioned(dbt_project: DbtProject):
    # On BigQuery, data_monitoring_metrics is expected to be partitioned by
    # bucket_end. Rebuild it with a full refresh so the table is created
    # with the partitioning config applied.
    dbt_project.dbt_runner.run(select="data_monitoring_metrics", full_refresh=True)

    # Ask INFORMATION_SCHEMA which columns of the model are partitioning columns.
    partition_query = (
        "SELECT column_name "
        "FROM `{{ ref('data_monitoring_metrics').database }}.{{ ref('data_monitoring_metrics').schema }}.INFORMATION_SCHEMA.COLUMNS` "
        "WHERE table_name = '{{ ref('data_monitoring_metrics').identifier }}' "
        "AND is_partitioning_column = 'YES'"
    )
    rows = dbt_project.run_query(partition_query)
    partition_columns = [row["column_name"] for row in rows]

    assert partition_columns == [
        "bucket_end"
    ], "data_monitoring_metrics should be partitioned by bucket_end in BigQuery"


@pytest.mark.only_on_targets(["bigquery"])
def test_data_monitoring_metrics_clustered(dbt_project: DbtProject):
    # On BigQuery, data_monitoring_metrics is expected to be clustered by
    # full_table_name and metric_name. Rebuild it with a full refresh so the
    # table is created with the clustering config applied.
    dbt_project.dbt_runner.run(select="data_monitoring_metrics", full_refresh=True)

    # Fetch the clustering columns in their clustering order, so the
    # comparison below checks both membership and ordering.
    clustering_query = (
        "SELECT column_name "
        "FROM `{{ ref('data_monitoring_metrics').database }}.{{ ref('data_monitoring_metrics').schema }}.INFORMATION_SCHEMA.COLUMNS` "
        "WHERE table_name = '{{ ref('data_monitoring_metrics').identifier }}' "
        "AND clustering_ordinal_position IS NOT NULL "
        "ORDER BY clustering_ordinal_position"
    )
    rows = dbt_project.run_query(clustering_query)
    clustering_columns = [row["column_name"] for row in rows]

    assert clustering_columns == [
        "full_table_name",
        "metric_name",
    ], "data_monitoring_metrics should be clustered by full_table_name, metric_name in BigQuery"
1 change: 1 addition & 0 deletions macros/edr/system/system_utils/get_config_var.sql
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
"disable_samples_on_pii_tags": false,
"pii_tags": ["pii"],
"bigquery_disable_partitioning": false,
"bigquery_disable_clustering": false,
"upload_only_current_project_artifacts": false,
} %}
{{- return(default_config) -}}
Expand Down
12 changes: 12 additions & 0 deletions macros/utils/cross_db_utils/cluster_by.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{# Entry point: hands `columns` to the adapter-specific get_cluster_by implementation
   resolved through adapter.dispatch in the "elementary" namespace, and returns its result. #}
{% macro get_cluster_by(columns) %}
    {% set cluster_by_impl = adapter.dispatch("get_cluster_by", "elementary") %}
    {% do return(cluster_by_impl(columns)) %}
{% endmacro %}

{# BigQuery implementation: returns `columns` unchanged, unless the
   "bigquery_disable_clustering" config var is truthy, in which case it returns none. #}
{%- macro bigquery__get_cluster_by(columns) %}
    {% if elementary.get_config_var("bigquery_disable_clustering") %}
        {% do return(none) %}
    {% endif %}
    {% do return(columns) %}
{% endmacro %}

{# Default implementation: always returns none, whatever columns were requested. #}
{% macro default__get_cluster_by(columns) %}
    {% do return(none) %}
{% endmacro %}
13 changes: 9 additions & 4 deletions macros/utils/cross_db_utils/partition_by.sql
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
{# Entry point: hands `column` (default "created_at") to the adapter-specific
   get_partition_by implementation resolved through adapter.dispatch in the
   "elementary" namespace, and returns its result. #}
{% macro get_partition_by(column="created_at") %}
    {% set partition_by_impl = adapter.dispatch("get_partition_by", "elementary") %}
    {% do return(partition_by_impl(column)) %}
{% endmacro %}

{# Backward-compatible alias so existing user overrides / references keep working. #}
{% macro get_default_partition_by() %}
{% do return(adapter.dispatch("get_default_partition_by", "elementary")()) %}
{% do return(elementary.get_partition_by()) %}
{% endmacro %}

{%- macro bigquery__get_default_partition_by() %}
{%- macro bigquery__get_partition_by(column) %}
{% if not elementary.get_config_var("bigquery_disable_partitioning") %}
{% do return(
{
"field": "created_at",
"field": column,
"data_type": "timestamp",
"granularity": "day",
}
Expand All @@ -15,4 +20,4 @@
{% do return(none) %}
{% endmacro %}

{% macro default__get_default_partition_by() %} {% do return(none) %} {% endmacro %}
{# Default implementation: always returns none, whatever column was requested. #}
{% macro default__get_partition_by(column) %}
    {% do return(none) %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
"timestamp_column": "created_at",
"prev_timestamp_column": "updated_at",
},
partition_by=elementary.get_partition_by(column="bucket_end"),
cluster_by=elementary.get_cluster_by(
columns=["full_table_name", "metric_name"]
),
table_type=elementary.get_default_table_type(),
incremental_strategy=elementary.get_default_incremental_strategy(),
)
Expand Down
2 changes: 1 addition & 1 deletion models/edr/dbt_artifacts/dbt_invocations.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
transient=False,
unique_key="invocation_id",
on_schema_change="append_new_columns",
partition_by=elementary.get_default_partition_by(),
partition_by=elementary.get_partition_by(),
full_refresh=elementary.get_config_var("elementary_full_refresh"),
meta={
"timestamp_column": "created_at",
Expand Down
2 changes: 1 addition & 1 deletion models/edr/dbt_artifacts/dbt_run_results.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
if target.type == "postgres"
else []
),
partition_by=elementary.get_default_partition_by(),
partition_by=elementary.get_partition_by(),
full_refresh=elementary.get_config_var("elementary_full_refresh"),
meta={
"dedup_by_column": "model_execution_id",
Expand Down
Loading