diff --git a/README.md b/README.md index 93a1d26..8526f6d 100644 --- a/README.md +++ b/README.md @@ -326,14 +326,16 @@ tcs-garr --environment stg init -h, --help show this help message and exit --id ID ID of the certificate to download. --output-filename OUTPUT_FILENAME - Optional filename to save the certificate inside default output_folder. - --force, -f Force overwrite if the output file already exists. + Save the certificate inside default output_folder using a custom filename. + --save Save the certificate to a file inside default output_folder using a + filename automatically generated from Common Name and download type. + --force, -f Force overwrite if the output file already exists. --download-type {pemBundle,certificate} - Type of download: 'pemBundle' or 'certificate'. Default is 'pemBundle'. + Type of download: 'pemBundle' or 'certificate'. Default is 'pemBundle'. ``` - Replace `ID` with the ID of the certificate you wish to download. You can use - `pemBundle` or `certificate` as arguments for specific download formats. + Replace `ID` with the ID of the certificate you wish to download. For default the certificate is dumped + to standard output. You can use `pemBundle` or `certificate` as arguments for specific download formats. The `download` command allows you to download certificates requested via API or ACME. @@ -568,16 +570,17 @@ docker build -t tcs-garr:latest . ### Environment variables -| Name | Description | Default Value | -| -------------------- | ---------------------------------- | --------------------- | -| HARICA_USERNAME | Username for HARICA authentication | None | -| HARICA_PASSWORD | Password for HARICA authentication | None | -| HARICA_TOTP_SEED | TOTP seed for two-factor auth | None | -| HARICA_OUTPUT_FOLDER | Directory for output files | ~/harica_certificates | -| HARICA_HTTP_PROXY | HTTP Proxy | None | -| HARICA_HTTPS_PROXY | HTTPS Proxy | None | -| WEBHOOK_URL | Webhook URL | None | -| WEBHOOK_TYPE | Webhook Type | Slack | +| Name | Description | Default Value | +|------------------------|------------------------------------------------------|-------------------------| +| HARICA_USERNAME | Username for HARICA authentication | None | +| HARICA_PASSWORD | Password for HARICA authentication | None | +| HARICA_TOTP_SEED | TOTP seed for two-factor auth | None | +| HARICA_OUTPUT_FOLDER | Directory for output files | `~/harica_certificates` | +| HARICA_OUTPUT_TEMPLATE | String template to map the Common Name to a filename | None | +| HARICA_HTTP_PROXY | HTTP Proxy | None | +| HARICA_HTTPS_PROXY | HTTPS Proxy | None | +| WEBHOOK_URL | Webhook URL | None | +| WEBHOOK_TYPE | Webhook Type | Slack | Info about [webhook](https://github.com/ConsortiumGARR/tcs-garr?tab=readme-ov-file#webhook) @@ -601,6 +604,7 @@ docker run --name tcs-garr \ -e HARICA_PASSWORD=${HARICA_PASSWORD} \ -e HARICA_TOTP_SEED=${HARICA_TOTP_SEED} \ -e HARICA_OUTPUT_FOLDER=${HARICA_OUTPUT_FOLDER} \ + -e HARICA_FILENAME_TEMPLATE=${HARICA_FILENAME_TEMPLATE} \ -e HARICA_HTTP_PROXY=${HARICA_HTTP_PROXY} \ -e HARICA_HTTPS_PROXY=${HARICA_HTTPS_PROXY} \ -e HARICA_WEBHOOK_URL=${HARICA_WEBHOOK_URL} \ diff --git a/tcs_garr/commands/base.py b/tcs_garr/commands/base.py index db873f8..1028e84 100644 --- a/tcs_garr/commands/base.py +++ b/tcs_garr/commands/base.py @@ -1,12 +1,13 @@ from abc import ABC, abstractmethod from functools import wraps +from pathlib import Path from colorama import Fore, Style from tcs_garr.harica_client import HaricaClient from tcs_garr.logger import setup_logger from tcs_garr.notifications import NotificationManager -from tcs_garr.utils import HaricaClientConfig +from tcs_garr.utils import HaricaClientConfig, OutputTemplate def requires_any_role(*roles): @@ -65,7 +66,7 @@ def wrapper(self, *args, **kwargs): # Ensure we have a client with valid authentication client = self.harica_client - # Check if the user has all of the required roles (AND logic) + # Check if the user has all the required roles (AND logic) has_all_required_roles = all(client.has_role(role) for role in roles) if not has_all_required_roles: @@ -180,14 +181,8 @@ def check_required_role(self, client: HaricaClient): exit(1) @abstractmethod - def execute(self, args): - """ - Execute the command with the parsed arguments. - - Args: - args: The parsed command arguments - """ - pass + def execute(self): + """Execute the command with the parsed arguments.""" def call_webhook(self, cert_type, cn, cert_id=None): webhook_url = self._harica_config.webhook_url @@ -210,3 +205,49 @@ def call_webhook(self, cert_type, cn, cert_id=None): except Exception as e: self.logger.error(f"Error sending webhook via NotificationManager: {e}") + + def get_output_folder(self): + """ + Retrieve the default output folder from the configuration. + + Returns: + str: The output folder path from the configuration. + """ + # Load environment-specific configuration + return self.harica_config.output_folder + + def get_output_filepath(self, filename, output_folder: str | None = None, **kwargs: str) -> Path: + """ + Return an absolute filepath for the provided filename, joining it with the output_folder. + If output_template option is configured and the provided argument starts with a FQDN, the + filename is substituted with a filename/filepath determined by configured template. In all + cases the resulting path must be a subfolder of output_folder. + + Args: + filename: the output filename. + output_folder: the output folder path, if not provided use the configured output_folder. + kwargs: additional keyword arguments to provide in case of template substitution. + Returns: + An `pathlib.Path` object that represents an absolute filepath. + """ + if not isinstance(filename, str) or not (filename := filename.strip()): + raise TypeError("1st argument must be a not empty string") + + base_path = Path(output_folder or self.get_output_folder()) + if not base_path.is_absolute(): + raise ValueError("Configuration error: output_folder must be an absolute path") + + if len(Path(filename).parts) == 1 and (output_template := self.harica_config.output_template): + try: + template = OutputTemplate(output_template) + except KeyError as err: + msg = "Your configuration for 'output_template' is invalid: {}" + raise ValueError(msg.format(err)) + else: + filepath = base_path.joinpath(template.get_filepath(filename, **kwargs)) + else: + filepath = base_path.joinpath(filename) + + if not filepath.is_relative_to(base_path): + raise ValueError(f"File path {filepath} is not relative to {base_path}") + return filepath diff --git a/tcs_garr/commands/download.py b/tcs_garr/commands/download.py index 31cbadc..60364f1 100644 --- a/tcs_garr/commands/download.py +++ b/tcs_garr/commands/download.py @@ -1,6 +1,7 @@ import base64 import importlib.resources as pkg_resources import os +import pathlib import warnings from cryptography import x509 @@ -41,11 +42,18 @@ def configure_parser(self, parser): # Add the argument to specify the certificate ID to download parser.add_argument("--id", required=True, help="ID of the certificate to download.") + # Option for saving the certificate to the filesystem using a default filename + parser.add_argument( + "--save", + action="store_true", + default=False, + help="Save the certificate inside configured output_folder using a default filename.", + ) # Optional output filename for saving the certificate parser.add_argument( "--output-filename", default=None, - help="Optional filename to save the certificate inside the default output folder.", + help="Save the certificate inside configured output_folder using a custom filename.", ) # Add force flag to allow overwriting the file parser.add_argument("--force", "-f", action="store_true", help="Force overwrite if the output file already exists.") @@ -58,19 +66,6 @@ def configure_parser(self, parser): help="Type of download: 'pemBundle' or 'certificate'. Default is 'pemBundle'.", ) - def get_output_folder(self): - """ - Retrieve the default output folder from the configuration. - - Args: - args (argparse.Namespace): The command-line arguments passed to the command. - - Returns: - str: The output folder path from the configuration. - """ - # Load environment-specific configuration - return self.harica_config.output_folder - def get_trusted_intermediates(self): """ Load all trusted intermediate certificates from the 'certs' folder. @@ -217,48 +212,65 @@ def execute(self): p7b_data = base64.b64decode(p7b_base64) # Load and extract the certificates from the PKCS7 data - if p7b_data: - pkcs7_cert = pkcs7.load_der_pkcs7_certificates(p7b_data) - if pkcs7_cert: - certificates = pkcs7_cert + if p7b_data and (pkcs7_cert := pkcs7.load_der_pkcs7_certificates(p7b_data)): + certificates = pkcs7_cert - # Load trusted intermediates - trusted_intermediates = self.get_trusted_intermediates() + # Load trusted intermediates + trusted_intermediates = self.get_trusted_intermediates() - # Complete the certificate chain with trusted intermediates - complete_chain = self.complete_chain(certificates, trusted_intermediates) + # Complete the certificate chain with trusted intermediates + complete_chain = self.complete_chain(certificates, trusted_intermediates) - # Convert certificates to PEM format and join them into a single string - data_to_write = "".join( - cert.public_bytes(serialization.Encoding.PEM).decode("utf-8") for cert in complete_chain - ) + # Convert certificates to PEM format and join them into a single string + data_to_write = "".join( + cert.public_bytes(serialization.Encoding.PEM).decode("utf-8") for cert in complete_chain + ) - # Optionally inspect the certificate chain - if not self.inspect_certificate_chain(complete_chain, trusted_intermediates): - self.logger.error("Certificate chain is not complete or valid.") - return + # Optionally inspect the certificate chain + if not self.inspect_certificate_chain(complete_chain, trusted_intermediates): + self.logger.error("Certificate chain is not complete or valid.") + return - if data_to_write: - # Determine the output folder from the config - output_folder = self.get_output_folder() + if not data_to_write: + # Handle case where no data is found for the given certificate ID + self.logger.error(f"No data found for certificate ID {self.args.id}.") - # If the output folder and filename are provided, save the certificate to a file - if output_folder and self.args.output_filename: - output_path = os.path.join(output_folder, self.args.output_filename) + elif not self.args.save and not self.args.output_filename: + # No options for save to file: print the certificate data + print(data_to_write) + else: + # Save the certificate file under the configured output_folder + output_folder = self.get_output_folder() - # Check if the file already exists, and handle the force flag for overwriting - if os.path.exists(output_path) and not self.args.force: - self.logger.error(f"File {output_path} already exists. Use --force to overwrite.") + # Save with a custom output filename + if self.args.output_filename: + output_filepath = pathlib.Path(output_folder).joinpath(self.args.output_filename) + else: + cn = certificate["dN"].strip().partition("CN=")[-1].strip() + for chunk in cn.split("."): + if not chunk.replace("-", "").isalnum(): + self.logger.error(f"Certificate Common Name ({chunk}) is not valid.") + exit(1) + + if self.args.download_type == "pemBundle": + filename = cn + "_fullchain.pem" else: - # Write the certificate data to the file (binary or text based on data type) - with open(output_path, "wb" if isinstance(data_to_write, bytes) else "w") as cert_file: - cert_file.write(data_to_write) - self.logger.info(f"Certificate saved to {output_path}") + filename = cn + ".pem" + + output_filepath = self.get_output_filepath(filename, base_path=output_folder) + + # Check if the file already exists, and handle the force flag for overwriting + if output_filepath.is_dir(): + self.logger.error(f"File {output_filepath} is a directory!") + elif output_filepath.exists() and not self.args.force: + self.logger.error(f"File {output_filepath} already exists. Use --force to overwrite.") else: - # If no filename is provided, print the certificate data - print(data_to_write) - else: - # Handle case where no data is found for the given certificate ID - self.logger.error(f"No data found for certificate ID {self.args.id}.") + output_filepath.parent.mkdir(parents=True, exist_ok=True) + + # Write the certificate data to the file (binary or text based on data type) + with output_filepath.open("wb" if isinstance(data_to_write, bytes) else "w") as cert_file: + cert_file.write(data_to_write) + self.logger.info(f"Certificate saved to {output_filepath}") + except CertificateNotApprovedException: self.logger.error(f"Certificate with id {self.args.id} has not been approved yet. Retry later.") diff --git a/tcs_garr/commands/init.py b/tcs_garr/commands/init.py index 4a4a6b6..a3524da 100644 --- a/tcs_garr/commands/init.py +++ b/tcs_garr/commands/init.py @@ -6,6 +6,7 @@ import tcs_garr.settings as settings from tcs_garr.commands.base import BaseCommand +from tcs_garr.utils import OutputTemplate class InitCommand(BaseCommand): @@ -84,6 +85,7 @@ def _create_config_file(self, environment="production", force=False): "password": "", "totp_seed": "", "output_folder": settings.OUTPUT_PATH, + "output_template": "", "http_proxy": "", "https_proxy": "", "webhook_url": "", @@ -158,6 +160,25 @@ def _create_config_file(self, environment="production", force=False): # Expand in case input was a relative path output_folder = os.path.abspath(os.path.expanduser(output_folder)) + # Prompt for optional output filename setting + while True: + output_template_prompt = f"{Fore.GREEN}🗎 Enter output_template (optional)" + if force and has_existing_config and existing_values["output_template"]: + output_template_prompt += f" [{existing_values['output_template']}]" + output_template = input(f"{output_template_prompt}: {Style.RESET_ALL}") or ( + existing_values["output_template"] if force and has_existing_config else "" + ) + + if output_template: + try: + OutputTemplate(output_template) + except ValueError: + retry_prompt = f"{Fore.YELLOW} Provided output_template is invalid! Retry? (Y/n)" + retry_option = input(f"{retry_prompt}: {Style.RESET_ALL}") + if retry_option.lower() == "y": + continue + break + # Prompt for optional proxy settings with existing values in brackets if force with existing config http_proxy_prompt = f"{Fore.GREEN}🌐 Enter HTTP proxy (optional)" if force and has_existing_config and existing_values["http_proxy"]: @@ -206,7 +227,9 @@ def _create_config_file(self, environment="production", force=False): "output_folder": output_folder, } - # Add proxy settings only if they were provided + # Add optional settings only if they were provided + if output_template: + config[section_name]["output_template"] = output_template if http_proxy: config[section_name]["http_proxy"] = http_proxy if https_proxy: diff --git a/tcs_garr/commands/k8s.py b/tcs_garr/commands/k8s.py index 4d29609..595f7fe 100644 --- a/tcs_garr/commands/k8s.py +++ b/tcs_garr/commands/k8s.py @@ -52,16 +52,6 @@ def configure_parser(self, parser): help="Name for the yaml file without the extension (optional).", ) - def get_output_folder(self): - """ - Retrieve the default output folder from the configuration. - - Returns: - str: The output folder path from the configuration. - """ - # Load environment-specific configuration to get the output folder - return self.harica_config.output_folder - def execute(self): """ Executes the command to generate a Kubernetes TLS secret YAML file. diff --git a/tcs_garr/commands/request.py b/tcs_garr/commands/request.py index f57bc1f..b1392f1 100644 --- a/tcs_garr/commands/request.py +++ b/tcs_garr/commands/request.py @@ -78,19 +78,6 @@ def configure_parser(self, parser): create_group.add_argument("--cn", help="Common name of the certificate.") self.parser.add_argument("--alt_names", default="", help="Comma-separated alternative names (only used with --cn).") - def get_output_folder(self): - """ - Retrieve the default output folder from the configuration. - - Args: - args (argparse.Namespace): The command-line arguments passed to the command. - - Returns: - str: The output folder path from the configuration. - """ - # Load environment-specific configuration - return self.harica_config.output_folder - def execute(self): """ Executes the command to generate a CSR or request a certificate. @@ -142,11 +129,11 @@ def __generate_key_csr(self, cn, alt_names, output_folder): if not organizations: self.logger.error("No available organization for this domain list") - return + exit(1) if len(organizations) > 1: self.logger.error("Multiple orgs possible but no selection made (use -O org)") - return + exit(1) organization = organizations[0] @@ -158,9 +145,9 @@ def __generate_key_csr(self, cn, alt_names, output_folder): key = rsa.generate_private_key(public_exponent=65537, key_size=4096, backend=default_backend()) # Write the private key to disk - os.makedirs(output_folder, exist_ok=True) - key_path = os.path.join(output_folder, f"{cn}.key") - with open(key_path, "wb") as f: + key_path = self.get_output_filepath(f"{cn}.key", output_folder) + key_path.parent.mkdir(parents=True, exist_ok=True) + with key_path.open("wb") as f: f.write( key.private_bytes( encoding=serialization.Encoding.PEM, @@ -169,7 +156,7 @@ def __generate_key_csr(self, cn, alt_names, output_folder): ) ) self.logger.info(f"{Fore.BLUE}Private key created in {key_path}{Style.RESET_ALL}") - os.chmod(key_path, 0o600) + key_path.chmod(0o600) # Prepare Subject Alternative Names subject_alt_names = [cn] @@ -196,12 +183,12 @@ def __generate_key_csr(self, cn, alt_names, output_folder): ) # Write the CSR to disk - csr_path = os.path.join(output_folder, f"{cn}.csr") - with open(csr_path, "wb") as f: + csr_path = self.get_output_filepath(f"{cn}.csr", output_folder) + with csr_path.open("wb") as f: f.write(csr.public_bytes(serialization.Encoding.PEM)) self.logger.info(f"{Fore.BLUE}CSR created in {csr_path}{Style.RESET_ALL}") - return csr_path + return str(csr_path) def __issue_certificate(self, csr_file, profile): """ diff --git a/tcs_garr/commands/smime.py b/tcs_garr/commands/smime.py index d01efcb..294c298 100644 --- a/tcs_garr/commands/smime.py +++ b/tcs_garr/commands/smime.py @@ -102,19 +102,6 @@ def configure_parser(self, parser): help="Type of download: 'pemBundle' or 'certificate'. Default is 'pemBundle'.", ) - def get_output_folder(self): - """ - Retrieve the default output folder from the configuration. - - Args: - args (argparse.Namespace): The command-line arguments passed to the command. - - Returns: - str: The output folder path from the configuration. - """ - # Load environment-specific configuration - return self.harica_config.output_folder - def execute(self): """ Executes the command to generate a CSR or request a certificate and download it. diff --git a/tcs_garr/harica_client.py b/tcs_garr/harica_client.py index 7e7db75..e784c6a 100644 --- a/tcs_garr/harica_client.py +++ b/tcs_garr/harica_client.py @@ -27,7 +27,7 @@ class HaricaClient: certificate management, and domain validation. It uses JWT tokens for authentication and automatically refreshes the token if needed. - Attributes: + Arguments: email (str): The email of the user for login. password (str): The password of the user for login. totp_seed (str): Optional TOTP seed for 2FA. @@ -807,12 +807,14 @@ def request_single_smime_bulk_certificate(self, emails, gn, sn, csr, transaction if len(organizations) > 1: raise ValueError("Multiple organizations found.'") - organization = organizations[0].get('id') + organization = organizations[0].get("id") csvio = io.StringIO() writer = csv.writer(csvio) - writer.writerow(["FriendlyName","Email","Email2","Email3","GivenName","Surname","PickupPassword","CertType","CSR"]) # header - writer.writerow([email[0], email[0], email[1], email[2], gn, sn, "", cert_type, csr]) # cert data + writer.writerow( + ["FriendlyName", "Email", "Email2", "Email3", "GivenName", "Surname", "PickupPassword", "CertType", "CSR"] + ) # header + writer.writerow([email[0], email[0], email[1], email[2], gn, sn, "", cert_type, csr]) # cert data # Note: the example CSV supplied by Harica adds a trailing comma after the CSR (but not on the header line). seems to work without just fine. csvdata = csvio.getvalue() @@ -825,7 +827,7 @@ def request_single_smime_bulk_certificate(self, emails, gn, sn, csr, transaction data = self.__make_post_request( "/api/OrganizationAdmin/CreateBulkCertificatesSMIME", data=payload, content_type="multipart/form-data" ) - if data.history: # empty if not redirected + if data.history: # empty if not redirected # if user is not authorized, we get redirected to login page (status codes 308->200) raise PermissionError("User is not authorized, must be an admin.") zipped_certificates = data.content @@ -834,7 +836,7 @@ def request_single_smime_bulk_certificate(self, emails, gn, sn, csr, transaction try: zipio = io.BytesIO(zipped_certificates) zipf = zipfile.ZipFile(zipio, "r") - p7b_data = next(zipf.read(name) for name in zipf.namelist()) # only contains 1 file, named "1..p7b" + p7b_data = next(zipf.read(name) for name in zipf.namelist()) # only contains 1 file, named "1..p7b" except Exception as e: raise ValueError("could not extract certificate from response") diff --git a/tcs_garr/main.py b/tcs_garr/main.py index 070431a..bd65fe1 100755 --- a/tcs_garr/main.py +++ b/tcs_garr/main.py @@ -35,11 +35,14 @@ def discover_commands(args): return command_classes -def main(): +def main(argv=None): """ Main function to handle command line arguments and initiate the certificate issuance or listing process. + + Args: + argv (list): Command line arguments (except the command name), if not specified will use sys.argv. """ - parser = argparse.ArgumentParser(description="Harica Certificate Manager") + parser = argparse.ArgumentParser(prog="tcs-garr", description="Harica Certificate Manager") parser.add_argument("--debug", action="store_true", default=False, help="Enable DEBUG logging.") parser.add_argument( @@ -81,7 +84,7 @@ def main(): cmd_instance.configure_parser(command_parser) # Parse arguments - args = parser.parse_args() + args = parser.parse_args(args=argv) # Now, pass the args to command discovery and update command instances command_instances = discover_commands(args) diff --git a/tcs_garr/utils.py b/tcs_garr/utils.py index 1fa1c9e..f4a8102 100644 --- a/tcs_garr/utils.py +++ b/tcs_garr/utils.py @@ -1,7 +1,9 @@ import configparser import os +import pathlib import re import shutil +import string import subprocess import sys from datetime import datetime @@ -66,6 +68,7 @@ def __init__(self, environment="production", alt_config_path=None): self.password = None self.totp_seed = None self.output_folder = None + self.output_template = None self.http_proxy = None self.https_proxy = None self.webhook_url = None @@ -108,6 +111,7 @@ def _load_config(self, environment="production", alt_config_path=None): "password": config.get(section_name, "password"), "totp_seed": config.get(section_name, "totp_seed", fallback=None), "output_folder": config.get(section_name, "output_folder"), + "output_template": config.get(section_name, "output_template", fallback=None), "http_proxy": config.get(section_name, "http_proxy", fallback=None), "https_proxy": config.get(section_name, "https_proxy", fallback=None), "webhook_url": config.get(section_name, "webhook_url", fallback=None), @@ -133,6 +137,7 @@ def _load_config(self, environment="production", alt_config_path=None): "password": os.getenv("HARICA_PASSWORD"), "totp_seed": os.getenv("HARICA_TOTP_SEED"), "output_folder": os.getenv("HARICA_OUTPUT_FOLDER", settings.OUTPUT_PATH), + "output_template": os.getenv("HARICA_OUTPUT_TEMPLATE"), "http_proxy": os.getenv("HARICA_HTTP_PROXY") or os.getenv("HTTP_PROXY"), "https_proxy": os.getenv("HARICA_HTTPS_PROXY") or os.getenv("HTTPS_PROXY"), "webhook_url": os.getenv("HARICA_WEBHOOK_URL") or os.getenv("WEBHOOK_URL"), @@ -156,12 +161,14 @@ def _load_config(self, environment="production", alt_config_path=None): self.password = config_data["password"] self.totp_seed = config_data["totp_seed"] self.output_folder = config_data["output_folder"] + self.output_template = config_data["output_template"] self.http_proxy = config_data["http_proxy"] self.https_proxy = config_data["https_proxy"] self.webhook_url = config_data["webhook_url"] self.webhook_type = config_data["webhook_type"] - def _validate_config(self, config_data): + @staticmethod + def _validate_config(config_data): """ Validate the configuration data. @@ -190,6 +197,20 @@ def _validate_config(self, config_data): logger.error(f"❌ Invalid TOTP seed: {totp_seed}") exit(1) + # Validate the output folder path + output_folder = config_data["output_folder"] + if not pathlib.Path(output_folder).is_absolute(): + logger.error(f"x Invalid output_folder {output_folder!r}: must be an absolute path") + exit(1) + + output_template = config_data["output_template"] + if output_template: + try: + OutputTemplate(output_template) + except ValueError as e: + logger.error(f"❌ Invalid output_template: {e}") + exit(1) + def generate_otp(totp_seed): totp = pyotp.parse_uri(totp_seed) @@ -291,3 +312,97 @@ def format_date_and_check_expiry(date: str) -> tuple[str, bool]: expired = True return formatted_date.strftime("%Y-%m-%d %H:%M"), expired + + +def is_hostname_valid(hostname: str) -> bool: + """ + Check if a hostname is a valid Fully Qualified Domain Name. Does not resolve the name to an IP address. + """ + return re.match(r"^([a-z0-9]+(-[a-z0-9]+)*\.)+[a-z]{4,}$", hostname, re.IGNORECASE) is not None + + +class OutputTemplate(string.Template): + """ + A string template class for mapping a filename to a predefined filename/path. + The template can be defined with any identifiers, but the following ones have + a special meaning:: + + - "fqdn": the Fully Qualified Domain Name. + - "host": the host part of the FQDN, without the domain. + - "domain": the domain part of the FQDN, if any. + - "year": the current year as 4-digits integer value. + - "month": the current month as zero-filled integer value with 2-digits. + - "day": the current day as zero-filled integer value with 2-digits. + - "suffix": the suffix part filename provided for translation, that could be a file extension. + + The template is valid if contains at least either "fqdn" or "host" identifiers. + """ + + def __init__(self, template: str = "${fqdn}${suffix}") -> None: + super().__init__(template) + try: + self.substitute({k: k.upper() for k in self.get_identifiers()}) + except ValueError as e: + raise ValueError(f"Invalid template string {template!r}: {e}") from None + + identifiers = self.get_identifiers() + if "fqdn" not in identifiers and "host" not in identifiers: + raise ValueError(f"Invalid template string {template!r}: missing both 'fqdn' and 'host' keys") + + def get_identifiers(self): + return [t[1] or t[2] for t in self.pattern.findall(self.template)] + + def get_filepath(self, filename: str, **kwargs: str) -> str: + """ + Apply the template substitution to a filename. The substitution can be applied if + a *fqdn* identifier is provided using the keyword arguments. If *fqdn* is not + provided the Fully Qualified Domain Name is extracted from the filename. If *fqdn* is + missing returns the *filename* argument unchanged. + + Missing *host* and *domain* values are extracted from *fqdn*. Missing values for *year*, + *month* and *day* ar filled using the current date. Missing additional identifiers are + replaced with an empty string. + + Arguments + --------- + filename : str + the source filename string to be processed by template. + kwargs: str + additional keyword arguments passed to template substitution. + + Returns + ------- + str + The resulting filepath by substitution. + """ + filepath = pathlib.Path(filename) + if "_" in filepath.name: + fqdn, _, suffix = filepath.name.partition("_") + suffix = _ + suffix + else: + fqdn = filepath.stem + suffix = filepath.suffix + + if "fqdn" in kwargs: + fqdn = kwargs.pop("fqdn") + + host, _, domain = fqdn.partition(".") + today = datetime.today() + + mapping = { + "fqdn": fqdn.lower(), + "host": kwargs.pop("host", host).lower(), + "domain": kwargs.pop("domain", domain).lower(), + "year": kwargs.pop("year", str(today.year).zfill(4)), + "month": kwargs.pop("month", str(today.month).zfill(2)), + "day": kwargs.pop("day", str(today.day).zfill(2)), + "suffix": kwargs.pop("suffix", suffix), + } + + # Additional identifiers + mapping.update(kwargs) + for k in self.get_identifiers(): + if k not in mapping: + mapping[k] = "" + + return self.substitute(mapping) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_tcs_garr.py b/tests/test_tcs_garr.py new file mode 100644 index 0000000..6e38296 --- /dev/null +++ b/tests/test_tcs_garr.py @@ -0,0 +1,169 @@ +import pathlib +import unittest +import unittest.mock as mock +import io +import uuid +from argparse import Namespace +from datetime import datetime +from contextlib import redirect_stdout, redirect_stderr + +from tcs_garr.utils import is_hostname_valid, OutputTemplate +from tcs_garr.main import main +from tcs_garr.commands.download import DownloadCommand + + +monday = datetime(year=1999, month=9, day=13) + + +def set_test_config(**kwargs): + """ + Returns a patch __init__ for tests, use kwargs to override test defaults. + + TODO: It would be better to have a patch to simulate a configuration file. + """ + + def client_config_init(self, environment="production", alt_config_path=None): + self.username = "dummy@tcs-garr.test" + self.password = "****************" + self.totp_seed = "otpauth://totp/TEST:dummy@tcs-garr.test?secret=JBSWY3DPEHPK3PXP&issuer=TEST" + self.output_folder = "/home/dummy/harica_certificates" + self.output_template = None + self.http_proxy = None + self.https_proxy = (None,) + self.webhook_url = (None,) + self.webhook_type = "generic" + self.__dict__.update(kwargs) + + return client_config_init + + +class TestHelpers(unittest.TestCase): + def test_is_hostname_valid(self): + self.assertTrue(is_hostname_valid("foo.example.test")) + self.assertFalse(is_hostname_valid("foo")) + self.assertTrue(is_hostname_valid("foo.test")) + + +class TestOutputTemplate(unittest.TestCase): + def test_initialization(self): + template = OutputTemplate("$fqdn") + self.assertEqual(template.get_identifiers(), ["fqdn"]) + + template = OutputTemplate("${host}_${domain}") + self.assertEqual(template.get_identifiers(), ["host", "domain"]) + + with self.assertRaises(ValueError) as cm: + OutputTemplate("${domain}") + self.assertEqual(str(cm.exception), "Invalid template string '${domain}': missing both 'fqdn' and 'host' keys") + + with self.assertRaises(ValueError) as cm: + OutputTemplate("${cn}.${ext}") + self.assertEqual(str(cm.exception), "Invalid template string '${cn}.${ext}': missing both 'fqdn' and 'host' keys") + + template = OutputTemplate("${host}") + self.assertEqual(template.get_identifiers(), ["host"]) + + template = OutputTemplate("${host}/${fqdn}") + self.assertEqual(template.get_identifiers(), ["host", "fqdn"]) + + def test_substitution(self): + template = OutputTemplate() + self.assertEqual(template.get_filepath("foo.example.test"), "foo.example.test") + + @mock.patch("tcs_garr.utils.datetime") + def test_hostname_mapping(self, mock_datetime): + mock_datetime.today.return_value = monday + + template = OutputTemplate("$fqdn-${year}${month}${day}${suffix}") + self.assertEqual(template.get_filepath("foo.example.test.key"), "foo.example.test-19990913.key") + + template = OutputTemplate("$domain/$host/$fqdn-${year}${month}${day}") + self.assertEqual( + template.get_filepath("foo.example.test.key", year="2000", month="01", day="01"), + "example.test/foo/foo.example.test-20000101", + ) + + template = OutputTemplate("$fqdn/${year}${month}${day}/${fqdn}${suffix}") + self.assertEqual(template.get_filepath("foo.example.test.pem"), "foo.example.test/19990913/foo.example.test.pem") + + +class TestCommandLineInterface(unittest.TestCase): + def exec_main(self, argv=None, exc=None): + """Execute main() function and returns stdout and stderr produced by the call.""" + with redirect_stdout(io.StringIO()) as out, redirect_stderr(io.StringIO()) as err: + if exc is None: + main(argv) + else: + with self.assertRaises(exc): + main(argv) + + return out.getvalue().strip(), err.getvalue().strip() + + def test_main_options(self): + out, err = self.exec_main(argv=["--help"], exc=SystemExit) + self.assertTrue(out.lstrip().startswith("usage: tcs-garr [-h]")) + self.assertEqual(err, "") + + out, err = self.exec_main(argv=["--version"], exc=SystemExit) + self.assertRegex(out, r"\d+(\.\d+)*((a|b|rc)\d+)?") + self.assertEqual(err, "") + + +class TestDownloadCommand(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.args = Namespace( + debug=False, + no_check_release=False, + command="download", + environment="production", + config=None, + id=str(uuid.uuid4()), + save=False, + output_filename=None, + force=False, + download_type="pemBundle", + ) + + @mock.patch("tcs_garr.utils.HaricaClientConfig.__init__", set_test_config()) + def test_get_output_folder(self): + command = DownloadCommand(args=self.args) + self.assertEqual(command.harica_config.username, "dummy@tcs-garr.test") + self.assertEqual(command.get_output_folder(), "/home/dummy/harica_certificates") + + @mock.patch("tcs_garr.utils.HaricaClientConfig.__init__", set_test_config()) + def test_get_output_filepath(self): + command = DownloadCommand(args=self.args) + self.assertEqual(command.harica_config.username, "dummy@tcs-garr.test") + + self.assertEqual( + command.get_output_filepath("foo.example.test.pem"), + pathlib.Path("/home/dummy/harica_certificates/foo.example.test.pem"), + ) + self.assertEqual( + command.get_output_filepath(" foo.example.test.pem"), + pathlib.Path("/home/dummy/harica_certificates/foo.example.test.pem"), + ) + self.assertEqual( + command.get_output_filepath("foo_bundle.pem"), pathlib.Path("/home/dummy/harica_certificates/foo_bundle.pem") + ) + + @mock.patch( + "tcs_garr.utils.HaricaClientConfig.__init__", + set_test_config(output_template="${host}/${year}${month}${day}/${fqdn}${suffix}"), + ) + @mock.patch("tcs_garr.utils.datetime") + def test_get_output_filepath_with_subpath(self, mock_datetime): + mock_datetime.today.return_value = monday + + command = DownloadCommand(args=self.args) + self.assertEqual(command.harica_config.username, "dummy@tcs-garr.test") + + self.assertEqual( + command.get_output_filepath("foo.example.test.key"), + pathlib.Path("/home/dummy/harica_certificates/foo/19990913/foo.example.test.key"), + ) + + +if __name__ == "__main__": + unittest.main()