Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions snowcap/blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -1748,7 +1748,7 @@ def process_commands(commands, roles, available_roles):
session_ctx = data_provider.fetch_session(session)
_raise_if_plan_would_drop_session_user(session_ctx, plan)

sql_commands_per_change = compile_plan_to_sql(session_ctx, plan)
sql_commands_per_change, available_roles = compile_plan_to_sql(session_ctx, plan)
roles_list: list[Any] = []
additive_commands = []
destructive_commands = []
Expand All @@ -1763,11 +1763,11 @@ def process_commands(commands, roles, available_roles):
# Suppress SQL execution logs during apply (plan details already shown above)
logging.getLogger("snowcap").setLevel(logging.WARNING)

# Process additive changes
process_commands(additive_commands, roles_set, session_ctx["available_roles"])
# Process additive changes (use available_roles which includes roles being created)
process_commands(additive_commands, roles_set, available_roles)

# Process destructive changes
process_commands(destructive_commands, roles_set, session_ctx["available_roles"])
process_commands(destructive_commands, roles_set, available_roles)

# Restore logging level
logging.getLogger("snowcap").setLevel(logging.INFO)
Expand Down Expand Up @@ -1998,8 +1998,15 @@ def sql_commands_for_change(
return execution_role, [cmd for cmd in all_cmds if cmd is not None]


def compile_plan_to_sql(session_ctx: SessionContext, plan: Plan) -> list[dict]:
"""Compile the plan into a list of SQL command lists, one per change."""
def compile_plan_to_sql(
session_ctx: SessionContext, plan: Plan
) -> tuple[list[dict], list[ResourceName]]:
"""Compile the plan into a list of SQL command lists, one per change.

Returns:
A tuple of (sql_commands_per_change, available_roles) where available_roles
includes any roles being created in this plan.
"""
sql_commands_per_change = []
available_roles = session_ctx["available_roles"].copy()
default_role = session_ctx["role"]
Expand All @@ -2013,7 +2020,7 @@ def compile_plan_to_sql(session_ctx: SessionContext, plan: Plan) -> list[dict]:
for change in plan:
role, commands = sql_commands_for_change(change, available_roles, default_role)
sql_commands_per_change.append({"role": role, "commands": commands, "change": change})
return sql_commands_per_change
return sql_commands_per_change, available_roles


def compute_levels(resource_set: Set[URN], references: Set[tuple[URN, URN]]) -> dict[URN, int]:
Expand Down Expand Up @@ -2143,9 +2150,12 @@ def _diff_resource_data(lhs: dict, rhs: dict) -> dict:
for field_name in lhs.keys():
lhs_value = lhs[field_name]
rhs_value = rhs[field_name]
# Skip fields where manifest value is None - means "use Snowflake default/inherit"
if rhs_value is None:
# Skip fields where manifest value is None or empty string - means "use Snowflake default/inherit"
if rhs_value is None or rhs_value == "":
continue
# Normalize empty strings to None for comparison (Snowflake returns None for unset fields)
if lhs_value == "":
lhs_value = None
if lhs_value != rhs_value:
delta[field_name] = rhs_value
return delta
Expand Down
22 changes: 7 additions & 15 deletions snowcap/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def execute(
session_header = f"[{session.user}:{session.role}] > {sql_text}"

# Thread-safe cache check with pending query deduplication
cache_key: tuple[str, str] | None = None
if cacheable:
cache_key = (session.role, sql_text)
with _EXECUTION_CACHE_LOCK:
Expand Down Expand Up @@ -132,40 +133,31 @@ def execute(
# Log execution details
logger.info(f"{session_header} \033[94m({len(result)} rows, {runtime:.2f}s)\033[0m")
if cacheable:
cache_key = (session.role, sql_text)
with _EXECUTION_CACHE_LOCK:
if session.role not in _EXECUTION_CACHE:
_EXECUTION_CACHE[session.role] = {}
_EXECUTION_CACHE[session.role][sql_text] = result
# Signal waiting threads and cleanup
if cache_key in _PENDING_QUERIES:
_PENDING_QUERIES[cache_key].set()
del _PENDING_QUERIES[cache_key]
return result
except ProgrammingError as err:
if empty_response_codes and err.errno in empty_response_codes:
runtime = time.time() - start
logger.info(f"{session_header} \033[94m(empty, {runtime:.2f}s)\033[0m")
if cacheable:
cache_key = (session.role, sql_text)
with _EXECUTION_CACHE_LOCK:
if session.role not in _EXECUTION_CACHE:
_EXECUTION_CACHE[session.role] = {}
_EXECUTION_CACHE[session.role][sql_text] = []
# Signal waiting threads and cleanup
if cache_key in _PENDING_QUERIES:
_PENDING_QUERIES[cache_key].set()
del _PENDING_QUERIES[cache_key]
return []
# On error, also cleanup pending query marker
if cacheable:
cache_key = (session.role, sql_text)
logger.error(f"{session_header} \033[31m(err {err.errno}, {time.time() - start:.2f}s)\033[0m")
raise ProgrammingError(f"{err} on {sql_text}", errno=err.errno) from err
finally:
# Always signal waiting threads and cleanup pending query marker
# This must happen regardless of success, failure, or exception type
if cache_key is not None:
with _EXECUTION_CACHE_LOCK:
if cache_key in _PENDING_QUERIES:
_PENDING_QUERIES[cache_key].set()
del _PENDING_QUERIES[cache_key]
logger.error(f"{session_header} \033[31m(err {err.errno}, {time.time() - start:.2f}s)\033[0m")
raise ProgrammingError(f"{err} on {sql_text}", errno=err.errno) from err


def execute_in_parallel(
Expand Down
3 changes: 2 additions & 1 deletion snowcap/resources/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ def __post_init__(self):
if self.login_name is None or self.login_name == "":
self.login_name = ResourceName(str(self.name).upper())
if self.display_name is None:
self.display_name = self.name._name
# Snowflake stores display_name as uppercase for unquoted identifiers
self.display_name = self.name._name.upper()
if self.must_change_password is None:
self.must_change_password = False

Expand Down
14 changes: 12 additions & 2 deletions tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,18 @@ def safe_fetch(cursor, urn):
return data


def flatten_sql_commands(sql_commands: list[dict]) -> list[str]:
"""Flatten compile_plan_to_sql output to a list of SQL strings for testing."""
def flatten_sql_commands(sql_commands_result) -> list[str]:
"""Flatten compile_plan_to_sql output to a list of SQL strings for testing.

Args:
sql_commands_result: Either a list[dict] (legacy) or a tuple of (list[dict], available_roles)
"""
# Handle both old and new return formats from compile_plan_to_sql
if isinstance(sql_commands_result, tuple):
sql_commands = sql_commands_result[0]
else:
sql_commands = sql_commands_result

result = ["USE SECONDARY ROLES ALL"]
last_role = None
for cmd in sql_commands:
Expand Down
14 changes: 12 additions & 2 deletions tests/test_blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,18 @@ def find_change_by_urn(plan, urn):
return None


def flatten_sql_commands(sql_commands: list[dict]) -> list[str]:
"""Flatten compile_plan_to_sql output to a list of SQL strings for testing."""
def flatten_sql_commands(sql_commands_result) -> list[str]:
"""Flatten compile_plan_to_sql output to a list of SQL strings for testing.

Args:
sql_commands_result: Either a list[dict] (legacy) or a tuple of (list[dict], available_roles)
"""
# Handle both old and new return formats from compile_plan_to_sql
if isinstance(sql_commands_result, tuple):
sql_commands = sql_commands_result[0]
else:
sql_commands = sql_commands_result

result = ["USE SECONDARY ROLES ALL"]
last_role = None
for cmd in sql_commands:
Expand Down
14 changes: 12 additions & 2 deletions tests/test_blueprint_ownership.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,18 @@
from snowcap.resource_name import ResourceName


def flatten_sql_commands(sql_commands: list[dict]) -> list[str]:
"""Flatten compile_plan_to_sql output to a list of SQL strings for testing."""
def flatten_sql_commands(sql_commands_result) -> list[str]:
"""Flatten compile_plan_to_sql output to a list of SQL strings for testing.

Args:
sql_commands_result: Either a list[dict] (legacy) or a tuple of (list[dict], available_roles)
"""
# Handle both old and new return formats from compile_plan_to_sql
if isinstance(sql_commands_result, tuple):
sql_commands = sql_commands_result[0]
else:
sql_commands = sql_commands_result

result = ["USE SECONDARY ROLES ALL"]
last_role = None
for cmd in sql_commands:
Expand Down
Loading