Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
336 changes: 333 additions & 3 deletions cassis/cas.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from cassis.typesystem import (
FEATURE_BASE_NAME_HEAD,
FEATURE_BASE_NAME_LANGUAGE,
TYPE_NAME_ARRAY_BASE,
TYPE_NAME_DOCUMENT_ANNOTATION,
TYPE_NAME_ANNOTATION,
TYPE_NAME_FS_ARRAY,
Expand All @@ -24,6 +25,7 @@
TypeCheckError,
TypeSystem,
TypeSystemMode,
load_typesystem,
)

_validator_optional_string = validators.optional(validators.instance_of(str))
Expand Down Expand Up @@ -903,9 +905,8 @@ def _find_all_fs(
elif feature.rangeType.name == TYPE_NAME_FS_LIST and hasattr(feature_value, FEATURE_BASE_NAME_HEAD):
v = feature_value
while hasattr(v, FEATURE_BASE_NAME_HEAD):
if not v.head or v.head.xmiID in all_fs:
continue
openlist.append(v.head)
if v.head and v.head.xmiID not in all_fs:
openlist.append(v.head)
v = v.tail
# For primitive arrays / lists, we do not need to handle the elements
continue
Expand Down Expand Up @@ -937,6 +938,335 @@ def _copy(self) -> "Cas":
result._xmi_id_generator = self._xmi_id_generator
return result

def deep_copy(self, copy_typesystem: bool = False) -> "Cas":
"""
Create and return a deep copy of this CAS object.
All feature structures, views, and sofas are copied. If `copy_typesystem` is True, the typesystem is also deep-copied;
otherwise, the original typesystem is shared between the original and the copy.
Args:
copy_typesystem (bool): Whether to copy the original typesystem or not. If True, the typesystem is deep-copied.
Returns:
Cas: A deep copy of this CAS object.
"""
ts = self.typesystem
if copy_typesystem:
ts = self.typesystem.to_xml()
ts = load_typesystem(ts)

cas_copy = Cas(ts, lenient=self._lenient)

cas_copy._views = {}
cas_copy._sofas = {}

def _collect_fs_list_references(fs_list: FeatureStructure) -> List[Optional[int]]:
referenced_list = []
current = fs_list

while hasattr(current, FEATURE_BASE_NAME_HEAD):
head = current.head
if head is None:
referenced_list.append(None)
elif hasattr(head, "xmiID") and head.xmiID is not None:
referenced_list.append(head.xmiID)
else:
warnings.warn("FSList item without xmiID encountered during deep copy; preserving as None in copy.")
referenced_list.append(None)
Comment on lines +966 to +973

current = current.tail

return referenced_list

def _build_fs_list(referenced_list: List[Optional[int]]) -> FeatureStructure:
current = ts.get_type("uima.cas.EmptyFSList")()

for reference_id in reversed(referenced_list):
node = ts.get_type("uima.cas.NonEmptyFSList")()
node.tail = current
node.head = all_copied_fs.get(reference_id) if reference_id is not None else None
current = node

return current

for sofa in self.sofas:
sofa_copy = Sofa(
sofaID=sofa.sofaID,
sofaNum=sofa.sofaNum,
type=ts.get_type(sofa.type.name),
xmiID=sofa.xmiID,
)
sofa_copy.mimeType = sofa.mimeType
sofa_copy.sofaArray = sofa.sofaArray
sofa_copy.sofaString = sofa.sofaString
sofa_copy.sofaURI = sofa.sofaURI

cas_copy._sofas[sofa_copy.sofaID] = sofa_copy
cas_copy._views[sofa_copy.sofaID] = View(sofa=sofa_copy)

# Set the current view to the `_InitialView` entry in the copied CAS.
# (`Cas.__init__` creates an `_InitialView`; here we point the current
# view at that entry in the `cas_copy._views` mapping so subsequent
# `add()` calls index into the initial view by default.)
cas_copy._current_view = cas_copy._views["_InitialView"]

references = dict()
referenced_arrays = dict()
referenced_fs_arrays = dict()
referenced_primitive_arrays = dict()
referenced_lists = dict()
# for primitive lists (e.g. IntegerList) we collect primitive head values
referenced_primitive_lists = dict()

all_copied_fs = dict()
referenced_view = defaultdict(list)

for view in self.views:
for member in view.get_all_annotations():
if hasattr(member, "xmiID") and member.xmiID is not None:
if view.sofa.sofaID not in referenced_view[member.xmiID]:
referenced_view[member.xmiID].append(view.sofa.sofaID)

# Ensure sofa.sofaArray feature structures are discovered even when they
# are not indexed in any view. `_find_all_fs(seeds=...)` replaces the
# default traversal roots, so we include both the original indexed view
# members and any sofaArray roots here.
traversal_seeds = []
for sofa in self.sofas:
traversal_seeds.extend(self.get_view(sofa.sofaID).select_all())
if getattr(sofa, "sofaArray", None) is not None:
traversal_seeds.append(sofa.sofaArray)

for fs in self._find_all_fs(seeds=traversal_seeds):
t = ts.get_type(fs.type.name)
fs_copy = t()

if t.name == TYPE_NAME_FS_ARRAY and fs.elements is not None:
referenced_list = []
for item in fs.elements:
if item is None:
referenced_list.append(None)
elif hasattr(item, "xmiID") and item.xmiID is not None:
referenced_list.append(item.xmiID)
else:
warnings.warn(
f"Standalone FSArray {fs.xmiID} contains an unidentifiable item; preserving as None in copy."
)
referenced_list.append(None)
Comment on lines +1044 to +1053
Comment on lines +1044 to +1053

referenced_fs_arrays[fs.xmiID] = referenced_list
elif t.supertype.name == TYPE_NAME_ARRAY_BASE and fs.elements is not None:
referenced_primitive_arrays[fs.xmiID] = list(fs.elements)

for feature in t.all_features:
if t.supertype.name == TYPE_NAME_ARRAY_BASE and feature.name == "elements":
continue

if ts.is_primitive(feature.rangeType):
fs_copy[feature.name] = fs.get(feature.name)
elif ts.is_primitive_collection(feature.rangeType):
val = fs.get(feature.name)
if val is None:
continue

if feature.multipleReferencesAllowed and hasattr(val, "xmiID") and val.xmiID is not None:
references.setdefault(feature.name, [])
references[feature.name].append((fs.xmiID, val.xmiID))
continue

# Distinguish primitive arrays (have `elements`) from primitive lists (use head/tail)
if ts.is_array(feature.rangeType):
fs_copy[feature.name] = ts.get_type(feature.rangeType.name)()
# shallow-copy the elements list to avoid sharing the same list object
fs_copy[feature.name].elements = list(val.elements)
elif ts.is_list(feature.rangeType):
# collect primitive values from head/tail style lists
current = val
prim_list = []
while hasattr(current, FEATURE_BASE_NAME_HEAD):
head = getattr(current, FEATURE_BASE_NAME_HEAD)
prim_list.append(head)
current = current.tail

# store the primitive list values along with the declared range type name
referenced_primitive_lists.setdefault(fs.xmiID, {})
referenced_primitive_lists[fs.xmiID][feature.name] = (
feature.rangeType.name,
prim_list,
)
elif ts.is_array(feature.rangeType):
val = fs[feature.name]
if val is None:
continue

# If the array itself may be shared (multipleReferencesAllowed), preserve
# its identity by treating it like any other FS reference and wiring it
# up later via `references`. Only inline-copy arrays when they are not
# declared shareable.
if feature.multipleReferencesAllowed and hasattr(val, "xmiID") and val.xmiID is not None:
references.setdefault(feature.name, [])
references[feature.name].append((fs.xmiID, val.xmiID))
else:
fs_copy[feature.name] = ts.get_type(TYPE_NAME_FS_ARRAY)()
# collect referenced xmiIDs for mapping later and preserve None placeholders
referenced_list = []
for item in val.elements:
if item is None:
referenced_list.append(None)
elif hasattr(item, "xmiID") and item.xmiID is not None:
referenced_list.append(item.xmiID)
else:
warnings.warn(
f"Array feature '{feature.name}' of FS {fs.xmiID} contains an unidentifiable item; preserving as None in copy."
)
referenced_list.append(None)
Comment on lines +1113 to +1120
referenced_arrays.setdefault(fs.xmiID, {})
referenced_arrays[fs.xmiID][feature.name] = referenced_list
elif ts.is_list(feature.rangeType):
val = fs[feature.name]
if val is None:
continue

if feature.multipleReferencesAllowed and hasattr(val, "xmiID") and val.xmiID is not None:
references.setdefault(feature.name, [])
references[feature.name].append((fs.xmiID, val.xmiID))
else:
referenced_lists.setdefault(fs.xmiID, {})
referenced_lists[fs.xmiID][feature.name] = _collect_fs_list_references(val)
elif feature.rangeType.name == TYPE_NAME_SOFA:
# ignore sofa references
pass
else:
val = fs[feature.name]
# If the original feature value is None, preserve it without warning
if val is None:
continue
if hasattr(val, "xmiID") and val.xmiID is not None:
references.setdefault(feature.name, [])
references[feature.name].append((fs.xmiID, val.xmiID))
else:
warnings.warn(
f'Original non-primitive feature "{feature.name}" was not copied from feature structure {fs.xmiID}.'
)

fs_copy.xmiID = fs.xmiID
all_copied_fs[fs_copy.xmiID] = fs_copy

# set references to single objects
for feature, pairs in references.items():
for current_ID, reference_ID in pairs:
try:
all_copied_fs[current_ID][feature] = all_copied_fs[reference_ID]
except KeyError:
warnings.warn(
f"Reference {reference_ID} not found for feature '{feature}' of feature structure {current_ID}"
)

# set references for objects in arrays
for current_ID, arrays in referenced_arrays.items():
for feature, referenced_list in arrays.items():
elements = []
for reference_ID in referenced_list:
if reference_ID is None:
elements.append(None)
continue
try:
elements.append(all_copied_fs[reference_ID])
except KeyError:
warnings.warn(
f"Reference {reference_ID} not found for array feature '{feature}' of feature structure {current_ID}; inserting None."
)
elements.append(None)
all_copied_fs[current_ID][feature].elements = elements

for current_ID, referenced_list in referenced_fs_arrays.items():
elements = []
for reference_ID in referenced_list:
if reference_ID is None:
elements.append(None)
continue
try:
elements.append(all_copied_fs[reference_ID])
except KeyError:
warnings.warn(
f"Reference {reference_ID} not found for standalone FSArray {current_ID}; inserting None."
)
elements.append(None)
all_copied_fs[current_ID].elements = elements

for current_ID, elements in referenced_primitive_arrays.items():
all_copied_fs[current_ID].elements = list(elements)

# rebuild FSList features from copied members
for current_ID, lists in referenced_lists.items():
for feature, referenced_list in lists.items():
all_copied_fs[current_ID][feature] = _build_fs_list(referenced_list)

# rebuild primitive head/tail lists (e.g. IntegerList, FloatList, StringList)
for current_ID, lists in referenced_primitive_lists.items():
for feature, (list_type_name, primitive_values) in lists.items():
# derive Empty/NonEmpty concrete type names from the abstract list type
suffix = list_type_name.split(".")[-1]
empty_name = f"uima.cas.Empty{suffix}"
nonempty_name = f"uima.cas.NonEmpty{suffix}"

current = ts.get_type(empty_name)()
for value in reversed(primitive_values):
node = ts.get_type(nonempty_name)()
node.tail = current
node.head = value
current = node

all_copied_fs[current_ID][feature] = current

# ensure Sofa.sofaArray references point to the copied feature structures
# Use the original CAS's sofas to locate the original sofaArray objects
# (safer than relying on sofa_copy.sofaArray pointing back to the original
# object in all cases) and remap them to the copied FS when available.
for orig_sofa in self.sofas:
sofa_copy = cas_copy._sofas.get(orig_sofa.sofaID)
if sofa_copy is None:
continue
orig_sofa_array = getattr(orig_sofa, "sofaArray", None)
if hasattr(orig_sofa_array, "xmiID") and orig_sofa_array.xmiID in all_copied_fs:
sofa_copy.sofaArray = all_copied_fs[orig_sofa_array.xmiID]

# Add only original view members back to the copied indices. Referenced
# feature structures that were not indexed in any original view remain
# reachable transitively and will still be serialized by `_find_all_fs()`.
feature_structures = sorted(all_copied_fs.values(), key=lambda f: f.xmiID, reverse=False)
for item in feature_structures:
if not hasattr(item, "xmiID") or item.xmiID is None:
continue

view_names = referenced_view.get(item.xmiID)
if not view_names:
continue

# Use the normal add-path once so FS with a `sofa` feature are rebound
# to the copied sofa in their primary view. Any additional view
# memberships are restored by indexing the same FS directly to avoid
# mutating its `sofa` repeatedly.
cas_copy._current_view = cas_copy._views[view_names[0]]
cas_copy.add(item, keep_id=True)

for view_name in view_names[1:]:
cas_copy._views[view_name].add_annotation_to_index(item)

cas_copy._xmi_id_generator = IdGenerator(initial_id=self._xmi_id_generator._next_id)
cas_copy._sofa_num_generator = IdGenerator(initial_id=self._sofa_num_generator._next_id)

# Restore the active view on the copy to match the source CAS' current view.
# During re-indexing we may have set `cas_copy._current_view` multiple
# times; ensure the returned copy has the same active sofa as `self`.
try:
active_sofa_id = self.get_sofa().sofaID
except Exception:
active_sofa_id = "_InitialView"

if active_sofa_id in cas_copy._views:
cas_copy._current_view = cas_copy._views[active_sofa_id]

return cas_copy


def _sort_func(a: FeatureStructure) -> Tuple[int, int, int]:
d = a.__slots__
Expand Down
Loading
Loading