From a9fa046d65ed948354cf5226e23e5f45c1e8a098 Mon Sep 17 00:00:00 2001 From: Josh Horton Date: Fri, 19 Jun 2026 12:08:47 +0100 Subject: [PATCH 1/2] copy over openfe improvements --- feflow/utils/hybrid_topology.py | 1918 +++++++++++++------------------ 1 file changed, 822 insertions(+), 1096 deletions(-) diff --git a/feflow/utils/hybrid_topology.py b/feflow/utils/hybrid_topology.py index 35decf7c..6352a927 100644 --- a/feflow/utils/hybrid_topology.py +++ b/feflow/utils/hybrid_topology.py @@ -3,16 +3,20 @@ # The eventual goal is to move a version of this towards openmmtools # LICENSE: MIT -import logging -import openmm -from openmm import unit, app -import numpy as np +# turn off formatting since this is mostly vendored code +# fmt: off + import copy import itertools +import logging + +import mdtraj as mdt +import numpy as np +import openmm +from openmm import app, unit # OpenMM constant for Coulomb interactions (implicitly in md_unit_system units) from openmmtools.constants import ONE_4PI_EPS0 -import mdtraj as mdt logger = logging.getLogger(__name__) @@ -78,23 +82,15 @@ class HybridTopologyFactory: """ - def __init__( - self, - old_system, - old_positions, - old_topology, - new_system, - new_positions, - new_topology, - old_to_new_atom_map, - old_to_new_core_atom_map, - use_dispersion_correction=False, - softcore_alpha=0.5, - softcore_LJ_v2=True, - softcore_LJ_v2_alpha=0.85, - interpolate_old_and_new_14s=False, - **kwargs, - ): + def __init__(self, + old_system, old_positions, old_topology, + new_system, new_positions, new_topology, + old_to_new_atom_map, old_to_new_core_atom_map, + use_dispersion_correction=False, + softcore_alpha=0.5, + softcore_LJ_v2=True, + softcore_LJ_v2_alpha=0.85, + interpolate_old_and_new_14s=False): """ Initialize the Hybrid topology factory. @@ -142,7 +138,7 @@ def __init__( self._new_system = copy.deepcopy(new_system) self._new_positions = new_positions self._new_topology = new_topology - self._hybrid_system_forces = {} + self._hybrid_system_forces = dict() # Set mappings (full, core, and env maps) self._set_mappings(old_to_new_atom_map, old_to_new_core_atom_map) @@ -181,11 +177,9 @@ def __init__( # Construct dictionary of exceptions in old and new systems self._old_system_exceptions = self._generate_dict_from_exceptions( - self._old_system_forces["NonbondedForce"] - ) + self._old_system_forces['NonbondedForce']) self._new_system_exceptions = self._generate_dict_from_exceptions( - self._new_system_forces["NonbondedForce"] - ) + self._new_system_forces['NonbondedForce']) # check for exceptions clashes between unique and env atoms self._validate_disjoint_sets() @@ -206,10 +200,8 @@ def __init__( self._add_torsion_force_terms() - has_nonbonded_force = ( - "NonbondedForce" in self._old_system_forces - or "NonbondedForce" in self._new_system_forces - ) + has_nonbonded_force = ('NonbondedForce' in self._old_system_forces or + 'NonbondedForce' in self._new_system_forces) if has_nonbonded_force: self._add_nonbonded_force_terms() @@ -223,12 +215,13 @@ def __init__( self._handle_periodic_torsion_force() + # add cmap terms if possible + self._handle_cmap_torsion_force() + if has_nonbonded_force: self._handle_nonbonded() - if not ( - len(self._old_system_exceptions.keys()) == 0 - and len(self._new_system_exceptions.keys()) == 0 - ): + if not (len(self._old_system_exceptions.keys()) == 0 and + len(self._new_system_exceptions.keys()) == 0): self._handle_old_new_exceptions() # Get positions for the hybrid @@ -239,6 +232,155 @@ def __init__( self._omm_hybrid_topology = self._create_hybrid_topology() logger.info("Hybrid system created") + @staticmethod + def _verify_cmap_compatibility( + cmap_old: openmm.CMAPTorsionForce, + cmap_new: openmm.CMAPTorsionForce, + ) -> tuple[ + int, + int, + int, + int, + ]: + """ + Verify CMAPTorsionForce compatibility between two systems. + + Parameters + ---------- + cmap_old : openmm.CMAPTorsionForce + CMAPTorsionForce from the old system + cmap_new : openmm.CMAPTorsionForce + CMAPTorsionForce from the new system + + Returns + ------- + tuple + (old_num_maps, new_num_maps, old_num_torsions, new_num_torsions) + four integers describing the number of maps and + torsions in each force. + + Raises + ------ + RuntimeError + If only one of the forces is present, or if the number of maps or the + number of torsions differs between the two forces. + """ + logger.info("CMAPTorsionForce found checking compatibility") + + # some quick checks on compatibility like the number of maps and total number of terms + old_num_maps = cmap_old.getNumMaps() + new_num_maps = cmap_new.getNumMaps() + if old_num_maps != new_num_maps: + raise RuntimeError( + f"Incompatible CMAPTorsionForce between end states expected to have same number of maps, " + f"found old: {old_num_maps} and new: {new_num_maps}") + + old_num_torsions = cmap_old.getNumTorsions() + new_num_torsions = cmap_new.getNumTorsions() + if old_num_torsions != new_num_torsions: + raise RuntimeError( + f"Incompatible CMAPTorsionForce between end states expected to have same number of torsions, " + f"found old: {old_num_torsions} and new: {new_num_torsions}") + + return old_num_maps, new_num_maps, old_num_torsions, new_num_torsions + + def _handle_cmap_torsion_force(self): + """ + This method does the following in order: + - Some basic checks that the CMAPTorsionForce exists in both old/new systems. + - Adds the CMAPTorsionForce from the old system + - Checks that the new system CMAPTorsionForce terms are equal to the old system's (we do not allow for alchemically changing CMAP terms). + """ + cmap_old = self._old_system_forces.get("CMAPTorsionForce", None) + cmap_new = self._new_system_forces.get("CMAPTorsionForce", None) + + # if only one has cmap raise an error + if (cmap_new is None) ^ (cmap_old is None): + raise RuntimeError(f"Inconsistent CMAPTorsionForce between end states expected to be present in both" + f"but found in old: {bool(cmap_old)} and new: {bool(cmap_new)}") + + if cmap_new == cmap_old is None: + logger.info("No CMAPTorsionForce found. Skipping adding force.") + return + + # verify compatibility and extract numbers of maps and torsions + ( + old_num_maps, + new_num_maps, + old_num_torsions, + new_num_torsions + ) = self._verify_cmap_compatibility( + cmap_old, cmap_new + ) + + logger.info("Adding CMAPTorsionForce to hybrid system") + # start looping through the old system terms and add them to the hybrid system + # track the terms we add so we can cross compare with the new system and also make sure we don't hit + # an index in the alchemical region + hybrid_cmap_force = openmm.CMAPTorsionForce() + self._hybrid_system.addForce(hybrid_cmap_force) + self._hybrid_system_forces["cmap_torsion_force"] = hybrid_cmap_force + + old_system_maps = {} + old_system_terms = {} + + logger.info("Adding CMAP forces") + # add all the old maps + for i in range(old_num_maps): + size, energy = cmap_old.getMapParameters(i) + old_system_maps[i] = (size, energy) + # also add the map to the hybrid system + hybrid_cmap_force.addMap(size, energy) + + logger.info("Adding CMAP force terms") + # now add the terms we need to map from the old to the new index + old_to_hybrid_index = self._old_to_hybrid_map + new_to_hybrid_index = self._new_to_hybrid_map + for i in range(old_num_torsions): + # get the parameters for the torsion using the same notation as OpenMM + map_index, a1, a2, a3, a4, b1, b2, b3, b4 = cmap_old.getTorsionParameters(i) + atom_ids = [a1, a2, a3, a4, b1, b2, b3, b4] + # map to hybrid indices + hybrid_atom_ids = [old_to_hybrid_index[a_id] for a_id in atom_ids] + # add to the hybrid system using the hybrid index + hybrid_cmap_force.addTorsion(map_index, *hybrid_atom_ids) + # track the atoms we add in the hybrid system to cross compare with new system + old_system_terms[tuple(hybrid_atom_ids)] = map_index + + # gather all alchemical atoms, use a copy so we don't change the groups + alchemical_atoms = self._atom_classes["core_atoms"].copy() + alchemical_atoms.update(self._atom_classes["unique_old_atoms"], self._atom_classes["unique_new_atoms"]) + # check if any of the atoms added are in the alchemical region + old_added_atoms = {atom_id for atoms in old_system_terms.keys() for atom_id in atoms} + if overlap_atoms := alchemical_atoms.intersection(old_added_atoms): + raise RuntimeError( + f"Incompatible CMAPTorsionForce term found in alchemical region for old system atoms {overlap_atoms}") + + # now loop over the new system force and check the terms are compatible + # we expect to add no new terms + for i in range(new_num_maps): + size, energy = cmap_new.getMapParameters(i) + if (size, energy) != old_system_maps[i]: + raise RuntimeError(f"Incompatible CMAPTorsionForce map parameters found between end states for map {i} " + f"expected {old_system_maps[i]} found {(size, energy)}") + + for i in range(new_num_torsions): + map_index, a1, a2, a3, a4, b1, b2, b3, b4 = cmap_new.getTorsionParameters(i) + atom_ids = [a1, a2, a3, a4, b1, b2, b3, b4] + # map to hybrid indices + hybrid_atom_ids = [new_to_hybrid_index[a_id] for a_id in atom_ids] + # check its in the old system terms + if tuple(hybrid_atom_ids) not in old_system_terms.keys(): + raise RuntimeError( + f"Incompatible CMAPTorsionForce term found between end states for atoms {hybrid_atom_ids} " + f"not found in old system terms.") + # check the map index is the same + if map_index != old_system_terms[tuple(hybrid_atom_ids)]: + raise RuntimeError( + f"Incompatible CMAPTorsionForce map index found between end states for atoms {hybrid_atom_ids} " + f"expected {old_system_terms[tuple(hybrid_atom_ids)]} found {map_index}") + logger.info("CMAPTorsionForce added to the hybrid system") + @staticmethod def _check_bounds(value, varname, minmax=(0, 1)): """ @@ -266,19 +408,40 @@ def _invert_dict(dictionary): """ Convenience method to invert a dictionary (since we do it so often). - Paramters: + Parameters: ---------- dictionary : dict Dictionary you want to invert """ return {v: k for k, v in dictionary.items()} + @staticmethod + def _pair_key(particle1, particle2): + """ + Convenience method to generate a key for a pair of atom indexes with consistent normalization. + + Parameters + ---------- + particle1 : int + Index of first particle in exception + particle2 : int + Index of second particle in exception + + Returns + ------- + tuple + Sorted tuple of the two particle indices, which is used as a key in the exception dicts. + """ + return (particle1, particle2) if particle1 < particle2 else (particle2, particle1) + def _set_mappings(self, old_to_new_map, core_old_to_new_map): """ Parameters ---------- old_to_new_map : dict of int : int Dictionary mapping atoms between the old and new systems. + core_old_to_new_map: dict[int,int] + Dictionary mapping core atoms between the old and new systems. This is a subset of the old_to_new_map. Notes ----- @@ -329,40 +492,32 @@ def _check_and_store_system_forces(self): def _check_unknown_forces(forces, system_name): # TODO: double check that CMMotionRemover is ok being here known_forces = { - "HarmonicBondForce", - "HarmonicAngleForce", - "PeriodicTorsionForce", - "NonbondedForce", - "MonteCarloBarostat", - "CMMotionRemover", + 'HarmonicBondForce', 'HarmonicAngleForce', + 'PeriodicTorsionForce', 'NonbondedForce', + 'MonteCarloBarostat', 'CMMotionRemover', + 'CMAPTorsionForce', 'MonteCarloMembraneBarostat', } force_names = forces.keys() unknown_forces = set(force_names) - set(known_forces) if unknown_forces: - errmsg = ( - f"Unknown forces {unknown_forces} encountered in " - f"{system_name} system" - ) + errmsg = (f"Unknown forces {unknown_forces} encountered in " + f"{system_name} system") raise ValueError(errmsg) # Prepare dicts of forces, which will be useful later # TODO: Store this as self._system_forces[name], name in ('old', # 'new', 'hybrid') for compactness - self._old_system_forces = { - type(force).__name__: force for force in self._old_system.getForces() - } - _check_unknown_forces(self._old_system_forces, "old") - self._new_system_forces = { - type(force).__name__: force for force in self._new_system.getForces() - } - _check_unknown_forces(self._new_system_forces, "new") + self._old_system_forces = {type(force).__name__: force for force in + self._old_system.getForces()} + _check_unknown_forces(self._old_system_forces, 'old') + self._new_system_forces = {type(force).__name__: force for force in + self._new_system.getForces()} + _check_unknown_forces(self._new_system_forces, 'new') # TODO: check if this is actually used much, otherwise ditch it # Get and store the nonbonded method from the system: - self._nonbonded_method = self._old_system_forces[ - "NonbondedForce" - ].getNonbondedMethod() + self._nonbonded_method = self._old_system_forces['NonbondedForce'].getNonbondedMethod() def _add_particles(self): """ @@ -385,7 +540,8 @@ def _add_particles(self): if particle_idx in self._old_to_new_map.keys(): particle_idx_new_system = self._old_to_new_map[particle_idx] - mass_new = self._new_system.getParticleMass(particle_idx_new_system) + mass_new = self._new_system.getParticleMass( + particle_idx_new_system) # Take the average of the masses if the atom is mapped particle_mass = (mass_old + mass_new) / 2 else: @@ -416,9 +572,17 @@ def _handle_box(self): """ # Check that if there is a barostat in the old system, # it is added to the hybrid system - if "MonteCarloBarostat" in self._old_system_forces.keys(): - barostat = copy.deepcopy(self._old_system_forces["MonteCarloBarostat"]) + present_barostat = [ + i for i in self._old_system_forces.keys() + if i in ["MonteCarloBarostat", "MonteCarloMembraneBarostat"] + ] + if len(present_barostat) == 1: + barostat = copy.deepcopy( + self._old_system_forces[present_barostat[0]]) self._hybrid_system.addForce(barostat) + elif len(present_barostat) > 1: + errmsg = "More than 1 barostat are present which is not supported" + raise ValueError(errmsg) # Copy over the box vectors from the old system box_vectors = self._old_system.getDefaultPeriodicBoxVectors() @@ -430,47 +594,41 @@ def _set_atom_classes(self): unique new, core, or environment, as defined in the class docstring. All indices are indices in the hybrid system. """ - self._atom_classes = { - "unique_old_atoms": set(), - "unique_new_atoms": set(), - "core_atoms": set(), - "environment_atoms": set(), - } + self._atom_classes = {'unique_old_atoms': set(), + 'unique_new_atoms': set(), + 'core_atoms': set(), + 'environment_atoms': set()} # First, find the unique old atoms for atom_idx in self._unique_old_atoms: hybrid_idx = self._old_to_hybrid_map[atom_idx] - self._atom_classes["unique_old_atoms"].add(hybrid_idx) + self._atom_classes['unique_old_atoms'].add(hybrid_idx) # Then the unique new atoms for atom_idx in self._unique_new_atoms: hybrid_idx = self._new_to_hybrid_map[atom_idx] - self._atom_classes["unique_new_atoms"].add(hybrid_idx) + self._atom_classes['unique_new_atoms'].add(hybrid_idx) # The core atoms: for new_idx, old_idx in self._core_new_to_old_map.items(): new_to_hybrid_idx = self._new_to_hybrid_map[new_idx] old_to_hybrid_idx = self._old_to_hybrid_map[old_idx] if new_to_hybrid_idx != old_to_hybrid_idx: - errmsg = ( - f"there is an index collision in hybrid indices of " - f"the core atom map: {self._core_new_to_old_map}" - ) + errmsg = (f"there is an index collision in hybrid indices of " + f"the core atom map: {self._core_new_to_old_map}") raise AssertionError(errmsg) - self._atom_classes["core_atoms"].add(new_to_hybrid_idx) + self._atom_classes['core_atoms'].add(new_to_hybrid_idx) # The environment atoms: for new_idx, old_idx in self._env_new_to_old_map.items(): new_to_hybrid_idx = self._new_to_hybrid_map[new_idx] old_to_hybrid_idx = self._old_to_hybrid_map[old_idx] if new_to_hybrid_idx != old_to_hybrid_idx: - errmsg = ( - f"there is an index collion in hybrid indices of " - f"the environment atom map: " - f"{self._env_new_to_old_map}" - ) + errmsg = (f"there is an index collion in hybrid indices of " + f"the environment atom map: " + f"{self._env_new_to_old_map}") raise AssertionError(errmsg) - self._atom_classes["environment_atoms"].add(new_to_hybrid_idx) + self._atom_classes['environment_atoms'].add(new_to_hybrid_idx) @staticmethod def _generate_dict_from_exceptions(force): @@ -488,14 +646,18 @@ def _generate_dict_from_exceptions(force): ------- exceptions_dict : dict Dictionary of exceptions + + Note + ---- + * The keys of the dictionary are sorted tuples of the particle indices + to make it easier to search for exceptions between two particles + without worrying about order. """ exceptions_dict = {} for exception_index in range(force.getNumExceptions()): - [index1, index2, chargeProd, sigma, epsilon] = force.getExceptionParameters( - exception_index - ) - exceptions_dict[(index1, index2)] = [chargeProd, sigma, epsilon] + [index1, index2, chargeProd, sigma, epsilon] = force.getExceptionParameters(exception_index) + exceptions_dict[HybridTopologyFactory._pair_key(index1, index2)] = [chargeProd, sigma, epsilon] return exceptions_dict @@ -508,82 +670,60 @@ def _validate_disjoint_sets(self): TODO: repeated code - condense """ for old_indices in self._old_system_exceptions.keys(): - hybrid_indices = ( - self._old_to_hybrid_map[old_indices[0]], - self._old_to_hybrid_map[old_indices[1]], - ) + hybrid_indices = (self._old_to_hybrid_map[old_indices[0]], + self._old_to_hybrid_map[old_indices[1]]) old_env_intersection = set(old_indices).intersection( - self._atom_classes["environment_atoms"] - ) + self._atom_classes['environment_atoms']) if old_env_intersection: if set(old_indices).intersection( - self._atom_classes["unique_old_atoms"] + self._atom_classes['unique_old_atoms'] ): - errmsg = ( - f"old index exceptions {old_indices} include " - "unique old and environment atoms, which is " - "disallowed" - ) + errmsg = (f"old index exceptions {old_indices} include " + "unique old and environment atoms, which is " + "disallowed") raise AssertionError(errmsg) for new_indices in self._new_system_exceptions.keys(): - hybrid_indices = ( - self._new_to_hybrid_map[new_indices[0]], - self._new_to_hybrid_map[new_indices[1]], - ) + hybrid_indices = (self._new_to_hybrid_map[new_indices[0]], + self._new_to_hybrid_map[new_indices[1]]) new_env_intersection = set(hybrid_indices).intersection( - self._atom_classes["environment_atoms"] - ) + self._atom_classes['environment_atoms']) if new_env_intersection: if set(hybrid_indices).intersection( - self._atom_classes["unique_new_atoms"] + self._atom_classes['unique_new_atoms'] ): - errmsg = ( - f"new index exceptions {new_indices} include " - "unique new and environment atoms, which is " - "dissallowed" - ) + errmsg = (f"new index exceptions {new_indices} include " + "unique new and environment atoms, which is " + "disallowed") raise AssertionError def _handle_constraints(self): """ This method adds relevant constraints from the old and new systems. - First, all constraints from the old systenm are added. + First, all constraints from the old system are added. Then, constraints to atoms unique to the new system are added. - - TODO: condense duplicated code """ # lengths of constraints already added - constraint_lengths = {} + constraint_lengths = dict() + + for system, hybrid_map in ( + (self._old_system, self._old_to_hybrid_map), + (self._new_system, self._new_to_hybrid_map), + ): + for const_idx in range(system.getNumConstraints()): + at1, at2, length = system.getConstraintParameters(const_idx) + hybrid_atoms = self._pair_key(hybrid_map[at1], hybrid_map[at2]) + if hybrid_atoms not in constraint_lengths.keys(): + # add to the system + self._hybrid_system.addConstraint(hybrid_atoms[0], + hybrid_atoms[1], length) + # store for later checks + constraint_lengths[hybrid_atoms] = length + else: + if constraint_lengths[hybrid_atoms] != length: + raise AssertionError('constraint length is changing') - # old system - hybrid_map = self._old_to_hybrid_map - for const_idx in range(self._old_system.getNumConstraints()): - at1, at2, length = self._old_system.getConstraintParameters(const_idx) - hybrid_atoms = tuple(sorted([hybrid_map[at1], hybrid_map[at2]])) - if hybrid_atoms not in constraint_lengths.keys(): - self._hybrid_system.addConstraint( - hybrid_atoms[0], hybrid_atoms[1], length - ) - constraint_lengths[hybrid_atoms] = length - else: - if constraint_lengths[hybrid_atoms] != length: - raise AssertionError("constraint length is changing") - - # new system - hybrid_map = self._new_to_hybrid_map - for const_idx in range(self._new_system.getNumConstraints()): - at1, at2, length = self._new_system.getConstraintParameters(const_idx) - hybrid_atoms = tuple(sorted([hybrid_map[at1], hybrid_map[at2]])) - if hybrid_atoms not in constraint_lengths.keys(): - self._hybrid_system.addConstraint( - hybrid_atoms[0], hybrid_atoms[1], length - ) - constraint_lengths[hybrid_atoms] = length - else: - if constraint_lengths[hybrid_atoms] != length: - raise AssertionError("constraint length is changing") @staticmethod def _copy_threeparticleavg(atm_map, env_atoms, vs): @@ -594,7 +734,7 @@ def _copy_threeparticleavg(atm_map, env_atoms, vs): Parameters ---------- atm_map : dict[int, int] - The atom map correspondance between the two Systems. + The atom map correspondence between the two Systems. env_atoms: set[int] A list of environment atoms for the target System. This checks that no alchemical atoms are being tied to. @@ -610,15 +750,12 @@ def _copy_threeparticleavg(atm_map, env_atoms, vs): particles[i] = atm_map[vs.getParticle(i)] weights[i] = vs.getWeight(i) if not all(i in env_atoms for i in particles.values()): - errmsg = "Virtual sites bound to non-environment atoms " "are not supported" + errmsg = ("Virtual sites bound to non-environment atoms " + "are not supported") raise ValueError(errmsg) return openmm.ThreeParticleAverageSite( - particles[0], - particles[1], - particles[2], - weights[0], - weights[1], - weights[2], + particles[0], particles[1], particles[2], + weights[0], weights[1], weights[2], ) def _handle_virtual_sites(self): @@ -638,22 +775,27 @@ def _handle_virtual_sites(self): # If it's a virtual site, make sure it is not in the unique or # core atoms, since this is currently unsupported hybrid_idx = self._old_to_hybrid_map[particle_idx] - if hybrid_idx not in self._atom_classes["environment_atoms"]: - errmsg = "Virtual sites in changing residue are " "unsupported." + if hybrid_idx not in self._atom_classes['environment_atoms']: + errmsg = ("Virtual sites in changing residue are " + "unsupported.") raise ValueError(errmsg) else: - virtual_site = self._old_system.getVirtualSite(particle_idx) - if isinstance(virtual_site, openmm.ThreeParticleAverageSite): + virtual_site = self._old_system.getVirtualSite( + particle_idx) + if isinstance( + virtual_site, openmm.ThreeParticleAverageSite): vs_copy = self._copy_threeparticleavg( self._old_to_hybrid_map, - self._atom_classes["environment_atoms"], + self._atom_classes['environment_atoms'], virtual_site, ) else: - errmsg = "Unsupported VirtualSite " f"class: {virtual_site}" + errmsg = ("Unsupported VirtualSite " + f"class: {virtual_site}") raise ValueError(errmsg) - self._hybrid_system.setVirtualSite(hybrid_idx, vs_copy) + self._hybrid_system.setVirtualSite(hybrid_idx, + vs_copy) # new system - there should be nothing left to add # Loop through virtual sites @@ -662,15 +804,14 @@ def _handle_virtual_sites(self): # If it's a virtual site, make sure it is not in the unique or # core atoms, since this is currently unsupported hybrid_idx = self._new_to_hybrid_map[particle_idx] - if hybrid_idx not in self._atom_classes["environment_atoms"]: - errmsg = "Virtual sites in changing residue are " "unsupported." + if hybrid_idx not in self._atom_classes['environment_atoms']: + errmsg = ("Virtual sites in changing residue are " + "unsupported.") raise ValueError(errmsg) else: if not self._hybrid_system.isVirtualSite(hybrid_idx): - errmsg = ( - "Environment virtual site in new system " - "found not copied from old system" - ) + errmsg = ("Environment virtual site in new system " + "found not copied from old system") raise ValueError(errmsg) def _add_bond_force_terms(self): @@ -684,31 +825,29 @@ def _add_bond_force_terms(self): ----- * User defined functions have been removed for now. """ - core_energy_expression = "(K/2)*(r-length)^2;" + core_energy_expression = '(K/2)*(r-length)^2;' # linearly interpolate spring constant - core_energy_expression += "K = (1-lambda_bonds)*K1 + lambda_bonds*K2;" + core_energy_expression += 'K = (1-lambda_bonds)*K1 + lambda_bonds*K2;' # linearly interpolate bond length - core_energy_expression += ( - "length = (1-lambda_bonds)*length1 + lambda_bonds*length2;" - ) + core_energy_expression += 'length = (1-lambda_bonds)*length1 + lambda_bonds*length2;' # Create the force and add the relevant parameters custom_core_force = openmm.CustomBondForce(core_energy_expression) - custom_core_force.addPerBondParameter("length1") # old bond length - custom_core_force.addPerBondParameter("K1") # old spring constant - custom_core_force.addPerBondParameter("length2") # new bond length - custom_core_force.addPerBondParameter("K2") # new spring constant + custom_core_force.addPerBondParameter('length1') # old bond length + custom_core_force.addPerBondParameter('K1') # old spring constant + custom_core_force.addPerBondParameter('length2') # new bond length + custom_core_force.addPerBondParameter('K2') # new spring constant - custom_core_force.addGlobalParameter("lambda_bonds", 0.0) + custom_core_force.addGlobalParameter('lambda_bonds', 0.0) self._hybrid_system.addForce(custom_core_force) - self._hybrid_system_forces["core_bond_force"] = custom_core_force + self._hybrid_system_forces['core_bond_force'] = custom_core_force # Add a bond force for environment and unique atoms (bonds are never # scaled for these): standard_bond_force = openmm.HarmonicBondForce() self._hybrid_system.addForce(standard_bond_force) - self._hybrid_system_forces["standard_bond_force"] = standard_bond_force + self._hybrid_system_forces['standard_bond_force'] = standard_bond_force def _add_angle_force_terms(self): """ @@ -721,36 +860,34 @@ def _add_angle_force_terms(self): * User defined functions have been removed for now. * Neglected angle terms have been removed for now. """ - energy_expression = "(K/2)*(theta-theta0)^2;" + energy_expression = '(K/2)*(theta-theta0)^2;' # linearly interpolate spring constant - energy_expression += "K = (1.0-lambda_angles)*K_1 + lambda_angles*K_2;" + energy_expression += 'K = (1.0-lambda_angles)*K_1 + lambda_angles*K_2;' # linearly interpolate equilibrium angle - energy_expression += ( - "theta0 = (1.0-lambda_angles)*theta0_1 + lambda_angles*theta0_2;" - ) + energy_expression += 'theta0 = (1.0-lambda_angles)*theta0_1 + lambda_angles*theta0_2;' # Create the force and add relevant parameters custom_core_force = openmm.CustomAngleForce(energy_expression) # molecule1 equilibrium angle - custom_core_force.addPerAngleParameter("theta0_1") + custom_core_force.addPerAngleParameter('theta0_1') # molecule1 spring constant - custom_core_force.addPerAngleParameter("K_1") + custom_core_force.addPerAngleParameter('K_1') # molecule2 equilibrium angle - custom_core_force.addPerAngleParameter("theta0_2") + custom_core_force.addPerAngleParameter('theta0_2') # molecule2 spring constant - custom_core_force.addPerAngleParameter("K_2") + custom_core_force.addPerAngleParameter('K_2') - custom_core_force.addGlobalParameter("lambda_angles", 0.0) + custom_core_force.addGlobalParameter('lambda_angles', 0.0) # Add the force to the system and the force dict. self._hybrid_system.addForce(custom_core_force) - self._hybrid_system_forces["core_angle_force"] = custom_core_force + self._hybrid_system_forces['core_angle_force'] = custom_core_force # Add an angle term for environment/unique interactions -- these are # never scaled standard_angle_force = openmm.HarmonicAngleForce() self._hybrid_system.addForce(standard_angle_force) - self._hybrid_system_forces["standard_angle_force"] = standard_angle_force + self._hybrid_system_forces['standard_angle_force'] = standard_angle_force def _add_torsion_force_terms(self): """ @@ -765,37 +902,35 @@ def _add_torsion_force_terms(self): add_unique_atom_torsion_force (default True) have been removed for now. """ - energy_expression = "(1-lambda_torsions)*U1 + lambda_torsions*U2;" - energy_expression += "U1 = K1*(1+cos(periodicity1*theta-phase1));" - energy_expression += "U2 = K2*(1+cos(periodicity2*theta-phase2));" + energy_expression = '(1-lambda_torsions)*U1 + lambda_torsions*U2;' + energy_expression += 'U1 = K1*(1+cos(periodicity1*theta-phase1));' + energy_expression += 'U2 = K2*(1+cos(periodicity2*theta-phase2));' # Create the force and add the relevant parameters custom_core_force = openmm.CustomTorsionForce(energy_expression) # molecule1 periodicity - custom_core_force.addPerTorsionParameter("periodicity1") + custom_core_force.addPerTorsionParameter('periodicity1') # molecule1 phase - custom_core_force.addPerTorsionParameter("phase1") + custom_core_force.addPerTorsionParameter('phase1') # molecule1 spring constant - custom_core_force.addPerTorsionParameter("K1") + custom_core_force.addPerTorsionParameter('K1') # molecule2 periodicity - custom_core_force.addPerTorsionParameter("periodicity2") + custom_core_force.addPerTorsionParameter('periodicity2') # molecule2 phase - custom_core_force.addPerTorsionParameter("phase2") + custom_core_force.addPerTorsionParameter('phase2') # molecule2 spring constant - custom_core_force.addPerTorsionParameter("K2") + custom_core_force.addPerTorsionParameter('K2') - custom_core_force.addGlobalParameter("lambda_torsions", 0.0) + custom_core_force.addGlobalParameter('lambda_torsions', 0.0) # Add the force to the system self._hybrid_system.addForce(custom_core_force) - self._hybrid_system_forces["custom_torsion_force"] = custom_core_force + self._hybrid_system_forces['custom_torsion_force'] = custom_core_force # Create and add the torsion term for unique/environment atoms unique_atom_torsion_force = openmm.PeriodicTorsionForce() self._hybrid_system.addForce(unique_atom_torsion_force) - self._hybrid_system_forces["unique_atom_torsion_force"] = ( - unique_atom_torsion_force - ) + self._hybrid_system_forces['unique_atom_torsion_force'] = unique_atom_torsion_force @staticmethod def _nonbonded_custom(v2): @@ -823,19 +958,13 @@ def _nonbonded_custom(v2): if v2: sterics_energy_expression = "U_sterics = select(step(r - r_LJ), 4*epsilon*x*(x-1.0), U_sterics_quad);" sterics_energy_expression += "U_sterics_quad = Force*(((r - r_LJ)^2)/2 - (r - r_LJ)) + U_sterics_cut;" - sterics_energy_expression += ( - "U_sterics_cut = 4*epsilon*((sigma/r_LJ)^6)*(((sigma/r_LJ)^6) - 1.0);" - ) - sterics_energy_expression += ( - "Force = -4*epsilon*((-12*sigma^12)/(r_LJ^13) + (6*sigma^6)/(r_LJ^7));" - ) + sterics_energy_expression += "U_sterics_cut = 4*epsilon*((sigma/r_LJ)^6)*(((sigma/r_LJ)^6) - 1.0);" + sterics_energy_expression += "Force = -4*epsilon*((-12*sigma^12)/(r_LJ^13) + (6*sigma^6)/(r_LJ^7));" sterics_energy_expression += "x = (sigma/r)^6;" sterics_energy_expression += "r_LJ = softcore_alpha*((26/7)*(sigma^6)*lambda_sterics_deprecated)^(1/6);" sterics_energy_expression += "lambda_sterics_deprecated = new_interaction*(1.0 - lambda_sterics_insert) + old_interaction*lambda_sterics_delete;" else: - sterics_energy_expression = ( - "U_sterics = 4*epsilon*x*(x-1.0); x = (sigma/reff_sterics)^6;" - ) + sterics_energy_expression = "U_sterics = 4*epsilon*x*(x-1.0); x = (sigma/reff_sterics)^6;" return sterics_energy_expression @@ -854,13 +983,9 @@ def _nonbonded_custom_sterics_common(): * Move to a dictionary or equivalent. """ # interpolation - sterics_addition = ( - "epsilon = (1-lambda_sterics)*epsilonA + lambda_sterics*epsilonB;" - ) + sterics_addition = "epsilon = (1-lambda_sterics)*epsilonA + lambda_sterics*epsilonB;" # effective softcore distance for sterics - sterics_addition += ( - "reff_sterics = sigma*((softcore_alpha*lambda_alpha + (r/sigma)^6))^(1/6);" - ) + sterics_addition += "reff_sterics = sigma*((softcore_alpha*lambda_alpha + (r/sigma)^6))^(1/6);" sterics_addition += "sigma = (1-lambda_sterics)*sigmaA + lambda_sterics*sigmaB;" sterics_addition += "lambda_alpha = new_interaction*(1-lambda_sterics_insert) + old_interaction*lambda_sterics_delete;" @@ -915,11 +1040,9 @@ def _translate_nonbonded_method_to_custom(standard_nonbonded_method): custom_nonbonded_method : openmm.CustomNonbondedForce.NonbondedMethod the nonbonded method for the equivalent customnonbonded force """ - if standard_nonbonded_method in [ - openmm.NonbondedForce.CutoffPeriodic, - openmm.NonbondedForce.PME, - openmm.NonbondedForce.Ewald, - ]: + if standard_nonbonded_method in [openmm.NonbondedForce.CutoffPeriodic, + openmm.NonbondedForce.PME, + openmm.NonbondedForce.Ewald]: return openmm.CustomNonbondedForce.CutoffPeriodic elif standard_nonbonded_method == openmm.NonbondedForce.NoCutoff: return openmm.CustomNonbondedForce.NoCutoff @@ -950,36 +1073,28 @@ def _add_nonbonded_force_terms(self): # changing. standard_nonbonded_force = openmm.NonbondedForce() self._hybrid_system.addForce(standard_nonbonded_force) - self._hybrid_system_forces["standard_nonbonded_force"] = ( - standard_nonbonded_force - ) + self._hybrid_system_forces['standard_nonbonded_force'] = standard_nonbonded_force # Create a CustomNonbondedForce to handle alchemically interpolated # nonbonded parameters. # Select functional form based on nonbonded method. # TODO: check _nonbonded_custom_ewald and _nonbonded_custom_cutoff # since they take arguments that are never used... - r_cutoff = self._old_system_forces["NonbondedForce"].getCutoffDistance() + r_cutoff = self._old_system_forces['NonbondedForce'].getCutoffDistance() sterics_energy_expression = self._nonbonded_custom(self._softcore_LJ_v2) if self._nonbonded_method in [openmm.NonbondedForce.NoCutoff]: - pass - elif self._nonbonded_method in [ - openmm.NonbondedForce.CutoffPeriodic, - openmm.NonbondedForce.CutoffNonPeriodic, - ]: - epsilon_solvent = self._old_system_forces[ - "NonbondedForce" - ].getReactionFieldDielectric() - standard_nonbonded_force.setReactionFieldDielectric(epsilon_solvent) + sterics_energy_expression = self._nonbonded_custom( + self._softcore_LJ_v2) + elif self._nonbonded_method in [openmm.NonbondedForce.CutoffPeriodic, + openmm.NonbondedForce.CutoffNonPeriodic]: + epsilon_solvent = self._old_system_forces['NonbondedForce'].getReactionFieldDielectric() + standard_nonbonded_force.setReactionFieldDielectric( + epsilon_solvent) standard_nonbonded_force.setCutoffDistance(r_cutoff) - elif self._nonbonded_method in [ - openmm.NonbondedForce.PME, - openmm.NonbondedForce.Ewald, - ]: - [alpha_ewald, nx, ny, nz] = self._old_system_forces[ - "NonbondedForce" - ].getPMEParameters() - delta = self._old_system_forces["NonbondedForce"].getEwaldErrorTolerance() + elif self._nonbonded_method in [openmm.NonbondedForce.PME, + openmm.NonbondedForce.Ewald]: + [alpha_ewald, nx, ny, nz] = self._old_system_forces['NonbondedForce'].getPMEParameters() + delta = self._old_system_forces['NonbondedForce'].getEwaldErrorTolerance() standard_nonbonded_force.setPMEParameters(alpha_ewald, nx, ny, nz) standard_nonbonded_force.setEwaldErrorTolerance(delta) standard_nonbonded_force.setCutoffDistance(r_cutoff) @@ -994,28 +1109,21 @@ def _add_nonbonded_force_terms(self): sterics_mixing_rules = self._nonbonded_custom_mixing_rules() custom_nonbonded_method = self._translate_nonbonded_method_to_custom( - self._nonbonded_method - ) + self._nonbonded_method) - total_sterics_energy = ( - "U_sterics;" + sterics_energy_expression + sterics_mixing_rules - ) + total_sterics_energy = "U_sterics;" + sterics_energy_expression + sterics_mixing_rules sterics_custom_nonbonded_force = openmm.CustomNonbondedForce( - total_sterics_energy - ) - + total_sterics_energy) # Match cutoff from non-custom NB forces sterics_custom_nonbonded_force.setCutoffDistance(r_cutoff) if self._softcore_LJ_v2: sterics_custom_nonbonded_force.addGlobalParameter( - "softcore_alpha", self._softcore_LJ_v2_alpha - ) + "softcore_alpha", self._softcore_LJ_v2_alpha) else: sterics_custom_nonbonded_force.addGlobalParameter( - "softcore_alpha", self._softcore_alpha - ) + "softcore_alpha", self._softcore_alpha) # Lennard-Jones sigma initial sterics_custom_nonbonded_force.addPerParticleParameter("sigmaA") @@ -1030,37 +1138,32 @@ def _add_nonbonded_force_terms(self): # 1 = hybrid new atom, 0 otherwise sterics_custom_nonbonded_force.addPerParticleParameter("unique_new") - sterics_custom_nonbonded_force.addGlobalParameter("lambda_sterics_core", 0.0) sterics_custom_nonbonded_force.addGlobalParameter( - "lambda_electrostatics_core", 0.0 - ) - sterics_custom_nonbonded_force.addGlobalParameter("lambda_sterics_insert", 0.0) - sterics_custom_nonbonded_force.addGlobalParameter("lambda_sterics_delete", 0.0) + "lambda_sterics_core", 0.0) + sterics_custom_nonbonded_force.addGlobalParameter( + "lambda_electrostatics_core", 0.0) + sterics_custom_nonbonded_force.addGlobalParameter( + "lambda_sterics_insert", 0.0) + sterics_custom_nonbonded_force.addGlobalParameter( + "lambda_sterics_delete", 0.0) - sterics_custom_nonbonded_force.setNonbondedMethod(custom_nonbonded_method) + sterics_custom_nonbonded_force.setNonbondedMethod( + custom_nonbonded_method) self._hybrid_system.addForce(sterics_custom_nonbonded_force) - self._hybrid_system_forces["core_sterics_force"] = ( - sterics_custom_nonbonded_force - ) + self._hybrid_system_forces['core_sterics_force'] = sterics_custom_nonbonded_force # Set the use of dispersion correction to be the same between the new # nonbonded force and the old one: - if self._old_system_forces["NonbondedForce"].getUseDispersionCorrection(): - self._hybrid_system_forces[ - "standard_nonbonded_force" - ].setUseDispersionCorrection(True) + if self._old_system_forces['NonbondedForce'].getUseDispersionCorrection(): + self._hybrid_system_forces['standard_nonbonded_force'].setUseDispersionCorrection(True) if self._use_dispersion_correction: sterics_custom_nonbonded_force.setUseLongRangeCorrection(True) else: - self._hybrid_system_forces[ - "standard_nonbonded_force" - ].setUseDispersionCorrection(False) - - if self._old_system_forces["NonbondedForce"].getUseSwitchingFunction(): - switching_distance = self._old_system_forces[ - "NonbondedForce" - ].getSwitchingDistance() + self._hybrid_system_forces['standard_nonbonded_force'].setUseDispersionCorrection(False) + + if self._old_system_forces['NonbondedForce'].getUseSwitchingFunction(): + switching_distance = self._old_system_forces['NonbondedForce'].getSwitchingDistance() standard_nonbonded_force.setUseSwitchingFunction(True) standard_nonbonded_force.setSwitchingDistance(switching_distance) sterics_custom_nonbonded_force.setUseSwitchingFunction(True) @@ -1070,33 +1173,25 @@ def _add_nonbonded_force_terms(self): sterics_custom_nonbonded_force.setUseSwitchingFunction(False) @staticmethod - def _find_bond_parameters(bond_force, index1, index2): + def _build_bond_lookup(bond_force) -> dict: """ - This is a convenience function to find bond parameters in another - system given the two indices. + Build a lookup dictionary for bond parameters given a HarmonicBondForce. Parameters ---------- bond_force : openmm.HarmonicBondForce - The bond force where the parameters should be found - index1 : int - Index1 (order does not matter) of the bond atoms - index2 : int - Index2 (order does not matter) of the bond atoms + The bond force from which to build the lookup. Returns ------- - bond_parameters : list - List of relevant bond parameters + dict + A dictionary mapping a sorted tuple of atom indices to their bond parameters. """ - index_set = {index1, index2} - # Loop through all the bonds: + bond_lookup = {} for bond_index in range(bond_force.getNumBonds()): - parms = bond_force.getBondParameters(bond_index) - if index_set == {parms[0], parms[1]}: - return parms - - return [] + index1, index2, r0, k = bond_force.getBondParameters(bond_index) + bond_lookup[HybridTopologyFactory._pair_key(index1, index2)] = (r0, k) + return bond_lookup def _handle_harmonic_bonds(self): """ @@ -1115,18 +1210,17 @@ def _handle_harmonic_bonds(self): ----- * Bond softening logic has been removed for now. """ - old_system_bond_force = self._old_system_forces["HarmonicBondForce"] - new_system_bond_force = self._new_system_forces["HarmonicBondForce"] + old_system_bond_force = self._old_system_forces['HarmonicBondForce'] + new_system_bond_force = self._new_system_forces['HarmonicBondForce'] + # build a lookup for the new bonds so we don't have to loop through the force + new_bond_lookup = self._build_bond_lookup(new_system_bond_force) + # track the terms in the hybrid system + hybrid_bonds = {} # First, loop through the old system bond forces and add relevant terms for bond_index in range(old_system_bond_force.getNumBonds()): # Get each set of bond parameters - [ - index1_old, - index2_old, - r0_old, - k_old, - ] = old_system_bond_force.getBondParameters(bond_index) + [index1_old, index2_old, r0_old, k_old] = old_system_bond_force.getBondParameters(bond_index) # Map the indices to the hybrid system, for which our atom classes # are defined. @@ -1138,23 +1232,21 @@ def _handle_harmonic_bonds(self): # atoms are in the core) # If it is, we need to find the parameters in the old system so # that we can interpolate - if index_set.issubset(self._atom_classes["core_atoms"]): + if index_set.issubset(self._atom_classes['core_atoms']): index1_new = self._old_to_new_map[index1_old] index2_new = self._old_to_new_map[index2_old] - new_bond_parameters = self._find_bond_parameters( - new_system_bond_force, index1_new, index2_new - ) - if not new_bond_parameters: + new_bond_parameters = new_bond_lookup.get(self._pair_key(index1_new, index2_new), None) + if new_bond_parameters is None: r0_new = r0_old - k_new = 0.0 * unit.kilojoule_per_mole / unit.angstrom**2 + k_new = 0.0*unit.kilojoule_per_mole/unit.angstrom**2 else: - # TODO - why is this being recalculated? - [index1, index2, r0_new, k_new] = self._find_bond_parameters( - new_system_bond_force, index1_new, index2_new - ) - self._hybrid_system_forces["core_bond_force"].addBond( - index1_hybrid, index2_hybrid, [r0_old, k_old, r0_new, k_new] - ) + r0_new, k_new = new_bond_parameters + + self._hybrid_system_forces['core_bond_force'].addBond( + index1_hybrid, index2_hybrid, + [r0_old, k_old, r0_new, k_new]) + # track that we've added this bond + hybrid_bonds[self._pair_key(index1_hybrid, index2_hybrid)] = True # Check if the index set is a subset of anything besides # environment (in the case of environment, we just add the bond to @@ -1164,48 +1256,35 @@ def _handle_harmonic_bonds(self): # NOTE - These are currently all the same because we don't soften # TODO - work these out somewhere else, this is terribly difficult # to understand logic. - elif index_set.issubset(self._atom_classes["unique_old_atoms"]) or ( - len(index_set.intersection(self._atom_classes["unique_old_atoms"])) == 1 - and len(index_set.intersection(self._atom_classes["core_atoms"])) == 1 - ): + elif (index_set.issubset(self._atom_classes['unique_old_atoms']) or + (len(index_set.intersection(self._atom_classes['unique_old_atoms'])) == 1 + and len(index_set.intersection(self._atom_classes['core_atoms'])) == 1)): + # We can just add it to the regular bond force. - self._hybrid_system_forces["standard_bond_force"].addBond( - index1_hybrid, index2_hybrid, r0_old, k_old - ) + self._hybrid_system_forces['standard_bond_force'].addBond( + index1_hybrid, index2_hybrid, r0_old, k_old) - elif ( - len(index_set.intersection(self._atom_classes["environment_atoms"])) - == 1 - and len(index_set.intersection(self._atom_classes["core_atoms"])) == 1 - ): - self._hybrid_system_forces["standard_bond_force"].addBond( - index1_hybrid, index2_hybrid, r0_old, k_old - ) + elif (len(index_set.intersection(self._atom_classes['environment_atoms'])) == 1 and + len(index_set.intersection(self._atom_classes['core_atoms'])) == 1): + self._hybrid_system_forces['standard_bond_force'].addBond( + index1_hybrid, index2_hybrid, r0_old, k_old) # Otherwise, we just add the same parameters as those in the old # system (these are environment atoms, and the parameters are the # same) - elif index_set.issubset(self._atom_classes["environment_atoms"]): - self._hybrid_system_forces["standard_bond_force"].addBond( - index1_hybrid, index2_hybrid, r0_old, k_old - ) + elif index_set.issubset(self._atom_classes['environment_atoms']): + self._hybrid_system_forces['standard_bond_force'].addBond( + index1_hybrid, index2_hybrid, r0_old, k_old) else: - errmsg = ( - f"hybrid index set {index_set} does not fit into a " - "canonical atom type" - ) + errmsg = (f"hybrid index set {index_set} does not fit into a " + "canonical atom type") raise ValueError(errmsg) # Now loop through the new system to get the interactions that are # unique to it. for bond_index in range(new_system_bond_force.getNumBonds()): # Get each set of bond parameters - [ - index1_new, - index2_new, - r0_new, - k_new, - ] = new_system_bond_force.getBondParameters(bond_index) + [index1_new, index2_new, r0_new, k_new] = new_system_bond_force.getBondParameters(bond_index) # Convert indices to hybrid, since that is how we represent atom classes: index1_hybrid = self._new_to_hybrid_map[index1_new] @@ -1216,16 +1295,13 @@ def _handle_harmonic_bonds(self): # anything, the bond is unique to the new system and must be added # all other bonds in the new system have been accounted for already # NOTE - These are mostly all the same because we don't soften - if len( - index_set.intersection(self._atom_classes["unique_new_atoms"]) - ) == 2 or ( - len(index_set.intersection(self._atom_classes["unique_new_atoms"])) == 1 - and len(index_set.intersection(self._atom_classes["core_atoms"])) == 1 - ): + if (len(index_set.intersection(self._atom_classes['unique_new_atoms'])) == 2 or + (len(index_set.intersection(self._atom_classes['unique_new_atoms'])) == 1 and + len(index_set.intersection(self._atom_classes['core_atoms'])) == 1)): + # If we aren't softening bonds, then just add it to the standard bond force - self._hybrid_system_forces["standard_bond_force"].addBond( - index1_hybrid, index2_hybrid, r0_new, k_new - ) + self._hybrid_system_forces['standard_bond_force'].addBond( + index1_hybrid, index2_hybrid, r0_new, k_new) # If the bond is in the core, it has probably already been added # in the above loop. However, there are some circumstances @@ -1233,68 +1309,71 @@ def _handle_harmonic_bonds(self): # not been added and should be added here. # This has some peculiarities to be discussed... # TODO - Work out what the above peculiarities are... - elif index_set.issubset(self._atom_classes["core_atoms"]): - if not self._find_bond_parameters( - self._hybrid_system_forces["core_bond_force"], - index1_hybrid, - index2_hybrid, - ): + elif index_set.issubset(self._atom_classes['core_atoms']): + bond_key = self._pair_key(index1_hybrid, index2_hybrid) + if bond_key not in hybrid_bonds: r0_old = r0_new - k_old = 0.0 * unit.kilojoule_per_mole / unit.angstrom**2 - self._hybrid_system_forces["core_bond_force"].addBond( - index1_hybrid, index2_hybrid, [r0_old, k_old, r0_new, k_new] - ) - elif index_set.issubset(self._atom_classes["environment_atoms"]): + k_old = 0.0*unit.kilojoule_per_mole/unit.angstrom**2 + self._hybrid_system_forces['core_bond_force'].addBond( + index1_hybrid, index2_hybrid, + [r0_old, k_old, r0_new, k_new]) + # track that we've added this bond + hybrid_bonds[bond_key] = True + + elif index_set.issubset(self._atom_classes['environment_atoms']): # Already been added pass - elif ( - len(index_set.intersection(self._atom_classes["environment_atoms"])) - == 1 - and len(index_set.intersection(self._atom_classes["core_atoms"])) == 1 - ): + elif (len(index_set.intersection(self._atom_classes['environment_atoms'])) == 1 and + len(index_set.intersection(self._atom_classes['core_atoms'])) == 1): pass else: - errmsg = ( - f"hybrid index set {index_set} does not fit into a " - "canonical atom type" - ) + errmsg = (f"hybrid index set {index_set} does not fit into a " + "canonical atom type") raise ValueError(errmsg) @staticmethod - def _find_angle_parameters(angle_force, indices): + def _triplet_key(a, b, c) -> tuple[int, int, int]: """ - Convenience function to find the angle parameters corresponding to a - particular set of indices + Create a key for angle lookups that is invariant to the order of the first and third indices. Parameters ---------- - angle_force : openmm.HarmonicAngleForce - The force where the angle of interest may be found. - indices : list of int - The indices (any order) of the angle atoms + a : int + The first atom index. + b : int + The second atom index (the central atom in the angle). + c : int + The third atom index. Returns ------- - angle_params : list - list of angle parameters + tuple[int, int, int] + A tuple representing the angle, with the first and third indices sorted. """ - indices_reversed = indices[::-1] + return (a, b, c) if a < c else (c, b, a) - # Now loop through and try to find the angle: - for angle_index in range(angle_force.getNumAngles()): - angle_params = angle_force.getAngleParameters(angle_index) + @staticmethod + def _build_angle_lookup(angle_force) -> dict: + """ + Build a lookup dictionary for angle parameters given a HarmonicAngleForce. - # Get a set representing the angle indices - angle_param_indices = angle_params[:3] + Parameters + ---------- + angle_force : openmm.HarmonicAngleForce + The angle force from which to build the lookup. - if ( - indices == angle_param_indices - or indices_reversed == angle_param_indices - ): - return angle_params - return [] # Return empty if no matching angle found + Returns + ------- + dict + A dictionary mapping a sorted tuple of atom indices to their angle parameters. + """ + angle_lookup = {} + for angle_index in range(angle_force.getNumAngles()): + index1, index2, index3, theta0, k = angle_force.getAngleParameters(angle_index) + angle_lookup[HybridTopologyFactory._triplet_key(index1, index2, index3)] = (theta0, k) + return angle_lookup def _handle_harmonic_angles(self): """ @@ -1319,80 +1398,73 @@ def _handle_harmonic_angles(self): ----- * Removed softening and neglected angle functionality """ - old_system_angle_force = self._old_system_forces["HarmonicAngleForce"] - new_system_angle_force = self._new_system_forces["HarmonicAngleForce"] + old_system_angle_force = self._old_system_forces['HarmonicAngleForce'] + new_system_angle_force = self._new_system_forces['HarmonicAngleForce'] + # build a lookup for the new system angles to save iterating through the force + new_angle_lookup = self._build_angle_lookup(new_system_angle_force) + # hybrid angle tracking + hybrid_angles = {} # First, loop through all the angles in the old system to determine # what to do with them. We will only use the # custom angle force if all atoms are part of "core." Otherwise, they # are either unique to one system or never change. for angle_index in range(old_system_angle_force.getNumAngles()): + old_angle_parameters = old_system_angle_force.getAngleParameters( - angle_index - ) + angle_index) # Get the indices in the hybrid system hybrid_index_list = [ - self._old_to_hybrid_map[old_atomid] - for old_atomid in old_angle_parameters[:3] + self._old_to_hybrid_map[old_atomid] for old_atomid in old_angle_parameters[:3] ] hybrid_index_set = set(hybrid_index_list) # If all atoms are in the core, we'll need to find the # corresponding parameters in the old system and interpolate - if hybrid_index_set.issubset(self._atom_classes["core_atoms"]): + if hybrid_index_set.issubset(self._atom_classes['core_atoms']): # Get the new indices so we can get the new angle parameters new_indices = [ - self._old_to_new_map[old_atomid] - for old_atomid in old_angle_parameters[:3] + self._old_to_new_map[old_atomid] for old_atomid in old_angle_parameters[:3] ] - new_angle_parameters = self._find_angle_parameters( - new_system_angle_force, new_indices - ) - if not new_angle_parameters: - new_angle_parameters = [ - 0, - 0, - 0, - old_angle_parameters[3], - 0.0 * unit.kilojoule_per_mole / unit.radian**2, - ] + new_angle_parameters = new_angle_lookup.get(self._triplet_key(*new_indices), None) + + if new_angle_parameters is None: + new_theta0 = old_angle_parameters[3] + new_k = 0.0*unit.kilojoule_per_mole/unit.radian**2 + else: + new_theta0, new_k = new_angle_parameters # Add to the hybrid force: # the parameters at indices 3 and 4 represent theta0 and k, # respectively. hybrid_force_parameters = [ - old_angle_parameters[3], - old_angle_parameters[4], - new_angle_parameters[3], - new_angle_parameters[4], + old_angle_parameters[3], old_angle_parameters[4], + new_theta0, new_k ] - self._hybrid_system_forces["core_angle_force"].addAngle( - hybrid_index_list[0], - hybrid_index_list[1], - hybrid_index_list[2], - hybrid_force_parameters, + self._hybrid_system_forces['core_angle_force'].addAngle( + hybrid_index_list[0], hybrid_index_list[1], + hybrid_index_list[2], hybrid_force_parameters ) + # track that we have added this angle for the hybrid system + hybrid_angles[self._triplet_key(*hybrid_index_list)] = True # Check if the atoms are neither all core nor all environment, # which would mean they involve unique old interactions - elif not hybrid_index_set.issubset(self._atom_classes["environment_atoms"]): + elif not hybrid_index_set.issubset( + self._atom_classes['environment_atoms']): # if there is an environment atom if hybrid_index_set.intersection( - self._atom_classes["environment_atoms"] - ): + self._atom_classes['environment_atoms']): if hybrid_index_set.intersection( - self._atom_classes["unique_old_atoms"] - ): + self._atom_classes['unique_old_atoms']): errmsg = "we disallow unique-environment terms" raise ValueError(errmsg) - self._hybrid_system_forces["standard_angle_force"].addAngle( - hybrid_index_list[0], - hybrid_index_list[1], - hybrid_index_list[2], - old_angle_parameters[3], - old_angle_parameters[4], + self._hybrid_system_forces['standard_angle_force'].addAngle( + hybrid_index_list[0], hybrid_index_list[1], + hybrid_index_list[2], old_angle_parameters[3], + old_angle_parameters[4] ) else: # There are no env atoms, so we can treat this term @@ -1400,42 +1472,36 @@ def _handle_harmonic_angles(self): # We don't soften so just add this to the standard angle # force - self._hybrid_system_forces["standard_angle_force"].addAngle( - hybrid_index_list[0], - hybrid_index_list[1], - hybrid_index_list[2], - old_angle_parameters[3], - old_angle_parameters[4], + self._hybrid_system_forces['standard_angle_force'].addAngle( + hybrid_index_list[0], hybrid_index_list[1], + hybrid_index_list[2], old_angle_parameters[3], + old_angle_parameters[4] ) # Otherwise, only environment atoms are in this interaction, so # add it to the standard angle force - elif hybrid_index_set.issubset(self._atom_classes["environment_atoms"]): - self._hybrid_system_forces["standard_angle_force"].addAngle( - hybrid_index_list[0], - hybrid_index_list[1], - hybrid_index_list[2], - old_angle_parameters[3], - old_angle_parameters[4], + elif hybrid_index_set.issubset( + self._atom_classes['environment_atoms']): + self._hybrid_system_forces['standard_angle_force'].addAngle( + hybrid_index_list[0], hybrid_index_list[1], + hybrid_index_list[2], old_angle_parameters[3], + old_angle_parameters[4] ) else: - errmsg = ( - f"handle_harmonic_angles: angle_index {angle_index} " - "does not fit a canonical form." - ) + errmsg = (f"handle_harmonic_angles: angle_index {angle_index} " + "does not fit a canonical form.") raise ValueError(errmsg) # Finally, loop through the new system force to add any unique new # angles for angle_index in range(new_system_angle_force.getNumAngles()): + new_angle_parameters = new_system_angle_force.getAngleParameters( - angle_index - ) + angle_index) # Get the indices in the hybrid system hybrid_index_list = [ - self._new_to_hybrid_map[new_atomid] - for new_atomid in new_angle_parameters[:3] + self._new_to_hybrid_map[new_atomid] for new_atomid in new_angle_parameters[:3] ] hybrid_index_set = set(hybrid_index_list) @@ -1443,104 +1509,50 @@ def _handle_harmonic_angles(self): # is nonempty, it must be added: # TODO - there's a ton of len > 0 on sets, empty sets == False, # so we can simplify this logic. - if ( - len( - hybrid_index_set.intersection( - self._atom_classes["unique_new_atoms"] - ) - ) - > 0 - ): + if len(hybrid_index_set.intersection( + self._atom_classes['unique_new_atoms'])) > 0: if hybrid_index_set.intersection( - self._atom_classes["environment_atoms"] - ): - errmsg = ( - "we disallow angle terms with unique new and " - "environment atoms" - ) + self._atom_classes['environment_atoms']): + errmsg = ("we disallow angle terms with unique new and " + "environment atoms") raise ValueError(errmsg) # Not softening just add to the nonalchemical force - self._hybrid_system_forces["standard_angle_force"].addAngle( - hybrid_index_list[0], - hybrid_index_list[1], - hybrid_index_list[2], - new_angle_parameters[3], - new_angle_parameters[4], + self._hybrid_system_forces['standard_angle_force'].addAngle( + hybrid_index_list[0], hybrid_index_list[1], + hybrid_index_list[2], new_angle_parameters[3], + new_angle_parameters[4] ) - elif hybrid_index_set.issubset(self._atom_classes["core_atoms"]): - if not self._find_angle_parameters( - self._hybrid_system_forces["core_angle_force"], hybrid_index_list - ): + elif hybrid_index_set.issubset(self._atom_classes['core_atoms']): + angle_key = self._triplet_key(*hybrid_index_list) + if angle_key not in hybrid_angles: hybrid_force_parameters = [ new_angle_parameters[3], - 0.0 * unit.kilojoule_per_mole / unit.radian**2, - new_angle_parameters[3], - new_angle_parameters[4], + 0.0*unit.kilojoule_per_mole/unit.radian**2, + new_angle_parameters[3], new_angle_parameters[4] ] - self._hybrid_system_forces["core_angle_force"].addAngle( - hybrid_index_list[0], - hybrid_index_list[1], - hybrid_index_list[2], - hybrid_force_parameters, + self._hybrid_system_forces['core_angle_force'].addAngle( + hybrid_index_list[0], hybrid_index_list[1], + hybrid_index_list[2], hybrid_force_parameters ) - elif hybrid_index_set.issubset(self._atom_classes["environment_atoms"]): + # track that we have added this angle + hybrid_angles[angle_key] = True + + elif hybrid_index_set.issubset(self._atom_classes['environment_atoms']): # We have already added the appropriate environmental atom # terms pass - elif hybrid_index_set.intersection(self._atom_classes["environment_atoms"]): - if hybrid_index_set.intersection( - self._atom_classes["unique_new_atoms"] - ): - errmsg = ( - "we disallow angle terms with unique new and " - "environment atoms" - ) + elif hybrid_index_set.intersection(self._atom_classes['environment_atoms']): + if hybrid_index_set.intersection(self._atom_classes['unique_new_atoms']): + errmsg = ("we disallow angle terms with unique new and " + "environment atoms") raise ValueError(errmsg) else: - errmsg = ( - f"hybrid index list {hybrid_index_list} does not " - "fit into a canonical atom set" - ) + errmsg = (f"hybrid index list {hybrid_index_list} does not " + "fit into a canonical atom set") raise ValueError(errmsg) - @staticmethod - def _find_torsion_parameters(torsion_force, indices): - """ - Convenience function to find the torsion parameters corresponding to a - particular set of indices. - - Parameters - ---------- - torsion_force : openmm.PeriodicTorsionForce - torsion force where the torsion of interest may be found - indices : list of int - The indices of the atoms of the torsion - - Returns - ------- - torsion_parameters : list - torsion parameters - """ - indices_reversed = indices[::-1] - - torsion_params_list = [] - - # Now loop through and try to find the torsion: - for torsion_idx in range(torsion_force.getNumTorsions()): - torsion_params = torsion_force.getTorsionParameters(torsion_idx) - - # Get a set representing the torsion indices: - torsion_param_indices = torsion_params[:4] - - if ( - indices == torsion_param_indices - or indices_reversed == torsion_param_indices - ): - torsion_params_list.append(torsion_params) - - return torsion_params_list def _handle_periodic_torsion_force(self): """ @@ -1560,13 +1572,15 @@ def _handle_periodic_torsion_force(self): ----- * Torsion flattening logic has been removed for now. """ - old_system_torsion_force = self._old_system_forces["PeriodicTorsionForce"] - new_system_torsion_force = self._new_system_forces["PeriodicTorsionForce"] + old_system_torsion_force = self._old_system_forces['PeriodicTorsionForce'] + new_system_torsion_force = self._new_system_forces['PeriodicTorsionForce'] + # local variables for speed while doing many lookups + unique_old_atoms = self._atom_classes["unique_old_atoms"] + unique_new_atoms = self._atom_classes["unique_new_atoms"] - # aux list stores the torsions that we already computed such that we don't add them again when checking the new system - auxiliary_custom_torsion_force = [] - # aludel/valence.py -- convenient way of handling all the valence terms for alchemistry - old_custom_torsions_to_standard = [] + # use sets to keep membership checks quick as systems have many torsions + auxiliary_custom_torsion_force = set() + old_custom_torsions_to_standard = set() # We need to keep track of what torsions we added so that we do not # double count @@ -1574,132 +1588,92 @@ def _handle_periodic_torsion_force(self): # TODO: Commented out since this actually isn't being done anywhere? # Is it necessary? Should we add this logic back in? for torsion_index in range(old_system_torsion_force.getNumTorsions()): + torsion_parameters = old_system_torsion_force.getTorsionParameters( - torsion_index - ) + torsion_index) # Get the indices in the hybrid system hybrid_index_list = [ - self._old_to_hybrid_map[old_index] - for old_index in torsion_parameters[:4] + self._old_to_hybrid_map[old_index] for old_index in torsion_parameters[:4] ] hybrid_index_set = set(hybrid_index_list) # If all atoms are in the core, we'll need to find the # corresponding parameters in the old system and interpolate - if hybrid_index_set.intersection(self._atom_classes["unique_old_atoms"]): + if hybrid_index_set.intersection(unique_old_atoms): # Then it goes to a standard force... - self._hybrid_system_forces["unique_atom_torsion_force"].addTorsion( - hybrid_index_list[0], - hybrid_index_list[1], - hybrid_index_list[2], - hybrid_index_list[3], - torsion_parameters[4], - torsion_parameters[5], - torsion_parameters[6], + self._hybrid_system_forces['unique_atom_torsion_force'].addTorsion( + hybrid_index_list[0], hybrid_index_list[1], + hybrid_index_list[2], hybrid_index_list[3], + torsion_parameters[4], torsion_parameters[5], + torsion_parameters[6] ) else: # It is a core-only term, an environment-only term, or a # core/env term; in any case, it goes to the core torsion_force # TODO - why are we even adding the 0.0, 0.0, 0.0 section? hybrid_force_parameters = [ - torsion_parameters[4], - torsion_parameters[5], - torsion_parameters[6], - 0.0, - 0.0, - 0.0, + torsion_parameters[4], torsion_parameters[5].value_in_unit(unit.radian), + torsion_parameters[6].value_in_unit(unit.kilojoule_per_mole), 0.0, 0.0, 0.0 ] - auxiliary_custom_torsion_force.append( - [ - hybrid_index_list[0], - hybrid_index_list[1], - hybrid_index_list[2], - hybrid_index_list[3], - hybrid_force_parameters[:3], - ] + auxiliary_custom_torsion_force.add( + (hybrid_index_list[0], hybrid_index_list[1], + hybrid_index_list[2], hybrid_index_list[3], + *hybrid_force_parameters[:3]) ) for torsion_index in range(new_system_torsion_force.getNumTorsions()): - torsion_parameters = new_system_torsion_force.getTorsionParameters( - torsion_index - ) + torsion_parameters = new_system_torsion_force.getTorsionParameters(torsion_index) # Get the indices in the hybrid system: hybrid_index_list = [ - self._new_to_hybrid_map[new_index] - for new_index in torsion_parameters[:4] - ] + self._new_to_hybrid_map[new_index] for new_index in torsion_parameters[:4]] hybrid_index_set = set(hybrid_index_list) - if hybrid_index_set.intersection(self._atom_classes["unique_new_atoms"]): + if hybrid_index_set.intersection(unique_new_atoms): # Then it goes to the custom torsion force (scaled to zero) - self._hybrid_system_forces["unique_atom_torsion_force"].addTorsion( - hybrid_index_list[0], - hybrid_index_list[1], - hybrid_index_list[2], - hybrid_index_list[3], - torsion_parameters[4], - torsion_parameters[5], - torsion_parameters[6], + self._hybrid_system_forces['unique_atom_torsion_force'].addTorsion( + hybrid_index_list[0], hybrid_index_list[1], + hybrid_index_list[2], hybrid_index_list[3], + torsion_parameters[4], torsion_parameters[5], + torsion_parameters[6] ) else: hybrid_force_parameters = [ - 0.0, - 0.0, - 0.0, - torsion_parameters[4], - torsion_parameters[5], - torsion_parameters[6], - ] + 0.0, 0.0, 0.0, torsion_parameters[4], + torsion_parameters[5].value_in_unit(unit.radian), torsion_parameters[6].value_in_unit(unit.kilojoule_per_mole)] # Check to see if this term is in the olds... - term = [ - hybrid_index_list[0], - hybrid_index_list[1], - hybrid_index_list[2], - hybrid_index_list[3], - hybrid_force_parameters[3:], - ] + term = (hybrid_index_list[0], hybrid_index_list[1], + hybrid_index_list[2], hybrid_index_list[3], + *hybrid_force_parameters[3:]) if term in auxiliary_custom_torsion_force: # Then this terms has to go to standard and be deleted... - old_index = auxiliary_custom_torsion_force.index(term) - old_custom_torsions_to_standard.append(old_index) - self._hybrid_system_forces["unique_atom_torsion_force"].addTorsion( - hybrid_index_list[0], - hybrid_index_list[1], - hybrid_index_list[2], - hybrid_index_list[3], - torsion_parameters[4], - torsion_parameters[5], - torsion_parameters[6], + old_custom_torsions_to_standard.add(term) + self._hybrid_system_forces['unique_atom_torsion_force'].addTorsion( + hybrid_index_list[0], hybrid_index_list[1], + hybrid_index_list[2], hybrid_index_list[3], + torsion_parameters[4], torsion_parameters[5], + torsion_parameters[6] ) else: # Then this term has to go to the core force... - self._hybrid_system_forces["custom_torsion_force"].addTorsion( - hybrid_index_list[0], - hybrid_index_list[1], - hybrid_index_list[2], - hybrid_index_list[3], - hybrid_force_parameters, + self._hybrid_system_forces['custom_torsion_force'].addTorsion( + hybrid_index_list[0], hybrid_index_list[1], + hybrid_index_list[2], hybrid_index_list[3], + hybrid_force_parameters ) # Now we have to loop through the aux custom torsion force - for index in [ - q - for q in range(len(auxiliary_custom_torsion_force)) - if q not in old_custom_torsions_to_standard - ]: - terms = auxiliary_custom_torsion_force[index] - hybrid_index_list = terms[:4] - hybrid_force_parameters = terms[4] + [0.0, 0.0, 0.0] - self._hybrid_system_forces["custom_torsion_force"].addTorsion( - hybrid_index_list[0], - hybrid_index_list[1], - hybrid_index_list[2], - hybrid_index_list[3], - hybrid_force_parameters, - ) + for term in auxiliary_custom_torsion_force: + if term not in old_custom_torsions_to_standard: + hybrid_index_list = term[:4] + hybrid_force_parameters = term[4:] + (0., 0., 0.) + self._hybrid_system_forces['custom_torsion_force'].addTorsion( + hybrid_index_list[0], hybrid_index_list[1], + hybrid_index_list[2], hybrid_index_list[3], + hybrid_force_parameters + ) def _handle_nonbonded(self): """ @@ -1711,93 +1685,71 @@ def _handle_nonbonded(self): * A lot of this logic is duplicated, probably turn it into a couple of functions. """ - def _check_indices(idx1, idx2): if idx1 != idx2: - errmsg = "Attempting to add incorrect particle to hybrid " "system" + errmsg = ("Attempting to add incorrect particle to hybrid " + "system") raise ValueError(errmsg) - old_system_nonbonded_force = self._old_system_forces["NonbondedForce"] - new_system_nonbonded_force = self._new_system_forces["NonbondedForce"] + old_system_nonbonded_force = self._old_system_forces['NonbondedForce'] + new_system_nonbonded_force = self._new_system_forces['NonbondedForce'] hybrid_to_old_map = self._hybrid_to_old_map hybrid_to_new_map = self._hybrid_to_new_map # Define new global parameters for NonbondedForce - self._hybrid_system_forces["standard_nonbonded_force"].addGlobalParameter( - "lambda_electrostatics_core", 0.0 - ) - self._hybrid_system_forces["standard_nonbonded_force"].addGlobalParameter( - "lambda_sterics_core", 0.0 - ) - self._hybrid_system_forces["standard_nonbonded_force"].addGlobalParameter( - "lambda_electrostatics_delete", 0.0 - ) - self._hybrid_system_forces["standard_nonbonded_force"].addGlobalParameter( - "lambda_electrostatics_insert", 0.0 - ) + self._hybrid_system_forces['standard_nonbonded_force'].addGlobalParameter('lambda_electrostatics_core', 0.0) + self._hybrid_system_forces['standard_nonbonded_force'].addGlobalParameter('lambda_sterics_core', 0.0) + self._hybrid_system_forces['standard_nonbonded_force'].addGlobalParameter("lambda_electrostatics_delete", 0.0) + self._hybrid_system_forces['standard_nonbonded_force'].addGlobalParameter("lambda_electrostatics_insert", 0.0) # We have to loop through the particles in the system, because # nonbonded force does not accept index for particle_index in range(self._hybrid_system.getNumParticles()): - if particle_index in self._atom_classes["unique_old_atoms"]: + + if particle_index in self._atom_classes['unique_old_atoms']: # Get the parameters in the old system old_index = hybrid_to_old_map[particle_index] - [ - charge, - sigma, - epsilon, - ] = old_system_nonbonded_force.getParticleParameters(old_index) + [charge, sigma, epsilon] = old_system_nonbonded_force.getParticleParameters(old_index) # Add the particle to the hybrid custom sterics and # electrostatics. # turning off sterics in forward direction - check_index = self._hybrid_system_forces[ - "core_sterics_force" - ].addParticle([sigma, epsilon, sigma, 0.0 * epsilon, 1, 0]) + check_index = self._hybrid_system_forces['core_sterics_force'].addParticle( + [sigma, epsilon, sigma, 0.0*epsilon, 1, 0] + ) _check_indices(particle_index, check_index) # Add particle to the regular nonbonded force, but # Lennard-Jones will be handled by CustomNonbondedForce - check_index = self._hybrid_system_forces[ - "standard_nonbonded_force" - ].addParticle(charge, sigma, 0.0 * epsilon) + check_index = self._hybrid_system_forces['standard_nonbonded_force'].addParticle( + charge, sigma, 0.0*epsilon + ) _check_indices(particle_index, check_index) # Charge will be turned off at # lambda_electrostatics_delete = 0, on at # lambda_electrostatics_delete = 1; kill charge with # lambda_electrostatics_delete = 0 --> 1 - self._hybrid_system_forces[ - "standard_nonbonded_force" - ].addParticleParameterOffset( - "lambda_electrostatics_delete", - particle_index, - -charge, - 0 * sigma, - 0 * epsilon, + self._hybrid_system_forces['standard_nonbonded_force'].addParticleParameterOffset( + 'lambda_electrostatics_delete', particle_index, + -charge, 0*sigma, 0*epsilon ) - elif particle_index in self._atom_classes["unique_new_atoms"]: + elif particle_index in self._atom_classes['unique_new_atoms']: # Get the parameters in the new system new_index = hybrid_to_new_map[particle_index] - [ - charge, - sigma, - epsilon, - ] = new_system_nonbonded_force.getParticleParameters(new_index) + [charge, sigma, epsilon] = new_system_nonbonded_force.getParticleParameters(new_index) # Add the particle to the hybrid custom sterics and electrostatics # turning on sterics in forward direction - check_index = self._hybrid_system_forces[ - "core_sterics_force" - ].addParticle([sigma, 0.0 * epsilon, sigma, epsilon, 0, 1]) + check_index = self._hybrid_system_forces['core_sterics_force'].addParticle( + [sigma, 0.0*epsilon, sigma, epsilon, 0, 1] + ) _check_indices(particle_index, check_index) # Add particle to the regular nonbonded force, but # Lennard-Jones will be handled by CustomNonbondedForce - check_index = self._hybrid_system_forces[ - "standard_nonbonded_force" - ].addParticle( + check_index = self._hybrid_system_forces['standard_nonbonded_force'].addParticle( 0.0, sigma, 0.0 ) # charge starts at zero _check_indices(particle_index, check_index) @@ -1805,41 +1757,31 @@ def _check_indices(idx1, idx2): # Charge will be turned off at lambda_electrostatics_insert = 0 # on at lambda_electrostatics_insert = 1; # add charge with lambda_electrostatics_insert = 0 --> 1 - self._hybrid_system_forces[ - "standard_nonbonded_force" - ].addParticleParameterOffset( - "lambda_electrostatics_insert", particle_index, +charge, 0, 0 + self._hybrid_system_forces['standard_nonbonded_force'].addParticleParameterOffset( + 'lambda_electrostatics_insert', particle_index, + +charge, 0, 0 ) - elif particle_index in self._atom_classes["core_atoms"]: + elif particle_index in self._atom_classes['core_atoms']: # Get the parameters in the new and old systems: old_index = hybrid_to_old_map[particle_index] - [ - charge_old, - sigma_old, - epsilon_old, - ] = old_system_nonbonded_force.getParticleParameters(old_index) + [charge_old, sigma_old, epsilon_old] = old_system_nonbonded_force.getParticleParameters(old_index) new_index = hybrid_to_new_map[particle_index] - [ - charge_new, - sigma_new, - epsilon_new, - ] = new_system_nonbonded_force.getParticleParameters(new_index) + [charge_new, sigma_new, epsilon_new] = new_system_nonbonded_force.getParticleParameters(new_index) # Add the particle to the custom forces, interpolating between # the two parameters; add steric params and zero electrostatics # to core_sterics per usual - check_index = self._hybrid_system_forces[ - "core_sterics_force" - ].addParticle([sigma_old, epsilon_old, sigma_new, epsilon_new, 0, 0]) + check_index = self._hybrid_system_forces['core_sterics_force'].addParticle( + [sigma_old, epsilon_old, sigma_new, epsilon_new, 0, 0]) _check_indices(particle_index, check_index) # Still add the particle to the regular nonbonded force, but # with zeroed out parameters; add old charge to # standard_nonbonded and zero sterics - check_index = self._hybrid_system_forces[ - "standard_nonbonded_force" - ].addParticle(charge_old, 0.5 * (sigma_old + sigma_new), 0.0) + check_index = self._hybrid_system_forces['standard_nonbonded_force'].addParticle( + # this term is off due to epsilon = 0, but just set sigma to the initial value to not confuse things + charge_old, sigma_old, 0.0) _check_indices(particle_index, check_index) # Charge is charge_old at lambda_electrostatics = 0, @@ -1850,14 +1792,9 @@ def _check_indices(idx1, idx2): # Interpolate between old and new charge with # lambda_electrostatics core make sure to keep sterics off - self._hybrid_system_forces[ - "standard_nonbonded_force" - ].addParticleParameterOffset( - "lambda_electrostatics_core", - particle_index, - (charge_new - charge_old), - 0, - 0, + self._hybrid_system_forces['standard_nonbonded_force'].addParticleParameterOffset( + 'lambda_electrostatics_core', particle_index, + (charge_new - charge_old), 0, 0 ) # Otherwise, the particle is in the environment @@ -1865,42 +1802,34 @@ def _check_indices(idx1, idx2): # The parameters will be the same in new and old system, so # just take the old parameters old_index = hybrid_to_old_map[particle_index] - [ - charge, - sigma, - epsilon, - ] = old_system_nonbonded_force.getParticleParameters(old_index) + [charge, sigma, epsilon] = old_system_nonbonded_force.getParticleParameters(old_index) # Add the particle to the hybrid custom sterics, but they dont # change; electrostatics are ignored - self._hybrid_system_forces["core_sterics_force"].addParticle( + self._hybrid_system_forces['core_sterics_force'].addParticle( [sigma, epsilon, sigma, epsilon, 0, 0] ) # Add the environment atoms to the regular nonbonded force as # well: should we be adding steric terms here, too? - self._hybrid_system_forces["standard_nonbonded_force"].addParticle( + self._hybrid_system_forces['standard_nonbonded_force'].addParticle( charge, sigma, epsilon ) # Now loop pairwise through (unique_old, unique_new) and add exceptions # so that they never interact electrostatically # (place into Nonbonded Force) - unique_old_atoms = self._atom_classes["unique_old_atoms"] - unique_new_atoms = self._atom_classes["unique_new_atoms"] + unique_old_atoms = self._atom_classes['unique_old_atoms'] + unique_new_atoms = self._atom_classes['unique_new_atoms'] for old in unique_old_atoms: for new in unique_new_atoms: - self._hybrid_system_forces["standard_nonbonded_force"].addException( - old, - new, - 0.0 * unit.elementary_charge**2, - 1.0 * unit.nanometers, - 0.0 * unit.kilojoules_per_mole, - ) + self._hybrid_system_forces['standard_nonbonded_force'].addException( + old, new, 0.0*unit.elementary_charge**2, + 1.0*unit.nanometers, 0.0*unit.kilojoules_per_mole) # This is only necessary to avoid the 'All forces must have # identical exclusions' rule - self._hybrid_system_forces["core_sterics_force"].addExclusion(old, new) + self._hybrid_system_forces['core_sterics_force'].addExclusion(old, new) self._handle_interaction_groups() @@ -1929,29 +1858,34 @@ def _handle_interaction_groups(self): 8) Unique-old - Unique-old """ # Get the force objects for convenience: - sterics_custom_force = self._hybrid_system_forces["core_sterics_force"] + sterics_custom_force = self._hybrid_system_forces['core_sterics_force'] # Also prepare the atom classes - core_atoms = self._atom_classes["core_atoms"] - unique_old_atoms = self._atom_classes["unique_old_atoms"] - unique_new_atoms = self._atom_classes["unique_new_atoms"] - environment_atoms = self._atom_classes["environment_atoms"] + core_atoms = self._atom_classes['core_atoms'] + unique_old_atoms = self._atom_classes['unique_old_atoms'] + unique_new_atoms = self._atom_classes['unique_new_atoms'] + environment_atoms = self._atom_classes['environment_atoms'] sterics_custom_force.addInteractionGroup(unique_old_atoms, core_atoms) - sterics_custom_force.addInteractionGroup(unique_old_atoms, environment_atoms) + sterics_custom_force.addInteractionGroup(unique_old_atoms, + environment_atoms) - sterics_custom_force.addInteractionGroup(unique_new_atoms, core_atoms) + sterics_custom_force.addInteractionGroup(unique_new_atoms, + core_atoms) - sterics_custom_force.addInteractionGroup(unique_new_atoms, environment_atoms) + sterics_custom_force.addInteractionGroup(unique_new_atoms, + environment_atoms) sterics_custom_force.addInteractionGroup(core_atoms, environment_atoms) sterics_custom_force.addInteractionGroup(core_atoms, core_atoms) - sterics_custom_force.addInteractionGroup(unique_new_atoms, unique_new_atoms) + sterics_custom_force.addInteractionGroup(unique_new_atoms, + unique_new_atoms) - sterics_custom_force.addInteractionGroup(unique_old_atoms, unique_old_atoms) + sterics_custom_force.addInteractionGroup(unique_old_atoms, + unique_old_atoms) def _handle_hybrid_exceptions(self): """ @@ -1959,12 +1893,12 @@ def _handle_hybrid_exceptions(self): exceptions for interactions that were zeroed out but should occur. """ # TODO - are these actually used anywhere? Flake8 says no - old_system_nonbonded_force = self._old_system_forces["NonbondedForce"] - new_system_nonbonded_force = self._new_system_forces["NonbondedForce"] + old_system_nonbonded_force = self._old_system_forces['NonbondedForce'] + new_system_nonbonded_force = self._new_system_forces['NonbondedForce'] # Prepare the atom classes - unique_old_atoms = self._atom_classes["unique_old_atoms"] - unique_new_atoms = self._atom_classes["unique_new_atoms"] + unique_old_atoms = self._atom_classes['unique_old_atoms'] + unique_new_atoms = self._atom_classes['unique_new_atoms'] # Get the list of interaction pairs for which we need to set exceptions unique_old_pairs = list(itertools.combinations(unique_old_atoms, 2)) @@ -1975,61 +1909,33 @@ def _handle_hybrid_exceptions(self): for atom_pair in unique_old_pairs: # Since the pairs are indexed in the dictionary by the old system # indices, we need to convert - old_index_atom_pair = ( + # use the exception key function to ensure we check using the correct order + old_index_atom_pair = self._pair_key( self._hybrid_to_old_map[atom_pair[0]], - self._hybrid_to_old_map[atom_pair[1]], + self._hybrid_to_old_map[atom_pair[1]] ) # Now we check if the pair is in the exception dictionary if old_index_atom_pair in self._old_system_exceptions: - [chargeProd, sigma, epsilon] = self._old_system_exceptions[ - old_index_atom_pair - ] + [chargeProd, sigma, epsilon] = self._old_system_exceptions[old_index_atom_pair] # if we are interpolating 1,4 exceptions then we have to if self._interpolate_14s: - self._hybrid_system_forces["standard_nonbonded_force"].addException( - atom_pair[0], - atom_pair[1], - chargeProd * 0.0, - sigma, - epsilon * 0.0, - ) - else: - self._hybrid_system_forces["standard_nonbonded_force"].addException( - atom_pair[0], atom_pair[1], chargeProd, sigma, epsilon - ) - - # Add exclusion to ensure exceptions are consistent - self._hybrid_system_forces["core_sterics_force"].addExclusion( - atom_pair[0], atom_pair[1] - ) - - # Check if the pair is in the reverse order and use that if so - elif old_index_atom_pair[::-1] in self._old_system_exceptions: - [chargeProd, sigma, epsilon] = self._old_system_exceptions[ - old_index_atom_pair[::-1] - ] - # If we are interpolating 1,4 exceptions then we have to - if self._interpolate_14s: - self._hybrid_system_forces["standard_nonbonded_force"].addException( - atom_pair[0], - atom_pair[1], - chargeProd * 0.0, - sigma, - epsilon * 0.0, + self._hybrid_system_forces['standard_nonbonded_force'].addException( + atom_pair[0], atom_pair[1], chargeProd*0.0, + sigma, epsilon*0.0 ) else: - self._hybrid_system_forces["standard_nonbonded_force"].addException( + self._hybrid_system_forces['standard_nonbonded_force'].addException( atom_pair[0], atom_pair[1], chargeProd, sigma, epsilon ) # Add exclusion to ensure exceptions are consistent - self._hybrid_system_forces["core_sterics_force"].addExclusion( + self._hybrid_system_forces['core_sterics_force'].addExclusion( atom_pair[0], atom_pair[1] ) # TODO: work out why there's a bunch of commented out code here - # Exerpt: + # Excerpt: # If it's not handled by an exception in the original system, we # just add the regular parameters as an exception # TODO: this implies that the old-old nonbonded interactions (those @@ -2041,87 +1947,31 @@ def _handle_hybrid_exceptions(self): for atom_pair in unique_new_pairs: # Since the pairs are indexed in the dictionary by the new system # indices, we need to convert - new_index_atom_pair = ( + new_index_atom_pair = self._pair_key( self._hybrid_to_new_map[atom_pair[0]], - self._hybrid_to_new_map[atom_pair[1]], + self._hybrid_to_new_map[atom_pair[1]] ) # Now we check if the pair is in the exception dictionary if new_index_atom_pair in self._new_system_exceptions: - [chargeProd, sigma, epsilon] = self._new_system_exceptions[ - new_index_atom_pair - ] - if self._interpolate_14s: - self._hybrid_system_forces["standard_nonbonded_force"].addException( - atom_pair[0], - atom_pair[1], - chargeProd * 0.0, - sigma, - epsilon * 0.0, - ) - else: - self._hybrid_system_forces["standard_nonbonded_force"].addException( - atom_pair[0], atom_pair[1], chargeProd, sigma, epsilon - ) - - self._hybrid_system_forces["core_sterics_force"].addExclusion( - atom_pair[0], atom_pair[1] - ) - - # Check if the pair is present in the reverse order and use that if so - elif new_index_atom_pair[::-1] in self._new_system_exceptions: - [chargeProd, sigma, epsilon] = self._new_system_exceptions[ - new_index_atom_pair[::-1] - ] + [chargeProd, sigma, epsilon] = self._new_system_exceptions[new_index_atom_pair] if self._interpolate_14s: - self._hybrid_system_forces["standard_nonbonded_force"].addException( - atom_pair[0], - atom_pair[1], - chargeProd * 0.0, - sigma, - epsilon * 0.0, + self._hybrid_system_forces['standard_nonbonded_force'].addException( + atom_pair[0], atom_pair[1], chargeProd*0.0, + sigma, epsilon*0.0 ) else: - self._hybrid_system_forces["standard_nonbonded_force"].addException( + self._hybrid_system_forces['standard_nonbonded_force'].addException( atom_pair[0], atom_pair[1], chargeProd, sigma, epsilon ) - self._hybrid_system_forces["core_sterics_force"].addExclusion( + self._hybrid_system_forces['core_sterics_force'].addExclusion( atom_pair[0], atom_pair[1] ) - # TODO: work out why there's a bunch of commented out code here # If it's not handled by an exception in the original system, we # just add the regular parameters as an exception - @staticmethod - def _find_exception(force, index1, index2): - """ - Find the exception that corresponds to the given indices in the given - system - - Parameters - ---------- - force : openmm.NonbondedForce object - System containing the exceptions - index1 : int - The index of the first atom (order is unimportant) - index2 : int - The index of the second atom (order is unimportant) - - Returns - ------- - exception_parameters : list - List of exception parameters - """ - index_set = {index1, index2} - - # Loop through the exceptions and try to find one matching the criteria - for exception_idx in range(force.getNumExceptions()): - exception_parameters = force.getExceptionParameters(exception_idx) - if index_set == set(exception_parameters[:2]): - return exception_parameters - return [] def _handle_original_exceptions(self): """ @@ -2129,14 +1979,15 @@ def _handle_original_exceptions(self): present in the hybrid appropriately. """ # Get what we need to find the exceptions from the new and old systems: - old_system_nonbonded_force = self._old_system_forces["NonbondedForce"] - new_system_nonbonded_force = self._new_system_forces["NonbondedForce"] + old_system_nonbonded_force = self._old_system_forces['NonbondedForce'] + new_system_nonbonded_force = self._new_system_forces['NonbondedForce'] hybrid_to_old_map = self._hybrid_to_old_map hybrid_to_new_map = self._hybrid_to_new_map # First, loop through the old system's exceptions and add them to the # hybrid appropriately: for exception_pair, exception_parameters in self._old_system_exceptions.items(): + [index1_old, index2_old] = exception_pair [chargeProd_old, sigma_old, epsilon_old] = exception_parameters @@ -2145,47 +1996,39 @@ def _handle_original_exceptions(self): index2_hybrid = self._old_to_hybrid_map[index2_old] index_set = {index1_hybrid, index2_hybrid} + # In this case, the interaction is only covered by the regular # nonbonded force, and as such will be copied to that force # In the unique-old case, it is handled elsewhere due to internal # peculiarities regarding exceptions - if index_set.issubset(self._atom_classes["environment_atoms"]): - self._hybrid_system_forces["standard_nonbonded_force"].addException( - index1_hybrid, index2_hybrid, chargeProd_old, sigma_old, epsilon_old + if index_set.issubset(self._atom_classes['environment_atoms']): + self._hybrid_system_forces['standard_nonbonded_force'].addException( + index1_hybrid, index2_hybrid, chargeProd_old, + sigma_old, epsilon_old ) - self._hybrid_system_forces["core_sterics_force"].addExclusion( + self._hybrid_system_forces['core_sterics_force'].addExclusion( index1_hybrid, index2_hybrid ) # We have already handled unique old - unique old exceptions - elif ( - len(index_set.intersection(self._atom_classes["unique_old_atoms"])) == 2 - ): + elif len(index_set.intersection(self._atom_classes['unique_old_atoms'])) == 2: continue # Otherwise, check if one of the atoms in the set is in the # unique_old_group and the other is not: - elif ( - len(index_set.intersection(self._atom_classes["unique_old_atoms"])) == 1 - ): + elif len(index_set.intersection(self._atom_classes['unique_old_atoms'])) == 1: if self._interpolate_14s: - self._hybrid_system_forces["standard_nonbonded_force"].addException( - index1_hybrid, - index2_hybrid, - chargeProd_old * 0.0, - sigma_old, - epsilon_old * 0.0, + self._hybrid_system_forces['standard_nonbonded_force'].addException( + index1_hybrid, index2_hybrid, chargeProd_old*0.0, + sigma_old, epsilon_old*0.0 ) else: - self._hybrid_system_forces["standard_nonbonded_force"].addException( - index1_hybrid, - index2_hybrid, - chargeProd_old, - sigma_old, - epsilon_old, + self._hybrid_system_forces['standard_nonbonded_force'].addException( + index1_hybrid, index2_hybrid, chargeProd_old, + sigma_old, epsilon_old ) - self._hybrid_system_forces["core_sterics_force"].addExclusion( + self._hybrid_system_forces['core_sterics_force'].addExclusion( index1_hybrid, index2_hybrid ) @@ -2198,62 +2041,35 @@ def _handle_original_exceptions(self): # First get the new indices. index1_new = hybrid_to_new_map[index1_hybrid] index2_new = hybrid_to_new_map[index2_hybrid] - # Get the exception parameters: - new_exception_parms = self._find_exception( - new_system_nonbonded_force, index1_new, index2_new - ) + # Get the exception parameters: make sure to sort the keys to match how they are stored + new_exception_parms = self._new_system_exceptions.get(self._pair_key(index1_new, index2_new), []) # If there's no new exception, then we should just set the # exception parameters to be the nonbonded parameters if not new_exception_parms: - [ - charge1_new, - sigma1_new, - epsilon1_new, - ] = new_system_nonbonded_force.getParticleParameters(index1_new) - [ - charge2_new, - sigma2_new, - epsilon2_new, - ] = new_system_nonbonded_force.getParticleParameters(index2_new) + [charge1_new, sigma1_new, epsilon1_new] = new_system_nonbonded_force.getParticleParameters(index1_new) + [charge2_new, sigma2_new, epsilon2_new] = new_system_nonbonded_force.getParticleParameters(index2_new) chargeProd_new = charge1_new * charge2_new sigma_new = 0.5 * (sigma1_new + sigma2_new) - epsilon_new = unit.sqrt(epsilon1_new * epsilon2_new) + epsilon_new = unit.sqrt(epsilon1_new*epsilon2_new) else: - [ - index1_new, - index2_new, - chargeProd_new, - sigma_new, - epsilon_new, - ] = new_exception_parms + [chargeProd_new, sigma_new, epsilon_new] = new_exception_parms # Interpolate between old and new - exception_index = self._hybrid_system_forces[ - "standard_nonbonded_force" - ].addException( - index1_hybrid, index2_hybrid, chargeProd_old, sigma_old, epsilon_old + exception_index = self._hybrid_system_forces['standard_nonbonded_force'].addException( + index1_hybrid, index2_hybrid, chargeProd_old, + sigma_old, epsilon_old ) - self._hybrid_system_forces[ - "standard_nonbonded_force" - ].addExceptionParameterOffset( - "lambda_electrostatics_core", - exception_index, - (chargeProd_new - chargeProd_old), - 0, - 0, + self._hybrid_system_forces['standard_nonbonded_force'].addExceptionParameterOffset( + 'lambda_electrostatics_core', exception_index, + (chargeProd_new - chargeProd_old), 0, 0 ) - self._hybrid_system_forces[ - "standard_nonbonded_force" - ].addExceptionParameterOffset( - "lambda_sterics_core", - exception_index, - 0, - (sigma_new - sigma_old), - (epsilon_new - epsilon_old), + self._hybrid_system_forces['standard_nonbonded_force'].addExceptionParameterOffset( + 'lambda_sterics_core', exception_index, 0, + (sigma_new - sigma_old), (epsilon_new - epsilon_old) ) - self._hybrid_system_forces["core_sterics_force"].addExclusion( + self._hybrid_system_forces['core_sterics_force'].addExclusion( index1_hybrid, index2_hybrid ) @@ -2274,41 +2090,34 @@ def _handle_original_exceptions(self): # If it's a subset of unique_new_atoms, then this is an # intra-unique interaction and should have its exceptions # specified in the regular nonbonded force. However, this is - # handled elsewhere as above due to pecularities with exception + # handled elsewhere as above due to peculiarities with exception # handling - if index_set.issubset(self._atom_classes["unique_new_atoms"]): + if index_set.issubset(self._atom_classes['unique_new_atoms']): continue # Look for the final class- interactions between uniquenew-core and # uniquenew-environment. They are treated similarly: they are # simply on and constant the entire time (as a valence term) - elif ( - len(index_set.intersection(self._atom_classes["unique_new_atoms"])) > 0 - ): + elif len(index_set.intersection(self._atom_classes['unique_new_atoms'])) > 0: if self._interpolate_14s: - self._hybrid_system_forces["standard_nonbonded_force"].addException( - index1_hybrid, - index2_hybrid, - chargeProd_new * 0.0, - sigma_new, - epsilon_new * 0.0, + self._hybrid_system_forces['standard_nonbonded_force'].addException( + index1_hybrid, index2_hybrid, chargeProd_new*0.0, + sigma_new, epsilon_new*0.0 ) else: - self._hybrid_system_forces["standard_nonbonded_force"].addException( - index1_hybrid, - index2_hybrid, - chargeProd_new, - sigma_new, - epsilon_new, + self._hybrid_system_forces['standard_nonbonded_force'].addException( + index1_hybrid, index2_hybrid, chargeProd_new, + sigma_new, epsilon_new ) - self._hybrid_system_forces["core_sterics_force"].addExclusion( + self._hybrid_system_forces['core_sterics_force'].addExclusion( index1_hybrid, index2_hybrid ) # However, there may be a core exception that exists in one system # but not the other (ring closure) - elif index_set.issubset(self._atom_classes["core_atoms"]): + elif index_set.issubset(self._atom_classes['core_atoms']): + # Get the old indices try: index1_old = self._new_to_old_map[index1_new] @@ -2318,62 +2127,39 @@ def _handle_original_exceptions(self): # See if it's also in the old nonbonded force. if it is, then we don't need to add it. # But if it's not, we need to interpolate - if not self._find_exception( - old_system_nonbonded_force, index1_old, index2_old - ): - [ - charge1_old, - sigma1_old, - epsilon1_old, - ] = old_system_nonbonded_force.getParticleParameters(index1_old) - [ - charge2_old, - sigma2_old, - epsilon2_old, - ] = old_system_nonbonded_force.getParticleParameters(index2_old) - - chargeProd_old = charge1_old * charge2_old + old_exception_parms = self._old_system_exceptions.get(self._pair_key(index1_old, index2_old), []) + if not old_exception_parms: + + [charge1_old, sigma1_old, epsilon1_old] = old_system_nonbonded_force.getParticleParameters(index1_old) + [charge2_old, sigma2_old, epsilon2_old] = old_system_nonbonded_force.getParticleParameters(index2_old) + + chargeProd_old = charge1_old*charge2_old sigma_old = 0.5 * (sigma1_old + sigma2_old) - epsilon_old = unit.sqrt(epsilon1_old * epsilon2_old) - - exception_index = self._hybrid_system_forces[ - "standard_nonbonded_force" - ].addException( - index1_hybrid, - index2_hybrid, - chargeProd_old, - sigma_old, - epsilon_old, - ) + epsilon_old = unit.sqrt(epsilon1_old*epsilon2_old) - self._hybrid_system_forces[ - "standard_nonbonded_force" - ].addExceptionParameterOffset( - "lambda_electrostatics_core", - exception_index, - (chargeProd_new - chargeProd_old), - 0, - 0, + exception_index = self._hybrid_system_forces['standard_nonbonded_force'].addException( + index1_hybrid, index2_hybrid, + chargeProd_old, sigma_old, + epsilon_old) + + self._hybrid_system_forces['standard_nonbonded_force'].addExceptionParameterOffset( + 'lambda_electrostatics_core', exception_index, + (chargeProd_new - chargeProd_old), 0, 0 ) - self._hybrid_system_forces[ - "standard_nonbonded_force" - ].addExceptionParameterOffset( - "lambda_sterics_core", - exception_index, - 0, - (sigma_new - sigma_old), - (epsilon_new - epsilon_old), + self._hybrid_system_forces['standard_nonbonded_force'].addExceptionParameterOffset( + 'lambda_sterics_core', exception_index, 0, + (sigma_new - sigma_old), (epsilon_new - epsilon_old) ) - self._hybrid_system_forces["core_sterics_force"].addExclusion( + self._hybrid_system_forces['core_sterics_force'].addExclusion( index1_hybrid, index2_hybrid ) def _handle_old_new_exceptions(self): """ Find the exceptions associated with old-old and old-core interactions, - as well as new-new and new-core interactions. Theses exceptions will + as well as new-new and new-core interactions. These exceptions will be placed in CustomBondedForce that will interpolate electrostatics and a softcore potential. @@ -2386,46 +2172,36 @@ def _handle_old_new_exceptions(self): if self._softcore_LJ_v2: old_new_nonbonded_exceptions += "U_sterics = select(step(r - r_LJ), 4*epsilon*x*(x-1.0), U_sterics_quad);" - old_new_nonbonded_exceptions += f"U_sterics_quad = Force*(((r - r_LJ)^2)/2 - (r - r_LJ)) + U_sterics_cut;" - old_new_nonbonded_exceptions += ( - f"U_sterics_cut = 4*epsilon*((sigma/r_LJ)^6)*(((sigma/r_LJ)^6) - 1.0);" - ) - old_new_nonbonded_exceptions += ( - f"Force = -4*epsilon*((-12*sigma^12)/(r_LJ^13) + (6*sigma^6)/(r_LJ^7));" - ) - old_new_nonbonded_exceptions += f"x = (sigma/r)^6;" - old_new_nonbonded_exceptions += f"r_LJ = softcore_alpha*((26/7)*(sigma^6)*lambda_sterics_deprecated)^(1/6);" - old_new_nonbonded_exceptions += f"lambda_sterics_deprecated = new_interaction*(1.0 - lambda_sterics_insert) + old_interaction*lambda_sterics_delete;" + old_new_nonbonded_exceptions += "U_sterics_quad = Force*(((r - r_LJ)^2)/2 - (r - r_LJ)) + U_sterics_cut;" + old_new_nonbonded_exceptions += "U_sterics_cut = 4*epsilon*((sigma/r_LJ)^6)*(((sigma/r_LJ)^6) - 1.0);" + old_new_nonbonded_exceptions += "Force = -4*epsilon*((-12*sigma^12)/(r_LJ^13) + (6*sigma^6)/(r_LJ^7));" + old_new_nonbonded_exceptions += "x = (sigma/r)^6;" + old_new_nonbonded_exceptions += "r_LJ = softcore_alpha*((26/7)*(sigma^6)*lambda_sterics_deprecated)^(1/6);" + old_new_nonbonded_exceptions += "lambda_sterics_deprecated = new_interaction*(1.0 - lambda_sterics_insert) + old_interaction*lambda_sterics_delete;" else: - old_new_nonbonded_exceptions += ( - "U_sterics = 4*epsilon*x*(x-1.0); x = (sigma/reff_sterics)^6;" - ) + old_new_nonbonded_exceptions += "U_sterics = 4*epsilon*x*(x-1.0); x = (sigma/reff_sterics)^6;" old_new_nonbonded_exceptions += "reff_sterics = sigma*((softcore_alpha*lambda_alpha + (r/sigma)^6))^(1/6);" - old_new_nonbonded_exceptions += "reff_sterics = sigma*((softcore_alpha*lambda_alpha + (r/sigma)^6))^(1/6);" # effective softcore distance for sterics + old_new_nonbonded_exceptions += "reff_sterics = sigma*((softcore_alpha*lambda_alpha + (r/sigma)^6))^(1/6);" # effective softcore distance for sterics old_new_nonbonded_exceptions += "lambda_alpha = new_interaction*(1-lambda_sterics_insert) + old_interaction*lambda_sterics_delete;" old_new_nonbonded_exceptions += "U_electrostatics = (lambda_electrostatics_insert * unique_new + unique_old * (1 - lambda_electrostatics_delete)) * ONE_4PI_EPS0*chargeProd/r;" old_new_nonbonded_exceptions += "ONE_4PI_EPS0 = %f;" % ONE_4PI_EPS0 - old_new_nonbonded_exceptions += "epsilon = (1-lambda_sterics)*epsilonA + lambda_sterics*epsilonB;" # interpolation - old_new_nonbonded_exceptions += ( - "sigma = (1-lambda_sterics)*sigmaA + lambda_sterics*sigmaB;" - ) + old_new_nonbonded_exceptions += "epsilon = (1-lambda_sterics)*epsilonA + lambda_sterics*epsilonB;" # interpolation + old_new_nonbonded_exceptions += "sigma = (1-lambda_sterics)*sigmaA + lambda_sterics*sigmaB;" old_new_nonbonded_exceptions += "lambda_sterics = new_interaction*lambda_sterics_insert + old_interaction*lambda_sterics_delete;" old_new_nonbonded_exceptions += "new_interaction = delta(1-unique_new); old_interaction = delta(1-unique_old);" + nonbonded_exceptions_force = openmm.CustomBondForce( - old_new_nonbonded_exceptions - ) + old_new_nonbonded_exceptions) name = f"{nonbonded_exceptions_force.__class__.__name__}_exceptions" nonbonded_exceptions_force.setName(name) self._hybrid_system.addForce(nonbonded_exceptions_force) # For reference, set name in force dict - self._hybrid_system_forces["old_new_exceptions_force"] = ( - nonbonded_exceptions_force - ) + self._hybrid_system_forces['old_new_exceptions_force'] = nonbonded_exceptions_force if self._softcore_LJ_v2: nonbonded_exceptions_force.addGlobalParameter( @@ -2445,31 +2221,29 @@ def _handle_old_new_exceptions(self): "lambda_electrostatics_delete", 0.0 ) # sterics insert - nonbonded_exceptions_force.addGlobalParameter("lambda_sterics_insert", 0.0) + nonbonded_exceptions_force.addGlobalParameter( + "lambda_sterics_insert", 0.0 + ) # steric delete - nonbonded_exceptions_force.addGlobalParameter("lambda_sterics_delete", 0.0) - - for parameter in [ - "chargeProd", - "sigmaA", - "epsilonA", - "sigmaB", - "epsilonB", - "unique_old", - "unique_new", - ]: + nonbonded_exceptions_force.addGlobalParameter( + "lambda_sterics_delete", 0.0 + ) + + for parameter in ['chargeProd','sigmaA', 'epsilonA', 'sigmaB', + 'epsilonB', 'unique_old', 'unique_new']: nonbonded_exceptions_force.addPerBondParameter(parameter) # Prepare for exceptions loop by grabbing nonbonded forces, # hybrid_to_old/new maps - old_system_nonbonded_force = self._old_system_forces["NonbondedForce"] - new_system_nonbonded_force = self._new_system_forces["NonbondedForce"] + old_system_nonbonded_force = self._old_system_forces['NonbondedForce'] + new_system_nonbonded_force = self._new_system_forces['NonbondedForce'] hybrid_to_old_map = self._hybrid_to_old_map hybrid_to_new_map = self._hybrid_to_new_map # First, loop through the old system's exceptions and add them to the # hybrid appropriately: for exception_pair, exception_parameters in self._old_system_exceptions.items(): + [index1_old, index2_old] = exception_pair [chargeProd_old, sigma_old, epsilon_old] = exception_parameters @@ -2480,30 +2254,21 @@ def _handle_old_new_exceptions(self): # Otherwise, check if one of the atoms in the set is in the # unique_old_group and the other is not: - if len( - index_set.intersection(self._atom_classes["unique_old_atoms"]) - ) > 0 and ( - chargeProd_old.value_in_unit_system(unit.md_unit_system) != 0.0 - or epsilon_old.value_in_unit_system(unit.md_unit_system) != 0.0 - ): + if (len(index_set.intersection(self._atom_classes['unique_old_atoms'])) > 0 and + (chargeProd_old.value_in_unit_system(unit.md_unit_system) != 0.0 or + epsilon_old.value_in_unit_system(unit.md_unit_system) != 0.0)): if self._interpolate_14s: # If we are interpolating 1,4s, then we anneal this term # off; otherwise, the exception force is constant and # already handled in the standard nonbonded force nonbonded_exceptions_force.addBond( - index1_hybrid, - index2_hybrid, - [ - chargeProd_old, - sigma_old, - epsilon_old, - sigma_old, - epsilon_old * 0.0, - 1, - 0, - ], + index1_hybrid, index2_hybrid, + [chargeProd_old, sigma_old, epsilon_old, sigma_old, + epsilon_old*0.0, 1, 0] ) + + # Next, loop through the new system's exceptions and add them to the # hybrid appropriately for exception_pair, exception_parameters in self._new_system_exceptions.items(): @@ -2520,28 +2285,17 @@ def _handle_old_new_exceptions(self): # uniquenew-environment. They are treated # similarly: they are simply on and constant the entire time # (as a valence term) - if len( - index_set.intersection(self._atom_classes["unique_new_atoms"]) - ) > 0 and ( - chargeProd_new.value_in_unit_system(unit.md_unit_system) != 0.0 - or epsilon_new.value_in_unit_system(unit.md_unit_system) != 0.0 - ): + if (len(index_set.intersection(self._atom_classes['unique_new_atoms'])) > 0 and + (chargeProd_new.value_in_unit_system(unit.md_unit_system) != 0.0 or + epsilon_new.value_in_unit_system(unit.md_unit_system) != 0.0)): if self._interpolate_14s: # If we are interpolating 1,4s, then we anneal this term # on; otherwise, the exception force is constant and # already handled in the standard nonbonded force nonbonded_exceptions_force.addBond( - index1_hybrid, - index2_hybrid, - [ - chargeProd_new, - sigma_new, - epsilon_new * 0.0, - sigma_new, - epsilon_new, - 0, - 1, - ], + index1_hybrid, index2_hybrid, + [chargeProd_new, sigma_new, epsilon_new*0.0, + sigma_new, epsilon_new, 0, 1] ) def _compute_hybrid_positions(self): @@ -2551,7 +2305,8 @@ def _compute_hybrid_positions(self): The positions are assigned by first copying all the mapped positions from the old system in, then copying the mapped positions from the new system. This means that there is an - assumption that the positions common to old and new are the same. + assumption that the positions common to old and new are the same + (which is the case for perses as-is). Returns ------- @@ -2560,11 +2315,9 @@ def _compute_hybrid_positions(self): """ # Get unitless positions old_pos_without_units = np.array( - self._old_positions.value_in_unit(unit.nanometer) - ) + self._old_positions.value_in_unit(unit.nanometer)) new_pos_without_units = np.array( - self._new_positions.value_in_unit(unit.nanometer) - ) + self._new_positions.value_in_unit(unit.nanometer)) # Determine the number of particles in the system n_atoms_hybrid = self._hybrid_system.getNumParticles() @@ -2602,7 +2355,7 @@ def _create_mdtraj_topology(self): hybrid_topology = copy.deepcopy(old_top) - added_atoms = {} + added_atoms = dict() # Get the core atoms in the new index system (as opposed to the hybrid # index system). We will need this later @@ -2626,15 +2379,13 @@ def _create_mdtraj_topology(self): # in the "core" category, since they are mapped and part of a # changing residue mapped_new_atom_indices = core_atoms_new_indices.intersection( - new_system_atom_set - ) + new_system_atom_set) # Now get the old indices of the above atoms so that we can find # the appropriate residue in the old system for this we can use the # new to old atom map - mapped_old_atom_indices = [ - self._new_to_old_map[atom_idx] for atom_idx in mapped_new_atom_indices - ] + mapped_old_atom_indices = [self._new_to_old_map[atom_idx] for + atom_idx in mapped_new_atom_indices] # We can just take the first one--they all have the same residue first_mapped_old_atom_index = mapped_old_atom_indices[0] @@ -2642,27 +2393,27 @@ def _create_mdtraj_topology(self): # Get the atom object corresponding to this index from the hybrid # (which is a deepcopy of the old) mapped_hybrid_system_atom = hybrid_topology.atom( - first_mapped_old_atom_index - ) + first_mapped_old_atom_index) # Get the residue that is relevant to this atom mapped_residue = mapped_hybrid_system_atom.residue # Add the atom using the mapped residue added_atoms[new_particle_hybrid_idx] = hybrid_topology.add_atom( - new_system_atom.name, new_system_atom.element, mapped_residue - ) + new_system_atom.name, + new_system_atom.element, + mapped_residue) # Now loop through the bonds in the new system, and if the bond # contains a unique new atom, then add it to the hybrid topology - for atom1, atom2 in new_top.bonds: + for (atom1, atom2) in new_top.bonds: at1_hybrid_idx = self._new_to_hybrid_map[atom1.index] at2_hybrid_idx = self._new_to_hybrid_map[atom2.index] # If at least one atom is in the unique new class, we need to add # it to the hybrid system - at1_uniq = at1_hybrid_idx in self._atom_classes["unique_new_atoms"] - at2_uniq = at2_hybrid_idx in self._atom_classes["unique_new_atoms"] + at1_uniq = at1_hybrid_idx in self._atom_classes['unique_new_atoms'] + at2_uniq = at2_hybrid_idx in self._atom_classes['unique_new_atoms'] if at1_uniq or at2_uniq: if at1_uniq: atom1_to_bond = added_atoms[at1_hybrid_idx] @@ -2680,6 +2431,7 @@ def _create_mdtraj_topology(self): return hybrid_topology + def _create_hybrid_topology(self): """ Create a hybrid openmm.app.Topology from the input old and new @@ -2728,9 +2480,12 @@ def _create_hybrid_topology(self): ) prev_res = at.residue - hybrid_atom = hybrid_top.addAtom(at.name, at.element, hybrid_residue, at.id) + hybrid_atom = hybrid_top.addAtom( + at.name, at.element, hybrid_residue, at.id + ) # Next we deal with bonds + # loop over the topology atoms once to avoid repeated calls hybrid_top_atom_list = list(hybrid_top.atoms()) # First we add in all the old topology bonds for bond in self._old_topology.bonds(): @@ -2740,8 +2495,7 @@ def _create_hybrid_topology(self): hybrid_top.addBond( hybrid_top_atom_list[at1], hybrid_top_atom_list[at2], - bond.type, - bond.order, + bond.type, bond.order, ) # Finally we add in all the bonds from the unique atoms in the @@ -2749,14 +2503,12 @@ def _create_hybrid_topology(self): for bond in self._new_topology.bonds(): at1 = self.new_to_hybrid_atom_map[bond.atom1.index] at2 = self.new_to_hybrid_atom_map[bond.atom2.index] - if (at1 in self._atom_classes["unique_new_atoms"]) or ( - at2 in self._atom_classes["unique_new_atoms"] - ): + if ((at1 in self._atom_classes['unique_new_atoms']) or + (at2 in self._atom_classes['unique_new_atoms'])): hybrid_top.addBond( hybrid_top_atom_list[at1], hybrid_top_atom_list[at2], - bond.type, - bond.order, + bond.type, bond.order, ) return hybrid_top @@ -2779,8 +2531,10 @@ def old_positions(self, hybrid_positions): n_atoms_old = self._old_system.getNumParticles() # making sure hybrid positions are simtk.unit.Quantity objects if not isinstance(hybrid_positions, unit.Quantity): - hybrid_positions = unit.Quantity(hybrid_positions, unit=unit.nanometer) - old_positions = unit.Quantity(np.zeros([n_atoms_old, 3]), unit=unit.nanometer) + hybrid_positions = unit.Quantity(hybrid_positions, + unit=unit.nanometer) + old_positions = unit.Quantity(np.zeros([n_atoms_old, 3]), + unit=unit.nanometer) for idx in range(n_atoms_old): hyb_idx = self._old_to_hybrid_map[idx] old_positions[idx, :] = hybrid_positions[hyb_idx, :] @@ -2804,8 +2558,10 @@ def new_positions(self, hybrid_positions): n_atoms_new = self._new_system.getNumParticles() # making sure hybrid positions are simtk.unit.Quantity objects if not isinstance(hybrid_positions, unit.Quantity): - hybrid_positions = unit.Quantity(hybrid_positions, unit=unit.nanometer) - new_positions = unit.Quantity(np.zeros([n_atoms_new, 3]), unit=unit.nanometer) + hybrid_positions = unit.Quantity(hybrid_positions, + unit=unit.nanometer) + new_positions = unit.Quantity(np.zeros([n_atoms_new, 3]), + unit=unit.nanometer) for idx in range(n_atoms_new): hyb_idx = self._new_to_hybrid_map[idx] new_positions[idx, :] = hybrid_positions[hyb_idx, :] @@ -2907,34 +2663,4 @@ def has_virtual_sites(self): for ix in range(self._hybrid_system.getNumParticles()): if self._hybrid_system.isVirtualSite(ix): return True - return False - - @property - def initial_atom_indices(self): - """ - Indices of atoms in hybrid topology for atoms in the old topology. - - Returns - ------- - hybrid_indices: list[int] - Indices of old atoms in hybrid topology - - """ - n_atoms_old = self._old_system.getNumParticles() - hybrid_indices = [self._old_to_hybrid_map[idx] for idx in range(n_atoms_old)] - return hybrid_indices - - @property - def final_atom_indices(self): - """ - Indices of atoms in hybrid topology for atoms in the new topology. - - Returns - ------- - hybrid_indices: list[int] - Indices of new atoms in hybrid topology - - """ - n_atoms_new = self._new_system.getNumParticles() - hybrid_indices = [self._new_to_hybrid_map[idx] for idx in range(n_atoms_new)] - return hybrid_indices + return False \ No newline at end of file From 164711c39126b6c02b85d9fe15ee669809835fa2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 19 Jun 2026 11:11:17 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- feflow/utils/hybrid_topology.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/feflow/utils/hybrid_topology.py b/feflow/utils/hybrid_topology.py index 6352a927..562f2934 100644 --- a/feflow/utils/hybrid_topology.py +++ b/feflow/utils/hybrid_topology.py @@ -2663,4 +2663,4 @@ def has_virtual_sites(self): for ix in range(self._hybrid_system.getNumParticles()): if self._hybrid_system.isVirtualSite(ix): return True - return False \ No newline at end of file + return False