Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions macros/edr/dbt_artifacts/upload_run_results.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,25 @@
{% endmacro %}


{% macro normalize_artifact_timestamp_precision(timestamp_value) %}
{% if target.type != "bigquery" %}
{{ return(timestamp_value) }}
{% endif %}

{% if timestamp_value is string and timestamp_value.endswith("Z") and "." in timestamp_value %}
{% set ts_no_z = timestamp_value[:-1] %}
{% set ts_parts = ts_no_z.split(".") %}
{% if ts_parts | length == 2 %}
{% set fractional_part = ts_parts[1] %}
{% if fractional_part | length > 6 %}
{{ return(ts_parts[0] ~ "." ~ fractional_part[:6] ~ "Z") }}
{% endif %}
{% endif %}
{% endif %}
{{ return(timestamp_value) }}
{% endmacro %}


{% macro get_dbt_run_results_empty_table_query() %}
{% set dbt_run_results_empty_table_query = elementary.empty_table(
[
Expand Down Expand Up @@ -102,15 +121,23 @@
{% if timing.get("name") == "execute" %}
{% do flatten_run_result_dict.update(
{
"execute_started_at": timing.get("started_at"),
"execute_completed_at": timing.get("completed_at"),
"execute_started_at": elementary.normalize_artifact_timestamp_precision(
timing.get("started_at")
),
"execute_completed_at": elementary.normalize_artifact_timestamp_precision(
timing.get("completed_at")
),
}
) %}
{% elif timing.get("name") == "compile" %}
{% do flatten_run_result_dict.update(
{
"compile_started_at": timing.get("started_at"),
"compile_completed_at": timing.get("completed_at"),
"compile_started_at": elementary.normalize_artifact_timestamp_precision(
timing.get("started_at")
),
"compile_completed_at": elementary.normalize_artifact_timestamp_precision(
timing.get("completed_at")
),
}
) %}
{% endif %}
Expand Down
16 changes: 12 additions & 4 deletions macros/edr/dbt_artifacts/upload_source_freshness.sql
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,18 @@
"filter": criteria_dict.get("filter"),
"generated_at": elementary.datetime_now_utc_as_string(),
"invocation_id": source_freshness_invocation_id,
"compile_started_at": compile_timing.get("started_at"),
"compile_completed_at": compile_timing.get("completed_at"),
"execute_started_at": execute_timing.get("started_at"),
"execute_completed_at": execute_timing.get("completed_at"),
"compile_started_at": elementary.normalize_artifact_timestamp_precision(
compile_timing.get("started_at")
),
"compile_completed_at": elementary.normalize_artifact_timestamp_precision(
compile_timing.get("completed_at")
),
"execute_started_at": elementary.normalize_artifact_timestamp_precision(
execute_timing.get("started_at")
),
"execute_completed_at": elementary.normalize_artifact_timestamp_precision(
execute_timing.get("completed_at")
),
} %}
{{ return(flatten_source_freshness_dict) }}
{% endmacro %}
24 changes: 20 additions & 4 deletions macros/edr/tests/on_run_start/create_elementary_tests_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,25 @@
{% set tests_schema_name = elementary.get_elementary_tests_schema(
database_name, schema_name
) %}
{%- if tests_schema_name != schema_name and not adapter.check_schema_exists(
database_name, tests_schema_name
) %}
{% if target.type == "bigquery" %}
{% set schema_exists_sql %}
select count(*) as schema_count
from `{{ database_name }}`.INFORMATION_SCHEMA.SCHEMATA
where upper(schema_name) = upper('{{ tests_schema_name }}')
{% endset %}
{% set schema_exists_result = elementary.run_query(schema_exists_sql) %}
{% set schema_count_rows = [] %}
{% if schema_exists_result is not none %}
{% set schema_count_rows = elementary.agate_to_dicts(schema_exists_result) %}
{% endif %}
{% set schema_exists = (
schema_count_rows | length > 0
and schema_count_rows[0]["schema_count"] | int > 0
) %}
{% else %}
{% set schema_exists = adapter.check_schema_exists(database_name, tests_schema_name) %}
{% endif %}
{%- if tests_schema_name != schema_name and not schema_exists %}
{{ elementary.edr_log("Creating Elementary's tests schema.") }}
{% set schema_relation = api.Relation.create(
database=database_name, schema=tests_schema_name
Expand All @@ -18,4 +34,4 @@
{%- endif %}
{% endif %}
{{ return("") }}
{% endmacro %}
{% endmacro %}
26 changes: 22 additions & 4 deletions macros/edr/tests/test_utils/get_elementary_tests_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,32 @@
{% set legacy_tests_schema_name = (
elementary_schema ~ LEGACY_TESTS_SCHEMA_SUFFIX
) %}
{% if adapter.check_schema_exists(
elementary_database, legacy_tests_schema_name
) %}
{% if target.type == "bigquery" %}
{% set legacy_schema_exists_sql %}
select count(*) as schema_count
from `{{ elementary_database }}`.INFORMATION_SCHEMA.SCHEMATA
where upper(schema_name) = upper('{{ legacy_tests_schema_name }}')
{% endset %}
{% set legacy_schema_exists_result = elementary.run_query(legacy_schema_exists_sql) %}
{% set legacy_schema_count_rows = [] %}
{% if legacy_schema_exists_result is not none %}
{% set legacy_schema_count_rows = elementary.agate_to_dicts(legacy_schema_exists_result) %}
{% endif %}
{% set legacy_schema_exists = (
legacy_schema_count_rows | length > 0
and legacy_schema_count_rows[0]["schema_count"] | int > 0
) %}
{% else %}
{% set legacy_schema_exists = adapter.check_schema_exists(
elementary_database, legacy_tests_schema_name
) %}
{% endif %}
{% if legacy_schema_exists %}
{% set tests_schema_name = legacy_tests_schema_name %}
{% endif %}
{% endif %}

{% do elementary.set_cache("tests_schema_name", tests_schema_name) %}

{{ return(tests_schema_name) }}
{% endmacro %}
{% endmacro %}
Loading