Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions adminapi/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,88 @@ def parse_function_string(args, strict=True): # NOQA C901
parsed_args.append(('literal', args[string_start:]))

return parsed_args


def build_query(query_args):
"""Build a text query from parsed query arguments.

This is the inverse of parse_query(). It takes a dictionary mapping
attribute names to filter values and returns a text query string.

Args:
query_args: A dictionary like {'hostname': BaseFilter('web.*'),
'state': BaseFilter('online')}

Returns:
A text query string like 'hostname=Regexp(web.*) state=online'
"""
if not query_args:
return ''

parts = []
for attr, filter_obj in query_args.items():
value_str = _format_filter_value(filter_obj)
parts.append(f'{attr}={value_str}')

return ' '.join(parts)


def _format_filter_value(filter_obj):
"""Format a filter object as a text query value.

Args:
filter_obj: A BaseFilter instance or subclass

Returns:
A string representation suitable for text queries
"""
if not isinstance(filter_obj, BaseFilter):
# Plain value, format it directly
return _format_literal(filter_obj)

filter_type = type(filter_obj)

# BaseFilter with plain value - no function wrapper needed
if filter_type == BaseFilter:
return _format_literal(filter_obj.value)

# Empty filter - no arguments
if filter_type.__name__ == 'Empty':
return 'Empty()'

# Any/All filters - multiple values
if hasattr(filter_obj, 'values'):
inner_parts = [_format_filter_value(v) for v in filter_obj.values]
return '{}({})'.format(filter_type.__name__, ' '.join(inner_parts))

# Not filter - single nested filter
if filter_type.__name__ == 'Not':
inner = _format_filter_value(filter_obj.value)
return '{}({})'.format(filter_type.__name__, inner)

# Other filters (Regexp, GreaterThan, etc.) - single value
return '{}({})'.format(filter_type.__name__, _format_literal(filter_obj.value))


def _format_literal(value):
"""Format a literal value for text query output.

Args:
value: A Python value (str, int, bool, etc.)

Returns:
A string representation suitable for text queries
"""
if isinstance(value, bool):
return 'true' if value else 'false'

if isinstance(value, str):
# Check if the string needs quoting (contains spaces or special chars)
if ' ' in value or '(' in value or ')' in value or '=' in value:
# Escape backslashes and quotes, then quote the string
escaped = value.replace('\\', '\\\\').replace('"', '\\"')
return '"{}"'.format(escaped)
return value

# For numbers, IP addresses, dates, etc. - use string representation
return str(value)
253 changes: 253 additions & 0 deletions adminapi/tests/test_parse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
import unittest

from adminapi.parse import parse_query, build_query
from adminapi.filters import (
BaseFilter, Regexp, Any, All, Not, Empty,
GreaterThan, GreaterThanOrEquals, LessThan, LessThanOrEquals,
Contains, StartsWith,
)


class TestBuildQuery(unittest.TestCase):
"""Tests for the build_query function."""

def test_simple_filter(self):
"""Test building a query with a simple attribute filter."""
query = 'servertype=vm'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'servertype=vm')

def test_multiple_attributes(self):
"""Test building a query with multiple attributes."""
query = 'servertype=vm state=online'
parsed = parse_query(query)
rebuilt = build_query(parsed)
# Order may vary due to dict iteration, check both attributes present
self.assertIn('servertype=vm', rebuilt)
self.assertIn('state=online', rebuilt)

def test_regexp_filter_explicit(self):
"""Test building a query with explicit Regexp filter."""
query = 'hostname=Regexp(web.*)'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'hostname=Regexp(web.*)')

def test_any_filter(self):
"""Test building a query with Any filter."""
query = 'state=Any(online offline)'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'state=Any(online offline)')

def test_all_filter(self):
"""Test building a query with All filter."""
query = 'tags=All(production web)'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'tags=All(production web)')

def test_not_filter(self):
"""Test building a query with Not filter."""
query = 'state=Not(offline)'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'state=Not(offline)')

def test_empty_filter(self):
"""Test building a query with Empty filter."""
query = 'comment=Empty()'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'comment=Empty()')

def test_numeric_value(self):
"""Test building a query with numeric value."""
query = 'cpu_cores=4'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'cpu_cores=4')

def test_boolean_true(self):
"""Test building a query with boolean true value."""
query = 'active=true'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'active=true')

def test_boolean_false(self):
"""Test building a query with boolean false value."""
query = 'active=false'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'active=false')

def test_hostname_only_plain(self):
"""Test building a query with plain hostname."""
query = 'webserver01'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'hostname=webserver01')

def test_hostname_only_with_regex(self):
"""Test building a query with hostname containing regex chars."""
query = 'web.*'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'hostname=Regexp(web.*)')

def test_greater_than_filter(self):
"""Test building a query with GreaterThan filter."""
query = 'cpu_cores=GreaterThan(4)'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'cpu_cores=GreaterThan(4)')

def test_less_than_filter(self):
"""Test building a query with LessThan filter."""
query = 'memory=LessThan(8192)'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'memory=LessThan(8192)')

def test_nested_not_any(self):
"""Test building a query with nested Not(Any(...))."""
query = 'state=Not(Any(offline maintenance))'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'state=Not(Any(offline maintenance))')

def test_nested_any_not(self):
"""Test building a query with nested Any containing Not."""
query = 'state=Any(online Not(offline))'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'state=Any(online Not(offline))')

