diff --git a/TM1py/Services/ApplicationService.py b/TM1py/Services/ApplicationService.py index 3d8eb666..3d20b7d4 100644 --- a/TM1py/Services/ApplicationService.py +++ b/TM1py/Services/ApplicationService.py @@ -1,9 +1,10 @@ # -*- coding: utf-8 -*- import json -from typing import Union +from typing import Dict, List, Optional, Tuple, Union from requests import Response +from TM1py.Exceptions import TM1pyRestException from TM1py.Objects.Application import ( Application, ApplicationTypes, @@ -32,6 +33,138 @@ def __init__(self, tm1_rest: RestService): """ super().__init__(tm1_rest) self._rest = tm1_rest + self._private_path_cache: Dict[str, int] = {} + + def _build_path_url(self, segments: List[str], private_boundary: Optional[int] = None) -> str: + """Build URL path from segments with optional private boundary. + + :param segments: list of folder names + :param private_boundary: index from which to use PrivateContents (None = all public) + :return: URL path string for the segments + """ + if not segments: + return "" + + # All public + if private_boundary is None or private_boundary >= len(segments): + return "".join(format_url("/Contents('{}')", s) for s in segments) + + # All private + if private_boundary <= 0: + return "".join(format_url("/PrivateContents('{}')", s) for s in segments) + + # Mixed: public up to boundary, then private + public_part = "".join(format_url("/Contents('{}')", s) for s in segments[:private_boundary]) + private_part = "".join(format_url("/PrivateContents('{}')", s) for s in segments[private_boundary:]) + return public_part + private_part + + def _find_private_boundary(self, segments: List[str], **kwargs) -> int: + """Find the first private folder in the path. + + Iteratively checks each segment to find where the path transitions + from public to private. Once a private folder is found, all subsequent + folders are also private (TM1 rule). + + :param segments: list of folder names + :return: index of first private folder, or len(segments) if all public, or -1 if path doesn't exist + """ + current_prefix = "/Contents('Applications')" + + for i, segment in enumerate(segments): + # Try public access first + public_url = current_prefix + format_url("/Contents('{}')", segment) + "?$top=0" + try: + self._rest.GET(public_url, **kwargs) + # Segment is public, continue building prefix + current_prefix = current_prefix + format_url("/Contents('{}')", segment) + except TM1pyRestException as e: + if e.status_code != 404: + raise + # Not found as public, try private + private_url = current_prefix + format_url("/PrivateContents('{}')", segment) + "?$top=0" + try: + self._rest.GET(private_url, **kwargs) + return i # Found the first private segment + except TM1pyRestException as e2: + if e2.status_code != 404: + raise + return -1 # Path doesn't exist + + return len(segments) # All segments are public + + def _resolve_path(self, path: str, private: bool = False, use_cache: bool = False, + **kwargs) -> Tuple[str, bool]: + """Resolve application path, handling mixed public/private folder hierarchies. + + For public access (private=False), returns direct URL without probing. + For private access (private=True), probes to find where private folders begin. + + :param path: path with forward slashes (e.g., "Planning Sample/Reports") + :param private: whether we're accessing private content (triggers path probing) + :param use_cache: whether to use/update the private boundary cache + :return: tuple of (resolved_base_url, in_private_context) + - resolved_base_url: URL from Contents('Applications') through the path + - in_private_context: True if any folder in path is private + """ + base = "/Contents('Applications')" + + if not path.strip(): + return base, False + + segments = path.split("/") + + # For public access, assume all-public path (no probing needed) + if not private: + mid = self._build_path_url(segments, None) + return base + mid, False + + # For private access, we need to find where private folders begin + cache_key = "/".join(segments) + + # Check cache first + if use_cache and cache_key in self._private_path_cache: + boundary = self._private_path_cache[cache_key] + mid = self._build_path_url(segments, boundary) + in_private_context = boundary < len(segments) + return base + mid, in_private_context + + # Optimistic: try all-public first (common case) + mid_public = self._build_path_url(segments, None) + url_public = base + mid_public + try: + self._rest.GET(url_public + "?$top=0", **kwargs) + if use_cache: + self._private_path_cache[cache_key] = len(segments) + return url_public, False + except TM1pyRestException as e: + if e.status_code != 404: + raise + + # Optimistic: try all-private + mid_private = self._build_path_url(segments, 0) + url_private = base + mid_private + try: + self._rest.GET(url_private + "?$top=0", **kwargs) + if use_cache: + self._private_path_cache[cache_key] = 0 + return url_private, True + except TM1pyRestException as e: + if e.status_code != 404: + raise + + # Iterative search to find the exact transition point + boundary = self._find_private_boundary(segments, **kwargs) + + if boundary == -1: + # Path doesn't exist - return public URL to get proper error message + return url_public, False + + if use_cache: + self._private_path_cache[cache_key] = boundary + + mid = self._build_path_url(segments, boundary) + in_private_context = boundary < len(segments) + return base + mid, in_private_context def get_all_public_root_names(self, **kwargs): """ @@ -46,39 +179,49 @@ def get_all_public_root_names(self, **kwargs): return applications def get_all_private_root_names(self, **kwargs): + """ + Retrieve all private root application names. + :param kwargs: Additional arguments for the REST request. + :return: List of private root application names. + """ url = "/Contents('Applications')/PrivateContents" response = self._rest.GET(url, **kwargs) applications = list(application["Name"] for application in response.json()["value"]) return applications - def get_names(self, path: str, private: bool = False, **kwargs): + def get_names(self, path: str, private: bool = False, use_cache: bool = False, **kwargs): """Retrieve Planning Analytics Application names in given path + Automatically handles mixed public/private folder hierarchies. + :param path: path with forward slashes - :param private: boolean + :param private: boolean - whether to retrieve private or public contents at the leaf + :param use_cache: boolean - whether to cache discovered private boundaries :return: list of application names """ - contents = "PrivateContents" if private else "Contents" - mid = "" - if path.strip() != "": - mid = "".join([format_url("/Contents('{}')", element) for element in path.split("/")]) - base_url = "/api/v1/Contents('Applications')" + mid + "/" + contents + base_url, in_private_context = self._resolve_path(path, private, use_cache, **kwargs) - response = self._rest.GET(url=base_url, **kwargs) - applications = list(application["Name"] for application in response.json()["value"]) + # Use PrivateContents if we're in a private context OR if private=True + contents = "PrivateContents" if (private or in_private_context) else "Contents" + url = base_url + "/" + contents - return applications + response = self._rest.GET(url=url, **kwargs) + return [application["Name"] for application in response.json()["value"]] def get( - self, path: str, application_type: Union[str, ApplicationTypes], name: str, private: bool = False, **kwargs + self, path: str, application_type: Union[str, ApplicationTypes], name: str, private: bool = False, + use_cache: bool = False, **kwargs ) -> Application: """Retrieve Planning Analytics Application + Automatically handles mixed public/private folder hierarchies. + :param path: path with forward slashes :param application_type: str or ApplicationType from Enum :param name: :param private: + :param use_cache: boolean - whether to cache discovered private boundaries :return: """ # raise ValueError if not a valid ApplicationType @@ -86,21 +229,18 @@ def get( # documents require special treatment if application_type == ApplicationTypes.DOCUMENT: - return self.get_document(path=path, name=name, private=private, **kwargs) + return self.get_document(path=path, name=name, private=private, use_cache=use_cache, **kwargs) if not application_type == ApplicationTypes.FOLDER and not verify_version( required_version="12", version=self.version ): name += application_type.suffix - contents = "PrivateContents" if private else "Contents" - mid = "" - if path.strip() != "": - mid = "".join([format_url("/Contents('{}')", element) for element in path.split("/")]) + base_url, in_private_context = self._resolve_path(path, private, use_cache, **kwargs) - base_url = format_url( - "/Contents('Applications')" + mid + "/" + contents + "('{application_name}')", application_name=name - ) + # Use PrivateContents if we're in a private context OR if private=True + contents = "PrivateContents" if (private or in_private_context) else "Contents" + base_url = format_url(base_url + "/" + contents + "('{}')", name) if application_type == ApplicationTypes.CUBE: response = self._rest.GET(url=base_url + "?$expand=Cube($select=Name)", **kwargs) @@ -154,12 +294,16 @@ def get( view_name=response.json()["View"]["Name"], ) - def get_document(self, path: str, name: str, private: bool = False, **kwargs) -> DocumentApplication: + def get_document(self, path: str, name: str, private: bool = False, use_cache: bool = False, + **kwargs) -> DocumentApplication: """Get Excel Application from TM1 Server in binary format. Can be dumped to file. + Automatically handles mixed public/private folder hierarchies. + :param path: path through folder structure to application. For instance: "Finance/P&L.xlsx" :param name: name of the application :param private: boolean + :param use_cache: boolean - whether to cache discovered private boundaries :return: Return DocumentApplication """ if not name.endswith(ApplicationTypes.DOCUMENT.suffix) and not verify_version( @@ -167,13 +311,15 @@ def get_document(self, path: str, name: str, private: bool = False, **kwargs) -> ): name += ApplicationTypes.DOCUMENT.suffix - contents = "PrivateContents" if private else "Contents" - mid = "".join([format_url("/Contents('{}')", element) for element in path.split("/")]) - url = format_url("/Contents('Applications')" + mid + "/" + contents + "('{name}')/Document/Content", name=name) + base_url, in_private_context = self._resolve_path(path, private, use_cache, **kwargs) + + # Use PrivateContents if we're in a private context OR if private=True + contents = "PrivateContents" if (private or in_private_context) else "Contents" + url = format_url(base_url + "/" + contents + "('{}')/Document/Content", name) content = self._rest.GET(url, **kwargs).content - url = format_url("/Contents('Applications')" + mid + "/" + contents + "('{name}')/Document", name=name) + url = format_url(base_url + "/" + contents + "('{}')/Document", name) document_fields = self._rest.GET(url, **kwargs).json() return DocumentApplication( @@ -191,17 +337,20 @@ def delete( application_type: Union[str, ApplicationTypes], application_name: str, private: bool = False, + use_cache: bool = False, **kwargs, ) -> Response: """Delete Planning Analytics application reference + Automatically handles mixed public/private folder hierarchies. + :param path: path through folder structure to delete the applications entry. For instance: "Finance/Reports" :param application_type: type of the to be deleted application entry :param application_name: name of the to be deleted application entry :param private: Access level of the to be deleted object + :param use_cache: boolean - whether to cache discovered private boundaries :return: """ - # raise ValueError if not a valid ApplicationType application_type = ApplicationTypes(application_type) @@ -210,15 +359,12 @@ def delete( ): application_name += application_type.suffix - contents = "PrivateContents" if private else "Contents" - mid = "" - if path.strip() != "": - mid = "".join([format_url("/Contents('{}')", element) for element in path.split("/")]) + base_url, in_private_context = self._resolve_path(path, private, use_cache, **kwargs) + + # Use PrivateContents if we're in a private context OR if private=True + contents = "PrivateContents" if (private or in_private_context) else "Contents" + url = format_url(base_url + "/" + contents + "('{}')", application_name) - url = format_url( - "/Contents('Applications')" + mid + "/" + contents + "('{application_name}')", - application_name=application_name, - ) return self._rest.DELETE(url, **kwargs) def rename( @@ -228,8 +374,21 @@ def rename( application_name: str, new_application_name: str, private: bool = False, + use_cache: bool = False, **kwargs, ): + """Rename a Planning Analytics application. + + Automatically handles mixed public/private folder hierarchies. + + :param path: path through folder structure + :param application_type: type of the application + :param application_name: current name of the application + :param new_application_name: new name for the application + :param private: Access level of the object + :param use_cache: boolean - whether to cache discovered private boundaries + :return: + """ # raise ValueError if not a valid ApplicationType application_type = ApplicationTypes(application_type) @@ -238,38 +397,37 @@ def rename( ): application_name += application_type.suffix - contents = "PrivateContents" if private else "Contents" - mid = "" - if path.strip() != "": - mid = "".join([format_url("/Contents('{}')", element) for element in path.split("/")]) + base_url, in_private_context = self._resolve_path(path, private, use_cache, **kwargs) - url = format_url( - "/Contents('Applications')" + mid + "/" + contents + "('{application_name}')/tm1.Move", - application_name=application_name, - ) - data = {"Name": new_application_name} + # Use PrivateContents if we're in a private context OR if private=True + contents = "PrivateContents" if (private or in_private_context) else "Contents" + url = format_url(base_url + "/" + contents + "('{}')/tm1.Move", application_name) + data = {"Name": new_application_name} return self._rest.POST(url, data=json.dumps(data), **kwargs) - def create(self, application: Union[Application, DocumentApplication], private: bool = False, **kwargs) -> Response: + def create(self, application: Union[Application, DocumentApplication], private: bool = False, + use_cache: bool = False, **kwargs) -> Response: """Create Planning Analytics application + Automatically handles mixed public/private folder hierarchies. + :param application: instance of Application :param private: boolean + :param use_cache: boolean - whether to cache discovered private boundaries :return: """ + base_url, in_private_context = self._resolve_path(application.path, private, use_cache, **kwargs) - contents = "PrivateContents" if private else "Contents" + # Use PrivateContents if we're in a private context OR if private=True + contents = "PrivateContents" if (private or in_private_context) else "Contents" + url = base_url + "/" + contents - mid = "" - if application.path.strip() != "": - mid = "".join([format_url("/Contents('{}')", element) for element in application.path.split("/")]) - url = "/Contents('Applications')" + mid + "/" + contents response = self._rest.POST(url, application.body, **kwargs) if application.application_type == ApplicationTypes.DOCUMENT: url = format_url( - "/Contents('Applications')" + mid + "/" + contents + "('{name}{suffix}')/Document/Content", + base_url + "/" + contents + "('{name}{suffix}')/Document/Content", name=application.name, suffix=".blob" if not verify_version(required_version="12", version=self.version) else "", ) @@ -277,40 +435,46 @@ def create(self, application: Union[Application, DocumentApplication], private: return response - def update(self, application: Union[Application, DocumentApplication], private: bool = False, **kwargs) -> Response: + def update(self, application: Union[Application, DocumentApplication], private: bool = False, + use_cache: bool = False, **kwargs) -> Response: """Update Planning Analytics application + Automatically handles mixed public/private folder hierarchies. + :param application: instance of Application :param private: boolean + :param use_cache: boolean - whether to cache discovered private boundaries :return: """ + base_url, in_private_context = self._resolve_path(application.path, private, use_cache, **kwargs) - contents = "PrivateContents" if private else "Contents" - - mid = "" - if application.path.strip() != "": - mid = "".join([format_url("/Contents('{}')", element) for element in application.path.split("/")]) + # Use PrivateContents if we're in a private context OR if private=True + contents = "PrivateContents" if (private or in_private_context) else "Contents" if application.application_type == ApplicationTypes.DOCUMENT: url = format_url( - "/Contents('Applications')" + mid + "/" + contents + "('{name}{extension}')/Document/Content", + base_url + "/" + contents + "('{name}{extension}')/Document/Content", name=application.name, extension="" if verify_version("12", self.version) else ".blob" ) response = self._rest.PATCH(url=url, data=application.content, headers=self.binary_http_header, **kwargs) else: - url = "/Contents('Applications')" + mid + "/" + contents + url = base_url + "/" + contents response = self._rest.POST(url, application.body, **kwargs) return response def update_or_create( - self, application: Union[Application, DocumentApplication], private: bool = False, **kwargs + self, application: Union[Application, DocumentApplication], private: bool = False, + use_cache: bool = False, **kwargs ) -> Response: """Update or create Planning Analytics application + Automatically handles mixed public/private folder hierarchies. + :param application: instance of Application :param private: boolean + :param use_cache: boolean - whether to cache discovered private boundaries :return: Response """ if self.exists( @@ -318,45 +482,55 @@ def update_or_create( application_type=application.application_type, name=application.name, private=private, + use_cache=use_cache, **kwargs, ): - response = self.update(application=application, private=private, **kwargs) + response = self.update(application=application, private=private, use_cache=use_cache, **kwargs) else: - response = self.create(application=application, private=private, **kwargs) + response = self.create(application=application, private=private, use_cache=use_cache, **kwargs) return response def update_or_create_document_from_file( - self, path: str, name: str, path_to_file: str, private: bool = False, **kwargs + self, path: str, name: str, path_to_file: str, private: bool = False, use_cache: bool = False, **kwargs ) -> Response: """Update or create application from file + Automatically handles mixed public/private folder hierarchies. + :param path: application path on server, i.e. 'Finance/Reports' :param name: name of the application on server, i.e. 'Flash.xlsx' :param path_to_file: full local file path of file, i.e. 'C:\\Users\\User\\Flash.xslx' :param private: access level of the object + :param use_cache: boolean - whether to cache discovered private boundaries :return: Response """ - - if self.exists(path=path, application_type=ApplicationTypes.DOCUMENT, name=name, private=private): + if self.exists(path=path, application_type=ApplicationTypes.DOCUMENT, name=name, private=private, + use_cache=use_cache, **kwargs): response = self.update_document_from_file( - path_to_file=path_to_file, application_path=path, application_name=name, private=private + path_to_file=path_to_file, application_path=path, application_name=name, private=private, + use_cache=use_cache, **kwargs ) else: response = self.create_document_from_file( - path_to_file=path_to_file, application_path=path, application_name=name, private=private + path_to_file=path_to_file, application_path=path, application_name=name, private=private, + use_cache=use_cache, **kwargs ) return response def exists( - self, path: str, application_type: Union[str, ApplicationTypes], name: str, private: bool = False, **kwargs + self, path: str, application_type: Union[str, ApplicationTypes], name: str, private: bool = False, + use_cache: bool = False, **kwargs ) -> bool: """Check if application exists + Automatically handles mixed public/private folder hierarchies. + :param path: :param application_type: :param name: :param private: + :param use_cache: boolean - whether to cache discovered private boundaries :return: """ # raise ValueError if not a valid ApplicationType @@ -367,42 +541,316 @@ def exists( ): name += application_type.suffix - contents = "PrivateContents" if private else "Contents" - mid = "" - if path.strip() != "": - mid = "".join(["/Contents('{}')".format(element) for element in path.split("/")]) - - url = format_url( - "/Contents('Applications')" + mid + "/" + contents + "('{application_name}')", application_name=name - ) + # For exists check with private=True, we need special handling + # because we want to check both public and private paths + if not private: + # Simple public check + segments = path.split("/") if path.strip() else [] + mid = self._build_path_url(segments, None) + url = "/Contents('Applications')" + mid + "/Contents('" + name + "')" + return self._exists(url, **kwargs) + + # For private access, first try to resolve the path + segments = path.split("/") if path.strip() else [] + + if not segments: + # Root level - just check directly + url = "/Contents('Applications')/PrivateContents('" + name + "')" + return self._exists(url, **kwargs) + + # Check cache first + cache_key = "/".join(segments) + if use_cache and cache_key in self._private_path_cache: + boundary = self._private_path_cache[cache_key] + mid = self._build_path_url(segments, boundary) + in_private_context = boundary < len(segments) + contents = "PrivateContents" if in_private_context else "Contents" + url = "/Contents('Applications')" + mid + "/" + contents + "('" + name + "')" + return self._exists(url, **kwargs) + + # Try all-public path first + mid_public = self._build_path_url(segments, None) + url_public = "/Contents('Applications')" + mid_public + "/PrivateContents('" + name + "')" + if self._exists(url_public, **kwargs): + if use_cache: + self._private_path_cache[cache_key] = len(segments) + return True + + # Try to find the private boundary + boundary = self._find_private_boundary(segments, **kwargs) + + if boundary == -1: + return False # Path doesn't exist + + if use_cache: + self._private_path_cache[cache_key] = boundary + + mid = self._build_path_url(segments, boundary) + # If path has private folders, item must be accessed via PrivateContents + contents = "PrivateContents" if boundary < len(segments) else "PrivateContents" + url = "/Contents('Applications')" + mid + "/" + contents + "('" + name + "')" return self._exists(url, **kwargs) def create_document_from_file( - self, path_to_file: str, application_path: str, application_name: str, private: bool = False, **kwargs + self, path_to_file: str, application_path: str, application_name: str, private: bool = False, + use_cache: bool = False, **kwargs ) -> Response: """Create DocumentApplication in TM1 from local file + Automatically handles mixed public/private folder hierarchies. + :param path_to_file: :param application_path: :param application_name: :param private: + :param use_cache: boolean - whether to cache discovered private boundaries :return: """ with open(path_to_file, "rb") as file: application = DocumentApplication(path=application_path, name=application_name, content=file.read()) - return self.create(application=application, private=private, **kwargs) + return self.create(application=application, private=private, use_cache=use_cache, **kwargs) def update_document_from_file( - self, path_to_file: str, application_path: str, application_name: str, private: bool = False, **kwargs + self, path_to_file: str, application_path: str, application_name: str, private: bool = False, + use_cache: bool = False, **kwargs ) -> Response: """Update DocumentApplication in TM1 from local file + Automatically handles mixed public/private folder hierarchies. + :param path_to_file: :param application_path: :param application_name: :param private: + :param use_cache: boolean - whether to cache discovered private boundaries :return: """ with open(path_to_file, "rb") as file: application = DocumentApplication(path=application_path, name=application_name, content=file.read()) - return self.update(application=application, private=private, **kwargs) + return self.update(application=application, private=private, use_cache=use_cache, **kwargs) + + def _extract_type_from_odata(self, odata_type: str) -> str: + """Extract the type name from @odata.type string. + + :param odata_type: e.g., '#ibm.tm1.api.v1.Folder' + :return: e.g., 'Folder' + """ + if odata_type and "." in odata_type: + return odata_type.split(".")[-1] + return odata_type or "Unknown" + + def _get_contents_raw(self, path: str, private: bool, in_private_context: bool, **kwargs) -> List[Dict]: + """Get raw contents from API for a given path. + + :param path: the path to query + :param private: whether to access private contents + :param in_private_context: whether we're already in a private context + :return: list of raw item dictionaries from API + """ + base = "/Contents('Applications')" + + if not path.strip(): + # Root level + url = base + ("/PrivateContents" if private else "/Contents") + else: + # Need to properly resolve the path to handle mixed public/private hierarchies + if in_private_context or private: + # Find the actual private boundary in the path + segments = path.split("/") + boundary = self._find_private_boundary(segments, **kwargs) + if boundary == -1: + # Path doesn't exist + return [] + mid = self._build_path_url(segments, boundary) + else: + # All public path + segments = path.split("/") + mid = self._build_path_url(segments, None) + + contents = "PrivateContents" if (private or in_private_context) else "Contents" + url = base + mid + "/" + contents + + try: + response = self._rest.GET(url, **kwargs) + return response.json().get("value", []) + except TM1pyRestException as e: + if e.status_code == 404: + return [] + raise + + def _discover_at_path( + self, + path: str, + include_private: bool, + recursive: bool, + flat: bool, + in_private_context: bool, + results: List[Dict], + **kwargs + ) -> List[Dict]: + """Discover items at a specific path, handling both public and private contents. + + :param path: current path being explored + :param include_private: whether to include private assets + :param recursive: whether to recurse into folders + :param flat: whether to return flat list + :param in_private_context: whether we're in a private folder context (everything below is private) + :param results: accumulator for flat mode + :return: list of discovered items + """ + items = [] + + if in_private_context: + # Inside a private folder - everything is private, only one call needed + raw_items = self._get_contents_raw(path, private=True, in_private_context=True, **kwargs) + self._process_items( + raw_items=raw_items, + path=path, + is_private=True, + in_private_context=True, + include_private=include_private, + recursive=recursive, + flat=flat, + results=results, + items=items, + **kwargs + ) + else: + # Public context - get public contents + raw_public = self._get_contents_raw(path, private=False, in_private_context=False, **kwargs) + self._process_items( + raw_items=raw_public, + path=path, + is_private=False, + in_private_context=False, + include_private=include_private, + recursive=recursive, + flat=flat, + results=results, + items=items, + **kwargs + ) + + # Also get private contents if requested (private items in a public folder) + if include_private: + raw_private = self._get_contents_raw(path, private=True, in_private_context=False, **kwargs) + self._process_items( + raw_items=raw_private, + path=path, + is_private=True, + in_private_context=False, + include_private=include_private, + recursive=recursive, + flat=flat, + results=results, + items=items, + **kwargs + ) + + return items if not flat else results + + def _process_items( + self, + raw_items: List[Dict], + path: str, + is_private: bool, + in_private_context: bool, + include_private: bool, + recursive: bool, + flat: bool, + results: List[Dict], + items: List[Dict], + **kwargs + ): + """Process raw items from API and handle recursion. + + :param raw_items: raw items from API + :param path: current path + :param is_private: whether these items are private + :param in_private_context: whether we're in a private folder context + :param include_private: whether to include private assets + :param recursive: whether to recurse into folders + :param flat: whether to return flat list + :param results: accumulator for flat mode + :param items: accumulator for nested mode + """ + for raw_item in raw_items: + odata_type = raw_item.get("@odata.type", "") + item_type = self._extract_type_from_odata(odata_type) + item_name = raw_item.get("Name", "") + item_id = raw_item.get("ID", "") + + # Build full path for this item + item_path = f"{path}/{item_name}" if path else item_name + + item = { + "@odata.type": odata_type, + "type": item_type, + "id": item_id, + "name": item_name, + "path": item_path, + "is_private": is_private or in_private_context + } + + # Handle recursion for folders + if recursive and item_type == "Folder": + # When recursing into a private folder, everything below is private + new_private_context = is_private or in_private_context + children = self._discover_at_path( + path=item_path, + include_private=include_private, + recursive=recursive, + flat=flat, + in_private_context=new_private_context, + results=results, + **kwargs + ) + if not flat: + item["children"] = children + + if flat: + results.append(item) + else: + items.append(item) + + def discover( + self, + path: str = "", + include_private: bool = False, + recursive: bool = False, + flat: bool = False, + **kwargs + ) -> List[Dict]: + """Discover applications in the Applications folder. + + Traverses the application hierarchy and returns information about all + discovered items including folders, documents, views, and other references. + + :param path: starting path (empty string = root 'Applications' folder) + :param include_private: whether to include private assets in the results + :param recursive: whether to recurse into subfolders + :param flat: if True, returns a flat list; if False (default), returns nested structure + :return: list of dictionaries with keys: @odata.type, type, id, name, path, is_private + - @odata.type: full OData type (e.g., '#ibm.tm1.api.v1.Folder') + - type: simplified type name (e.g., 'Folder') + For nested mode, folders also have a 'children' key when recursive=True + """ + results = [] # Accumulator for flat mode + + # Determine if we're starting in a private context (path contains private folders) + in_private_context = False + if path.strip() and include_private: + _, in_private_context = self._resolve_path(path, private=True, use_cache=False, **kwargs) + + # Use the unified discovery method + items = self._discover_at_path( + path=path, + include_private=include_private, + recursive=recursive, + flat=flat, + in_private_context=in_private_context, + results=results, + **kwargs + ) + + return results if flat else items diff --git a/Tests/ApplicationService_test.py b/Tests/ApplicationService_test.py index 1f02d05f..428ab943 100644 --- a/Tests/ApplicationService_test.py +++ b/Tests/ApplicationService_test.py @@ -548,3 +548,968 @@ def test_view_application_private(self): def test_view_application_public(self): self.run_view_application(False) + + # Tests for private path resolution functionality + + def test_build_path_url_all_public(self): + """Test _build_path_url with all public segments""" + segments = ["Folder1", "Folder2", "Folder3"] + result = self.tm1.applications._build_path_url(segments, private_boundary=None) + expected = "/Contents('Folder1')/Contents('Folder2')/Contents('Folder3')" + self.assertEqual(result, expected) + + def test_build_path_url_all_private(self): + """Test _build_path_url with all private segments""" + segments = ["Folder1", "Folder2", "Folder3"] + result = self.tm1.applications._build_path_url(segments, private_boundary=0) + expected = "/PrivateContents('Folder1')/PrivateContents('Folder2')/PrivateContents('Folder3')" + self.assertEqual(result, expected) + + def test_build_path_url_mixed_boundary_at_1(self): + """Test _build_path_url with private boundary at index 1""" + segments = ["Folder1", "Folder2", "Folder3"] + result = self.tm1.applications._build_path_url(segments, private_boundary=1) + expected = "/Contents('Folder1')/PrivateContents('Folder2')/PrivateContents('Folder3')" + self.assertEqual(result, expected) + + def test_build_path_url_mixed_boundary_at_2(self): + """Test _build_path_url with private boundary at index 2""" + segments = ["Folder1", "Folder2", "Folder3"] + result = self.tm1.applications._build_path_url(segments, private_boundary=2) + expected = "/Contents('Folder1')/Contents('Folder2')/PrivateContents('Folder3')" + self.assertEqual(result, expected) + + def test_build_path_url_empty_segments(self): + """Test _build_path_url with empty segments list""" + segments = [] + result = self.tm1.applications._build_path_url(segments, private_boundary=None) + self.assertEqual(result, "") + + def test_build_path_url_single_segment_public(self): + """Test _build_path_url with single public segment""" + segments = ["Folder1"] + result = self.tm1.applications._build_path_url(segments, private_boundary=None) + expected = "/Contents('Folder1')" + self.assertEqual(result, expected) + + def test_build_path_url_single_segment_private(self): + """Test _build_path_url with single private segment""" + segments = ["Folder1"] + result = self.tm1.applications._build_path_url(segments, private_boundary=0) + expected = "/PrivateContents('Folder1')" + self.assertEqual(result, expected) + + def test_build_path_url_special_characters(self): + """Test _build_path_url handles special characters in folder names""" + segments = ["Folder's Name", "Folder & Co"] + result = self.tm1.applications._build_path_url(segments, private_boundary=None) + # format_url should escape single quotes + self.assertIn("Folder''s Name", result) + + def test_private_folder_get_names(self): + """Test get_names on a private folder at root level""" + # Create a private folder + private_folder_name = self.prefix + "PrivateFolder" + private_folder = FolderApplication("", private_folder_name) + + # Clean up if exists + if self.tm1.applications.exists( + path="", application_type=ApplicationTypes.FOLDER, name=private_folder_name, private=True + ): + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + + # Create private folder + self.tm1.applications.create(application=private_folder, private=True) + + try: + # Create a link application inside the private folder + link_app = LinkApplication(private_folder_name, self.application_name, "http://example.com") + self.tm1.applications.create(application=link_app, private=True) + + # Test get_names - should auto-resolve the private path + names = self.tm1.applications.get_names(path=private_folder_name, private=True) + self.assertIn(self.application_name, names) + + # Clean up link + self.tm1.applications.delete( + path=private_folder_name, + application_type=ApplicationTypes.LINK, + application_name=self.application_name, + private=True, + ) + finally: + # Clean up private folder + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + + def test_private_folder_exists(self): + """Test exists on an application inside a private folder""" + # Create a private folder + private_folder_name = self.prefix + "PrivateFolder2" + private_folder = FolderApplication("", private_folder_name) + + # Clean up if exists + if self.tm1.applications.exists( + path="", application_type=ApplicationTypes.FOLDER, name=private_folder_name, private=True + ): + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + + # Create private folder + self.tm1.applications.create(application=private_folder, private=True) + + try: + # Create a link application inside the private folder + link_app = LinkApplication(private_folder_name, self.application_name, "http://example.com") + self.tm1.applications.create(application=link_app, private=True) + + # Test exists - should auto-resolve the private path + exists = self.tm1.applications.exists( + path=private_folder_name, + application_type=ApplicationTypes.LINK, + name=self.application_name, + private=True, + ) + self.assertTrue(exists) + + # Clean up link + self.tm1.applications.delete( + path=private_folder_name, + application_type=ApplicationTypes.LINK, + application_name=self.application_name, + private=True, + ) + finally: + # Clean up private folder + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + + def test_private_folder_with_cache(self): + """Test that caching works for private path resolution""" + # Create a private folder + private_folder_name = self.prefix + "PrivateFolderCache" + private_folder = FolderApplication("", private_folder_name) + + # Clean up if exists + if self.tm1.applications.exists( + path="", application_type=ApplicationTypes.FOLDER, name=private_folder_name, private=True + ): + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + + # Create private folder + self.tm1.applications.create(application=private_folder, private=True) + + try: + # Create a link application inside the private folder + link_app = LinkApplication(private_folder_name, self.application_name, "http://example.com") + self.tm1.applications.create(application=link_app, private=True) + + # Clear cache + self.tm1.applications._private_path_cache.clear() + + # First call - should populate cache + names1 = self.tm1.applications.get_names(path=private_folder_name, private=True, use_cache=True) + self.assertIn(self.application_name, names1) + + # Verify cache was populated + self.assertIn(private_folder_name, self.tm1.applications._private_path_cache) + + # Second call - should use cache + names2 = self.tm1.applications.get_names(path=private_folder_name, private=True, use_cache=True) + self.assertEqual(names1, names2) + + # Clean up link + self.tm1.applications.delete( + path=private_folder_name, + application_type=ApplicationTypes.LINK, + application_name=self.application_name, + private=True, + ) + finally: + # Clean up private folder + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + # Clear cache + self.tm1.applications._private_path_cache.clear() + + def test_nested_private_folder(self): + """Test accessing applications in nested private folders""" + # Create a private folder with a subfolder + private_folder_name = self.prefix + "PrivateFolderNested" + subfolder_name = self.prefix + "SubFolder" + private_folder = FolderApplication("", private_folder_name) + subfolder = FolderApplication(private_folder_name, subfolder_name) + + # Clean up if exists + if self.tm1.applications.exists( + path="", application_type=ApplicationTypes.FOLDER, name=private_folder_name, private=True + ): + # Try to delete subfolder first + try: + self.tm1.applications.delete( + path=private_folder_name, + application_type=ApplicationTypes.FOLDER, + application_name=subfolder_name, + private=True, + ) + except Exception: + pass + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + + # Create private folder and subfolder + self.tm1.applications.create(application=private_folder, private=True) + self.tm1.applications.create(application=subfolder, private=True) + + try: + # Create a link application inside the nested folder + nested_path = f"{private_folder_name}/{subfolder_name}" + link_app = LinkApplication(nested_path, self.application_name, "http://example.com") + self.tm1.applications.create(application=link_app, private=True) + + # Test get_names on nested path - should auto-resolve + names = self.tm1.applications.get_names(path=nested_path, private=True) + self.assertIn(self.application_name, names) + + # Test exists on nested path + exists = self.tm1.applications.exists( + path=nested_path, + application_type=ApplicationTypes.LINK, + name=self.application_name, + private=True, + ) + self.assertTrue(exists) + + # Clean up link + self.tm1.applications.delete( + path=nested_path, + application_type=ApplicationTypes.LINK, + application_name=self.application_name, + private=True, + ) + finally: + # Clean up folders + try: + self.tm1.applications.delete( + path=private_folder_name, + application_type=ApplicationTypes.FOLDER, + application_name=subfolder_name, + private=True, + ) + except Exception: + pass + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + + # ==================== Tests for discover() function ==================== + + def test_discover_root_public_only(self): + """Test discover at root level with public items only""" + items = self.tm1.applications.discover(path="", include_private=False, recursive=False) + self.assertIsInstance(items, list) + # Should have at least our test folder + names = [item["name"] for item in items] + self.assertIn(self.tm1py_app_folder, names) + # Verify structure + for item in items: + self.assertIn("@odata.type", item) + self.assertIn("type", item) + self.assertIn("id", item) + self.assertIn("name", item) + self.assertIn("path", item) + self.assertIn("is_private", item) + self.assertFalse(item["is_private"]) + + def test_discover_root_include_private(self): + """Test discover at root level including private items""" + # First create a private folder + private_folder_name = self.prefix + "DiscoverPrivate" + private_folder = FolderApplication("", private_folder_name) + + # Clean up if exists + if self.tm1.applications.exists( + path="", application_type=ApplicationTypes.FOLDER, name=private_folder_name, private=True + ): + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + + self.tm1.applications.create(application=private_folder, private=True) + + try: + # Discover with include_private=True + items = self.tm1.applications.discover(path="", include_private=True, recursive=False) + + # Check both public and private items are returned + names = [item["name"] for item in items] + self.assertIn(self.tm1py_app_folder, names) # Public folder + self.assertIn(private_folder_name, names) # Private folder + + # Verify private flag is set correctly + for item in items: + if item["name"] == private_folder_name: + self.assertTrue(item["is_private"]) + elif item["name"] == self.tm1py_app_folder: + self.assertFalse(item["is_private"]) + + finally: + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + + def test_discover_specific_path(self): + """Test discover at a specific path""" + # Create a subfolder inside our test folder + subfolder_name = self.prefix + "DiscoverSub" + subfolder = FolderApplication(self.tm1py_app_folder, subfolder_name) + + # Clean up if exists + if self.tm1.applications.exists( + path=self.tm1py_app_folder, application_type=ApplicationTypes.FOLDER, name=subfolder_name, private=False + ): + self.tm1.applications.delete( + path=self.tm1py_app_folder, + application_type=ApplicationTypes.FOLDER, + application_name=subfolder_name, + private=False, + ) + + self.tm1.applications.create(application=subfolder, private=False) + + try: + # Discover in the test folder + items = self.tm1.applications.discover(path=self.tm1py_app_folder, include_private=False, recursive=False) + + names = [item["name"] for item in items] + self.assertIn(subfolder_name, names) + + # Check path is correctly set + for item in items: + if item["name"] == subfolder_name: + self.assertEqual(item["path"], f"{self.tm1py_app_folder}/{subfolder_name}") + + finally: + self.tm1.applications.delete( + path=self.tm1py_app_folder, + application_type=ApplicationTypes.FOLDER, + application_name=subfolder_name, + private=False, + ) + + def test_discover_recursive(self): + """Test discover with recursive=True""" + # Create a subfolder with a document inside + subfolder_name = self.prefix + "RecursiveSub" + subfolder = FolderApplication(self.tm1py_app_folder, subfolder_name) + doc_name = self.prefix + "RecursiveDoc" + + # Clean up if exists + if self.tm1.applications.exists( + path=self.tm1py_app_folder, application_type=ApplicationTypes.FOLDER, name=subfolder_name, private=False + ): + try: + self.tm1.applications.delete( + path=f"{self.tm1py_app_folder}/{subfolder_name}", + application_type=ApplicationTypes.DOCUMENT, + application_name=doc_name, + private=False, + ) + except Exception: + pass + self.tm1.applications.delete( + path=self.tm1py_app_folder, + application_type=ApplicationTypes.FOLDER, + application_name=subfolder_name, + private=False, + ) + + self.tm1.applications.create(application=subfolder, private=False) + + # Create document inside subfolder + with open(Path(__file__).parent.joinpath("resources", "document.xlsx"), "rb") as file: + doc = DocumentApplication( + path=f"{self.tm1py_app_folder}/{subfolder_name}", + name=doc_name, + content=file.read(), + ) + self.tm1.applications.create(application=doc, private=False) + + try: + # Discover recursively (nested mode - default) + items = self.tm1.applications.discover( + path=self.tm1py_app_folder, include_private=False, recursive=True, flat=False + ) + + # Find the subfolder and check it has children + found_subfolder = False + for item in items: + if item["name"] == subfolder_name: + found_subfolder = True + self.assertIn("children", item) + child_names = [child["name"] for child in item["children"]] + self.assertIn(doc_name, child_names) + + self.assertTrue(found_subfolder, "Subfolder not found in results") + + finally: + self.tm1.applications.delete( + path=f"{self.tm1py_app_folder}/{subfolder_name}", + application_type=ApplicationTypes.DOCUMENT, + application_name=doc_name, + private=False, + ) + self.tm1.applications.delete( + path=self.tm1py_app_folder, + application_type=ApplicationTypes.FOLDER, + application_name=subfolder_name, + private=False, + ) + + def test_discover_recursive_flat(self): + """Test discover with recursive=True and flat=True""" + # Create a subfolder with a document inside + subfolder_name = self.prefix + "FlatSub" + subfolder = FolderApplication(self.tm1py_app_folder, subfolder_name) + doc_name = self.prefix + "FlatDoc" + + # Clean up if exists + if self.tm1.applications.exists( + path=self.tm1py_app_folder, application_type=ApplicationTypes.FOLDER, name=subfolder_name, private=False + ): + try: + self.tm1.applications.delete( + path=f"{self.tm1py_app_folder}/{subfolder_name}", + application_type=ApplicationTypes.DOCUMENT, + application_name=doc_name, + private=False, + ) + except Exception: + pass + self.tm1.applications.delete( + path=self.tm1py_app_folder, + application_type=ApplicationTypes.FOLDER, + application_name=subfolder_name, + private=False, + ) + + self.tm1.applications.create(application=subfolder, private=False) + + # Create document inside subfolder + with open(Path(__file__).parent.joinpath("resources", "document.xlsx"), "rb") as file: + doc = DocumentApplication( + path=f"{self.tm1py_app_folder}/{subfolder_name}", + name=doc_name, + content=file.read(), + ) + self.tm1.applications.create(application=doc, private=False) + + try: + # Discover recursively (flat mode) + items = self.tm1.applications.discover( + path=self.tm1py_app_folder, include_private=False, recursive=True, flat=True + ) + + # All items should be in a flat list + names = [item["name"] for item in items] + self.assertIn(subfolder_name, names) + self.assertIn(doc_name, names) + + # No item should have 'children' key in flat mode + for item in items: + self.assertNotIn("children", item) + + # Check paths are correctly set + for item in items: + if item["name"] == doc_name: + self.assertEqual(item["path"], f"{self.tm1py_app_folder}/{subfolder_name}/{doc_name}") + + finally: + self.tm1.applications.delete( + path=f"{self.tm1py_app_folder}/{subfolder_name}", + application_type=ApplicationTypes.DOCUMENT, + application_name=doc_name, + private=False, + ) + self.tm1.applications.delete( + path=self.tm1py_app_folder, + application_type=ApplicationTypes.FOLDER, + application_name=subfolder_name, + private=False, + ) + + def test_discover_private_folder_contents(self): + """Test discover inside a private folder""" + # Create a private folder with a document inside + private_folder_name = self.prefix + "DiscoverPrivateFolder" + private_folder = FolderApplication("", private_folder_name) + doc_name = self.prefix + "PrivateDoc" + + # Clean up if exists + if self.tm1.applications.exists( + path="", application_type=ApplicationTypes.FOLDER, name=private_folder_name, private=True + ): + try: + self.tm1.applications.delete( + path=private_folder_name, + application_type=ApplicationTypes.DOCUMENT, + application_name=doc_name, + private=True, + ) + except Exception: + pass + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + + self.tm1.applications.create(application=private_folder, private=True) + + # Create document inside private folder + with open(Path(__file__).parent.joinpath("resources", "document.xlsx"), "rb") as file: + doc = DocumentApplication(path=private_folder_name, name=doc_name, content=file.read()) + self.tm1.applications.create(application=doc, private=True) + + try: + # Discover inside the private folder + items = self.tm1.applications.discover(path=private_folder_name, include_private=True, recursive=False) + + names = [item["name"] for item in items] + self.assertIn(doc_name, names) + + # All items should be marked as private + for item in items: + self.assertTrue(item["is_private"]) + + finally: + self.tm1.applications.delete( + path=private_folder_name, + application_type=ApplicationTypes.DOCUMENT, + application_name=doc_name, + private=True, + ) + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + + def test_discover_odata_type_included(self): + """Test that discover returns @odata.type field""" + items = self.tm1.applications.discover(path="", include_private=False, recursive=False) + self.assertGreater(len(items), 0) + for item in items: + self.assertIn("@odata.type", item) + self.assertTrue(item["@odata.type"].startswith("#ibm.tm1.api.v1.")) + + # ==================== Tests for document operations in private folders ==================== + + @skip_if_version_lower_than(version="11.4") + def test_document_in_private_folder(self): + """Test document operations (create, get, delete) in a private folder""" + # Create a private folder + private_folder_name = self.prefix + "DocPrivateFolder" + private_folder = FolderApplication("", private_folder_name) + doc_name = self.prefix + "DocInPrivate" + + # Clean up if exists + if self.tm1.applications.exists( + path="", application_type=ApplicationTypes.FOLDER, name=private_folder_name, private=True + ): + try: + self.tm1.applications.delete( + path=private_folder_name, + application_type=ApplicationTypes.DOCUMENT, + application_name=doc_name, + private=True, + ) + except Exception: + pass + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + + self.tm1.applications.create(application=private_folder, private=True) + + try: + # Create document in private folder + with open(Path(__file__).parent.joinpath("resources", "document.xlsx"), "rb") as file: + original_content = file.read() + doc = DocumentApplication(path=private_folder_name, name=doc_name, content=original_content) + self.tm1.applications.create(application=doc, private=True) + + # Verify document exists + exists = self.tm1.applications.exists( + path=private_folder_name, application_type=ApplicationTypes.DOCUMENT, name=doc_name, private=True + ) + self.assertTrue(exists) + + # Get the document and verify content + retrieved_doc = self.tm1.applications.get_document(path=private_folder_name, name=doc_name, private=True) + self.assertEqual(retrieved_doc.name, doc_name) + self.assertEqual(retrieved_doc.content, original_content) + + # Delete the document + self.tm1.applications.delete( + path=private_folder_name, + application_type=ApplicationTypes.DOCUMENT, + application_name=doc_name, + private=True, + ) + + # Verify deleted + exists = self.tm1.applications.exists( + path=private_folder_name, application_type=ApplicationTypes.DOCUMENT, name=doc_name, private=True + ) + self.assertFalse(exists) + + finally: + # Clean up folder + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + + @skip_if_version_lower_than(version="11.4") + def test_document_in_nested_private_folder(self): + """Test document operations in a nested private folder structure""" + # Create private folder structure: PrivateParent/PrivateChild/Document + parent_folder_name = self.prefix + "NestedParent" + child_folder_name = self.prefix + "NestedChild" + doc_name = self.prefix + "NestedDoc" + + parent_folder = FolderApplication("", parent_folder_name) + child_folder = FolderApplication(parent_folder_name, child_folder_name) + nested_path = f"{parent_folder_name}/{child_folder_name}" + + # Clean up if exists + if self.tm1.applications.exists( + path="", application_type=ApplicationTypes.FOLDER, name=parent_folder_name, private=True + ): + try: + self.tm1.applications.delete( + path=nested_path, + application_type=ApplicationTypes.DOCUMENT, + application_name=doc_name, + private=True, + ) + except Exception: + pass + try: + self.tm1.applications.delete( + path=parent_folder_name, + application_type=ApplicationTypes.FOLDER, + application_name=child_folder_name, + private=True, + ) + except Exception: + pass + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=parent_folder_name, private=True + ) + + # Create folder structure + self.tm1.applications.create(application=parent_folder, private=True) + self.tm1.applications.create(application=child_folder, private=True) + + try: + # Create document in nested folder + with open(Path(__file__).parent.joinpath("resources", "document.xlsx"), "rb") as file: + original_content = file.read() + doc = DocumentApplication(path=nested_path, name=doc_name, content=original_content) + self.tm1.applications.create(application=doc, private=True) + + # Verify document exists + exists = self.tm1.applications.exists( + path=nested_path, application_type=ApplicationTypes.DOCUMENT, name=doc_name, private=True + ) + self.assertTrue(exists) + + # Get the document using get_document + retrieved_doc = self.tm1.applications.get_document(path=nested_path, name=doc_name, private=True) + self.assertEqual(retrieved_doc.content, original_content) + + # Clean up + self.tm1.applications.delete( + path=nested_path, + application_type=ApplicationTypes.DOCUMENT, + application_name=doc_name, + private=True, + ) + + finally: + try: + self.tm1.applications.delete( + path=parent_folder_name, + application_type=ApplicationTypes.FOLDER, + application_name=child_folder_name, + private=True, + ) + except Exception: + pass + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=parent_folder_name, private=True + ) + + # ==================== Tests for copying documents between private/public ==================== + + @skip_if_version_lower_than(version="11.4") + def test_copy_private_document_to_public(self): + """Test copying a document from a private folder to a public location""" + # Create a private folder with a document + private_folder_name = self.prefix + "CopyPrivateSource" + private_folder = FolderApplication("", private_folder_name) + doc_name = self.prefix + "CopyDoc" + public_doc_name = self.prefix + "CopyDocPublic" + + # Clean up if exists + if self.tm1.applications.exists( + path="", application_type=ApplicationTypes.FOLDER, name=private_folder_name, private=True + ): + try: + self.tm1.applications.delete( + path=private_folder_name, + application_type=ApplicationTypes.DOCUMENT, + application_name=doc_name, + private=True, + ) + except Exception: + pass + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + + # Clean up public document + if self.tm1.applications.exists( + path=self.tm1py_app_folder, application_type=ApplicationTypes.DOCUMENT, name=public_doc_name, private=False + ): + self.tm1.applications.delete( + path=self.tm1py_app_folder, + application_type=ApplicationTypes.DOCUMENT, + application_name=public_doc_name, + private=False, + ) + + # Create private folder and document + self.tm1.applications.create(application=private_folder, private=True) + with open(Path(__file__).parent.joinpath("resources", "document.xlsx"), "rb") as file: + original_content = file.read() + private_doc = DocumentApplication(path=private_folder_name, name=doc_name, content=original_content) + self.tm1.applications.create(application=private_doc, private=True) + + try: + # Step 1: Get the private document + retrieved_private = self.tm1.applications.get_document( + path=private_folder_name, name=doc_name, private=True + ) + self.assertEqual(retrieved_private.content, original_content) + + # Step 2: Create a public document with the same content + public_doc = DocumentApplication( + path=self.tm1py_app_folder, name=public_doc_name, content=retrieved_private.content + ) + self.tm1.applications.create(application=public_doc, private=False) + + # Step 3: Verify the public document exists and has correct content + exists = self.tm1.applications.exists( + path=self.tm1py_app_folder, + application_type=ApplicationTypes.DOCUMENT, + name=public_doc_name, + private=False, + ) + self.assertTrue(exists) + + retrieved_public = self.tm1.applications.get_document( + path=self.tm1py_app_folder, name=public_doc_name, private=False + ) + self.assertEqual(retrieved_public.content, original_content) + + finally: + # Clean up + try: + self.tm1.applications.delete( + path=private_folder_name, + application_type=ApplicationTypes.DOCUMENT, + application_name=doc_name, + private=True, + ) + except Exception: + pass + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + try: + self.tm1.applications.delete( + path=self.tm1py_app_folder, + application_type=ApplicationTypes.DOCUMENT, + application_name=public_doc_name, + private=False, + ) + except Exception: + pass + + @skip_if_version_lower_than(version="11.4") + def test_copy_public_document_to_private(self): + """Test copying a document from a public folder to a private location""" + # Create a public document + public_doc_name = self.prefix + "PublicSourceDoc" + private_folder_name = self.prefix + "CopyPrivateTarget" + private_doc_name = self.prefix + "CopyDocPrivate" + + # Clean up public document + if self.tm1.applications.exists( + path=self.tm1py_app_folder, application_type=ApplicationTypes.DOCUMENT, name=public_doc_name, private=False + ): + self.tm1.applications.delete( + path=self.tm1py_app_folder, + application_type=ApplicationTypes.DOCUMENT, + application_name=public_doc_name, + private=False, + ) + + # Clean up private folder + if self.tm1.applications.exists( + path="", application_type=ApplicationTypes.FOLDER, name=private_folder_name, private=True + ): + try: + self.tm1.applications.delete( + path=private_folder_name, + application_type=ApplicationTypes.DOCUMENT, + application_name=private_doc_name, + private=True, + ) + except Exception: + pass + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + + # Create public document + with open(Path(__file__).parent.joinpath("resources", "document.xlsx"), "rb") as file: + original_content = file.read() + public_doc = DocumentApplication(path=self.tm1py_app_folder, name=public_doc_name, content=original_content) + self.tm1.applications.create(application=public_doc, private=False) + + # Create private folder + private_folder = FolderApplication("", private_folder_name) + self.tm1.applications.create(application=private_folder, private=True) + + try: + # Step 1: Get the public document + retrieved_public = self.tm1.applications.get_document( + path=self.tm1py_app_folder, name=public_doc_name, private=False + ) + self.assertEqual(retrieved_public.content, original_content) + + # Step 2: Create a private document with the same content + private_doc = DocumentApplication( + path=private_folder_name, name=private_doc_name, content=retrieved_public.content + ) + self.tm1.applications.create(application=private_doc, private=True) + + # Step 3: Verify the private document exists and has correct content + exists = self.tm1.applications.exists( + path=private_folder_name, + application_type=ApplicationTypes.DOCUMENT, + name=private_doc_name, + private=True, + ) + self.assertTrue(exists) + + retrieved_private = self.tm1.applications.get_document( + path=private_folder_name, name=private_doc_name, private=True + ) + self.assertEqual(retrieved_private.content, original_content) + + finally: + # Clean up + try: + self.tm1.applications.delete( + path=self.tm1py_app_folder, + application_type=ApplicationTypes.DOCUMENT, + application_name=public_doc_name, + private=False, + ) + except Exception: + pass + try: + self.tm1.applications.delete( + path=private_folder_name, + application_type=ApplicationTypes.DOCUMENT, + application_name=private_doc_name, + private=True, + ) + except Exception: + pass + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + + @skip_if_version_lower_than(version="11.4") + def test_update_or_create_private_document(self): + """Test update_or_create for a document in a private folder""" + # Create a private folder + private_folder_name = self.prefix + "UpdateCreatePrivate" + private_folder = FolderApplication("", private_folder_name) + doc_name = self.prefix + "UpdateCreateDoc" + + # Clean up if exists + if self.tm1.applications.exists( + path="", application_type=ApplicationTypes.FOLDER, name=private_folder_name, private=True + ): + try: + self.tm1.applications.delete( + path=private_folder_name, + application_type=ApplicationTypes.DOCUMENT, + application_name=doc_name, + private=True, + ) + except Exception: + pass + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + ) + + self.tm1.applications.create(application=private_folder, private=True) + + try: + # First update_or_create - should create + with open(Path(__file__).parent.joinpath("resources", "document.xlsx"), "rb") as file: + original_content = file.read() + doc = DocumentApplication(path=private_folder_name, name=doc_name, content=original_content) + self.tm1.applications.update_or_create(application=doc, private=True) + + # Verify created + exists = self.tm1.applications.exists( + path=private_folder_name, application_type=ApplicationTypes.DOCUMENT, name=doc_name, private=True + ) + self.assertTrue(exists) + + # Second update_or_create - should update + with open(Path(__file__).parent.joinpath("resources", "file.csv"), "rb") as file: + new_content = file.read() + doc_updated = DocumentApplication(path=private_folder_name, name=doc_name, content=new_content) + self.tm1.applications.update_or_create(application=doc_updated, private=True) + + # Verify updated content + retrieved = self.tm1.applications.get_document(path=private_folder_name, name=doc_name, private=True) + self.assertEqual(retrieved.content, new_content) + + finally: + try: + self.tm1.applications.delete( + path=private_folder_name, + application_type=ApplicationTypes.DOCUMENT, + application_name=doc_name, + private=True, + ) + except Exception: + pass + self.tm1.applications.delete( + path="", application_type=ApplicationTypes.FOLDER, application_name=private_folder_name, private=True + )