diff --git a/integration_tests/tests/test_dbt_artifacts/test_artifacts.py b/integration_tests/tests/test_dbt_artifacts/test_artifacts.py index 99137c010..de022e0fd 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_artifacts.py +++ b/integration_tests/tests/test_dbt_artifacts/test_artifacts.py @@ -222,7 +222,7 @@ def test_run_results_partitioned(dbt_project: DbtProject): ) assert len(results) >= 1 - # Verify the partition column is created_at in BigQuery + # Verify the partition column is created_at in BigQuery (uses get_partition_by default) partition_cols = dbt_project.run_query( "SELECT column_name " "FROM `{{ ref('dbt_run_results').database }}.{{ ref('dbt_run_results').schema }}.INFORMATION_SCHEMA.COLUMNS` " @@ -255,3 +255,39 @@ def test_dbt_invocations_partitioned(dbt_project: DbtProject): assert [row["column_name"] for row in partition_cols] == [ "created_at" ], "dbt_invocations should be partitioned by created_at in BigQuery" + + +@pytest.mark.only_on_targets(["bigquery"]) +def test_data_monitoring_metrics_partitioned(dbt_project: DbtProject): + # data_monitoring_metrics is partitioned by bucket_end on BigQuery. + # Full-refresh to ensure the table is created with partitioning. + dbt_project.dbt_runner.run(select="data_monitoring_metrics", full_refresh=True) + + partition_cols = dbt_project.run_query( + "SELECT column_name " + "FROM `{{ ref('data_monitoring_metrics').database }}.{{ ref('data_monitoring_metrics').schema }}.INFORMATION_SCHEMA.COLUMNS` " + "WHERE table_name = '{{ ref('data_monitoring_metrics').identifier }}' " + "AND is_partitioning_column = 'YES'" + ) + assert [row["column_name"] for row in partition_cols] == [ + "bucket_end" + ], "data_monitoring_metrics should be partitioned by bucket_end in BigQuery" + + +@pytest.mark.only_on_targets(["bigquery"]) +def test_data_monitoring_metrics_clustered(dbt_project: DbtProject): + # data_monitoring_metrics is clustered by full_table_name and metric_name on BigQuery. + # Full-refresh to ensure the table is created with clustering. + dbt_project.dbt_runner.run(select="data_monitoring_metrics", full_refresh=True) + + clustering_cols = dbt_project.run_query( + "SELECT column_name " + "FROM `{{ ref('data_monitoring_metrics').database }}.{{ ref('data_monitoring_metrics').schema }}.INFORMATION_SCHEMA.COLUMNS` " + "WHERE table_name = '{{ ref('data_monitoring_metrics').identifier }}' " + "AND clustering_ordinal_position IS NOT NULL " + "ORDER BY clustering_ordinal_position" + ) + assert [row["column_name"] for row in clustering_cols] == [ + "full_table_name", + "metric_name", + ], "data_monitoring_metrics should be clustered by full_table_name, metric_name in BigQuery" diff --git a/macros/edr/system/system_utils/get_config_var.sql b/macros/edr/system/system_utils/get_config_var.sql index 186901a7e..fc3e1efb1 100644 --- a/macros/edr/system/system_utils/get_config_var.sql +++ b/macros/edr/system/system_utils/get_config_var.sql @@ -144,6 +144,7 @@ "disable_samples_on_pii_tags": false, "pii_tags": ["pii"], "bigquery_disable_partitioning": false, + "bigquery_disable_clustering": false, "upload_only_current_project_artifacts": false, } %} {{- return(default_config) -}} diff --git a/macros/utils/cross_db_utils/cluster_by.sql b/macros/utils/cross_db_utils/cluster_by.sql new file mode 100644 index 000000000..aaa59ad06 --- /dev/null +++ b/macros/utils/cross_db_utils/cluster_by.sql @@ -0,0 +1,12 @@ +{% macro get_cluster_by(columns) %} + {% do return(adapter.dispatch("get_cluster_by", "elementary")(columns)) %} +{% endmacro %} + +{%- macro bigquery__get_cluster_by(columns) %} + {% if not elementary.get_config_var("bigquery_disable_clustering") %} + {% do return(columns) %} + {% endif %} + {% do return(none) %} +{% endmacro %} + +{% macro default__get_cluster_by(columns) %} {% do return(none) %} {% endmacro %} diff --git a/macros/utils/cross_db_utils/partition_by.sql b/macros/utils/cross_db_utils/partition_by.sql index 9aa406eed..49b97d569 100644 --- a/macros/utils/cross_db_utils/partition_by.sql +++ b/macros/utils/cross_db_utils/partition_by.sql @@ -1,12 +1,17 @@ +{% macro get_partition_by(column="created_at") %} + {% do return(adapter.dispatch("get_partition_by", "elementary")(column)) %} +{% endmacro %} + +{# Backward-compatible alias so existing user overrides / references keep working. #} {% macro get_default_partition_by() %} - {% do return(adapter.dispatch("get_default_partition_by", "elementary")()) %} + {% do return(elementary.get_partition_by()) %} {% endmacro %} -{%- macro bigquery__get_default_partition_by() %} +{%- macro bigquery__get_partition_by(column) %} {% if not elementary.get_config_var("bigquery_disable_partitioning") %} {% do return( { - "field": "created_at", + "field": column, "data_type": "timestamp", "granularity": "day", } @@ -15,4 +20,4 @@ {% do return(none) %} {% endmacro %} -{% macro default__get_default_partition_by() %} {% do return(none) %} {% endmacro %} +{% macro default__get_partition_by(column) %} {% do return(none) %} {% endmacro %} diff --git a/models/edr/data_monitoring/data_monitoring/data_monitoring_metrics.sql b/models/edr/data_monitoring/data_monitoring/data_monitoring_metrics.sql index 5d9aae2dd..431540784 100644 --- a/models/edr/data_monitoring/data_monitoring/data_monitoring_metrics.sql +++ b/models/edr/data_monitoring/data_monitoring/data_monitoring_metrics.sql @@ -13,6 +13,10 @@ "timestamp_column": "created_at", "prev_timestamp_column": "updated_at", }, + partition_by=elementary.get_partition_by(column="bucket_end"), + cluster_by=elementary.get_cluster_by( + columns=["full_table_name", "metric_name"] + ), table_type=elementary.get_default_table_type(), incremental_strategy=elementary.get_default_incremental_strategy(), ) diff --git a/models/edr/dbt_artifacts/dbt_invocations.sql b/models/edr/dbt_artifacts/dbt_invocations.sql index 64e67c23c..c61798b88 100644 --- a/models/edr/dbt_artifacts/dbt_invocations.sql +++ b/models/edr/dbt_artifacts/dbt_invocations.sql @@ -4,7 +4,7 @@ transient=False, unique_key="invocation_id", on_schema_change="append_new_columns", - partition_by=elementary.get_default_partition_by(), + partition_by=elementary.get_partition_by(), full_refresh=elementary.get_config_var("elementary_full_refresh"), meta={ "timestamp_column": "created_at", diff --git a/models/edr/dbt_artifacts/dbt_run_results.sql b/models/edr/dbt_artifacts/dbt_run_results.sql index 16f076638..6479e0786 100644 --- a/models/edr/dbt_artifacts/dbt_run_results.sql +++ b/models/edr/dbt_artifacts/dbt_run_results.sql @@ -13,7 +13,7 @@ if target.type == "postgres" else [] ), - partition_by=elementary.get_default_partition_by(), + partition_by=elementary.get_partition_by(), full_refresh=elementary.get_config_var("elementary_full_refresh"), meta={ "dedup_by_column": "model_execution_id",