def test_empty_query(self):
"""Test building an empty query."""
parsed = {}
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, '')

def test_contains_filter(self):
"""Test building a query with Contains filter."""
query = 'hostname=Contains(web)'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'hostname=Contains(web)')

def test_startswith_filter(self):
"""Test building a query with StartsWith filter."""
query = 'hostname=StartsWith(web)'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'hostname=StartsWith(web)')


class TestBuildQueryRoundTrip(unittest.TestCase):
"""Tests to verify parse_query and build_query are inverses."""

def assert_filters_equal(self, filter1, filter2):
"""Recursively compare two filter objects for equality."""
self.assertEqual(type(filter1), type(filter2))

if hasattr(filter1, 'values'):
# Any/All filters have multiple values
self.assertEqual(len(filter1.values), len(filter2.values))
for v1, v2 in zip(filter1.values, filter2.values):
self.assert_filters_equal(v1, v2)
elif hasattr(filter1, 'value'):
# Single value filters (BaseFilter, Not, Regexp, etc.)
if isinstance(filter1.value, BaseFilter):
self.assert_filters_equal(filter1.value, filter2.value)
else:
self.assertEqual(filter1.value, filter2.value)

def assert_round_trip(self, query):
"""Assert that parsing and rebuilding produces equivalent result."""
parsed = parse_query(query)
rebuilt = build_query(parsed)
reparsed = parse_query(rebuilt)

# Compare keys
self.assertEqual(set(parsed.keys()), set(reparsed.keys()))

# Compare filter types and values recursively
for key in parsed:
self.assert_filters_equal(parsed[key], reparsed[key])

def test_round_trip_simple(self):
"""Test round-trip for simple query."""
self.assert_round_trip('servertype=vm')

def test_round_trip_any(self):
"""Test round-trip for Any filter."""
self.assert_round_trip('state=Any(online offline)')

def test_round_trip_not(self):
"""Test round-trip for Not filter."""
self.assert_round_trip('state=Not(offline)')

def test_round_trip_nested(self):
"""Test round-trip for nested filters."""
self.assert_round_trip('state=Not(Any(offline maintenance))')

def test_round_trip_numeric(self):
"""Test round-trip for numeric value."""
self.assert_round_trip('cpu_cores=4')

def test_round_trip_boolean(self):
"""Test round-trip for boolean value."""
self.assert_round_trip('active=true')


class TestBuildQueryDirectConstruction(unittest.TestCase):
"""Tests for build_query with directly constructed filter objects."""

def test_direct_base_filter(self):
"""Test building query from directly constructed BaseFilter."""
query_args = {'servertype': BaseFilter('vm')}
rebuilt = build_query(query_args)
self.assertEqual(rebuilt, 'servertype=vm')

def test_direct_regexp(self):
"""Test building query from directly constructed Regexp."""
query_args = {'hostname': Regexp('web.*')}
rebuilt = build_query(query_args)
self.assertEqual(rebuilt, 'hostname=Regexp(web.*)')

def test_direct_any(self):
"""Test building query from directly constructed Any."""
query_args = {'state': Any('online', 'offline')}
rebuilt = build_query(query_args)
self.assertEqual(rebuilt, 'state=Any(online offline)')

def test_direct_not(self):
"""Test building query from directly constructed Not."""
query_args = {'state': Not('offline')}
rebuilt = build_query(query_args)
self.assertEqual(rebuilt, 'state=Not(offline)')

def test_direct_empty(self):
"""Test building query from directly constructed Empty."""
query_args = {'comment': Empty()}
rebuilt = build_query(query_args)
self.assertEqual(rebuilt, 'comment=Empty()')

def test_direct_nested(self):
"""Test building query from nested filter construction."""
query_args = {'state': Not(Any('offline', 'maintenance'))}
rebuilt = build_query(query_args)
self.assertEqual(rebuilt, 'state=Not(Any(offline maintenance))')

def test_direct_all(self):
"""Test building query from directly constructed All."""
query_args = {'tags': All('production', 'web')}
rebuilt = build_query(query_args)
self.assertEqual(rebuilt, 'tags=All(production web)')


if __name__ == '__main__':
unittest.main()
15 changes: 13 additions & 2 deletions serveradmin/serverdb/query_executer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
"""Serveradmin - Query Executer

Copyright (c) 2019 InnoGames GmbH
Copyright (c) 2025 InnoGames GmbH
"""

import logging
import time

from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.db import DataError, connection, transaction

from adminapi.filters import Any
from adminapi.parse import build_query
from serveradmin.serverdb.models import Attribute, ServertypeAttribute, Server
from serveradmin.serverdb.sql_generator import get_server_query
from serveradmin.serverdb.query_materializer import QueryMaterializer
Expand All @@ -15,6 +19,8 @@
def execute_query(filters, restrict, order_by):
"""The main function to execute queries"""

start = time.time()

# We need the restrict argument in slightly different structure.
if restrict is None:
joins = None
Expand Down Expand Up @@ -98,7 +104,12 @@ def cast(join):
# materializer module for its details. The functions on this module
# continues with the filtering step.
servers = _get_servers(filters, attribute_lookup, related_vias)
return list(QueryMaterializer(servers, *materializer_args))
result = list(QueryMaterializer(servers, *materializer_args))

duration = time.time() - start
logging.getLogger('queries').debug(f"{build_query(filters)};{duration:.3f}s")

return result


def _get_joins(restrict):
Expand Down
Loading