#
# University of Illinois Open Source License
# Copyright 2016-2018 Luthey-Schulten Group,
# All rights reserved.
#
# Developed by: Luthey-Schulten Group
# University of Illinois at Urbana-Champaign
# http://www.scs.uiuc.edu/~schulten
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the Software), to deal with
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is furnished to
# do so, subject to the following conditions:
#
# - Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimers.
#
# - Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimers in the documentation
# and/or other materials provided with the distribution.
#
# - Neither the names of the Luthey-Schulten Group, University of Illinois at
# Urbana-Champaign, nor the names of its contributors may be used to endorse or
# promote products derived from this Software without specific prior written
# permission.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS WITH THE SOFTWARE.
#
# Author(s): Tyler M. Earnest, Zane Thornburg
#
"""Spatial model"""
import h5py
import atexit
import numpy as np
from functools import partial
try:
from tqdm import tqdm
use_tqdm = True
except ImportError:
use_tqdm = False
from . import BaseTypes as BT
from . import Types as T
from . import JupyterDisplay as JD
from . import LmInteract as LM
from . import Analysis as AN
def _idxOrNone(x):
if x is None:
return x
else:
return x.idx
def _copyDoc(method):
def _doc(func):
func.__doc__ = method.__doc__
return func
return _doc
[docs]
class SpatialModel(AN.LatticeAnalysisMixin, JD.JupyterDisplayMixin):
"""Base RDME model """
@property
def simulationTime(self):
"""Total simulated time"""
return getattr(self, "_simulationTime", None)
@simulationTime.setter
def simulationTime(self, val):
setattr(self, "_simulationTime", val)
@property
def speciesWriteInterval(self):
"""Simulation time between writing the species counts to the HDF5 file"""
return getattr(self, "_speciesWriteInterval", None)
@speciesWriteInterval.setter
def speciesWriteInterval(self, val):
setattr(self, "_speciesWriteInterval", val)
@property
def latticeWriteInterval(self):
"""Simulation time between writing the lattice to the HDF5 file"""
return getattr(self, "_latticeWriteInterval", None)
@latticeWriteInterval.setter
def latticeWriteInterval(self, val):
setattr(self, "_latticeWriteInterval", val)
@property
def perfPrintInterval(self):
"""Real elapsed time between writing simulation progress to stdout"""
if not hasattr(self, "_perfPrintInterval"):
self._perfPrintInterval = 60
return getattr(self, "_perfPrintInterval", None)
@perfPrintInterval.setter
def perfPrintInterval(self, val):
setattr(self, "_perfPrintInterval", val)
@property
def hookInterval(self):
"""Simulation time between calls to hookSimulation"""
return getattr(self, "_hookInterval", None)
@hookInterval.setter
def hookInterval(self, val):
setattr(self, "_hookInterval", val)
def __init__(self, name, filename, dimensions, latticeSpacing, latticeType=None ):
self.name = name #: str: Simulation name
self.filename = filename #: str: LM data filename
self.latticeType = latticeType #: str: Lattice type (Int or Byte)
self.NA = 6.02214085774e23 #: float: Avogadro's number
self._particlePlacement = [] # [ (x,y,z,spid), ... ]
self._particleDistCount = [] # [ (regid,spid, count), ... ]
self._particleDistConc = [] # [ (regid,spid, conc), ... ]
self._transitionRates = [] # [ (sp, from, to, rateId), ...] where {sp,from,to} = None|id, None means all
self._reactionLocations = [] # [ (rxid, regid), ...]
self.resizeLattice(dimensions, latticeSpacing, latticeType=latticeType)
[docs]
def resizeLattice(self, dimensions, latticeSpacing, latticeType=None):
import lm
"""Resize lattice
Args:
dimensions (int,int,int):
New lattice dimensions
latticeSpacing (float):
Lattice spacing in meters
latticeType (str):
"Byte" / "Int"
"""
nx,ny,nz = dimensions
self.siteV = 1000 * latticeSpacing**3 #: float: Subvolume size in liters
self.siteNAV = self.siteV * self.NA #: float: Conversion factor from concentration to site occupancy
self.shape = nz,ny,nx #: (int, int, int): Site lattice dimensions
self.latticeSpacing = latticeSpacing #: float: Lattice spacing in meters
self.pps = lm.getCompiledLatticeMaxOccupancy() #: int: Particles per site
if latticeType == 'Int':
self.lattice = lm.IntLattice(nz,ny,nx, latticeSpacing, self.pps) #: :py:class:`lm.IntLattice`: LM lattice object
else:
self.lattice = lm.ByteLattice(nz,ny,nx, latticeSpacing, self.pps) #: :py:class:`lm.ByteLattice`: LM lattice object
self.siteLattice = self.lattice.getSiteLatticeView() #: :py:class:`~numpy.ndarray`: NumPy view of site lattice
self.particleLattice = self.lattice.getParticleLatticeView() #: :py:class:`~numpy.ndarray`: NumPy view of particle lattice
self.maxConcentration = self.pps/self.siteNAV #: float: Concentration of a fully packed lattice
[docs]
def placeNumber(self, sp, x, y, z, n):
"""Place a number of particles at a specific subvolume
Args:
sp (:py:class:`~jLM.Types.Species`):
Species type
x (int):
x-coordinate
y (int):
y-coordinate
z (int):
z-coordinate
n (int):
Number of particles
"""
x,y,z,n = map(int, (x,y,z,n))
for _ in range(n):
self._particlePlacement.append(( z, y, x, sp.idx))
[docs]
def distributeNumber(self, sp, reg, count):
"""Distribute a number of particles uniformly through a region
Args:
sp (:py:class:`~jLM.Types.Species`]):
Species type
reg (:py:class:`~jLM.Types.Region`]):
Region type
count (int):
Number of particles
"""
count = int(count)
cs = self._particleDistCount
cs = [x for x in cs if not (x[0]==reg.idx and x[1]==sp.idx)]
cs.append((reg.idx, sp.idx, count))
self._particleDistCount = cs
[docs]
def distributeConcentration(self, sp, reg, conc):
"""Distribute a concentration of particles uniformly through a region
Args:
sp (:py:class:`~jLM.Types.Species`]):
Species type
reg (:py:class:`~jLM.Types.Region`]):
Region type
conc (float):
Concentration of particles
"""
cs = self._particleDistConc
cs = [x for x in cs if not (x[0]==reg.idx and x[1]==sp.idx)]
cs.append((reg.idx, sp.idx, conc))
self._particleDistConc = cs
[docs]
def transitionRate(self, sp, rFrom, rTo, rate, value=None):
"""Define the diffusive transition rate between regions
This allows for fine grained detail over the diffusion matrix.
The transition is defined for the species `sp` from region
`rFrom` to `rTo`. If `None` is given for any of these
parameters, then the entire axis of the matrix is affected,
i.e. `None` is a wildcard. The rate is specified with the
`rate` parameter, either as a DiffusionConst instance, or as
a string to lookup the previously created DiffusionConst. If a
string is provided and a DiffusionConst instance has not been
created, providing the parameter `value` will create the new
constant.
Args:
sp (:py:class:`~jLM.Types.Species`]):
Species type or None
rFrom (:py:class:`~jLM.Types.Region`]):
Region type or None
rTo (:py:class:`~jLM.Types.Region`]):
Region type or None
rate (:py:class:`~jLM.Types.DiffusionConst`]):
Diffusion rate
value (float):
Value of diffusion constant if new
"""
if not isinstance(rate, T.DiffusionConst):
if isinstance(rate, str) and isinstance(value, float):
rate = self.diffusionConst(rate, value)
else:
raise TypeError("rate")
self._transitionRates.append( (_idxOrNone(sp), _idxOrNone(rFrom), _idxOrNone(rTo), rate.idx) )
[docs]
def assignReaction(self, reaction, region):
"""Assign a reaction to a region
Args:
reaction (:py:class:`~jLM.Types.Reaction`):
Reaction
region (:py:class:`~jLM.Types.Region`):
Target region
"""
e = (reaction.idx, region.idx)
if e not in self._reactionLocations:
self._reactionLocations.append( (reaction.idx, region.idx) )
[docs]
def species(self, name, **kwargs):
"""Lookup/create species instance
Args:
name (str):
Name of the species
texRepr (str):
TeX math mode representation of the species
annotation (str):
Description text
Returns:
:py:class:`~jLM.Types.Species`:
Species instance
"""
return self.speciesList.get(name, **kwargs)
[docs]
def region(self, name, **kwargs):
"""Lookup/create region instance
Args:
name (str):
Name of the region
texRepr (str):
TeX math mode representation of the region
annotation (str):
Description text
Returns:
:py:class:`~jLM.Types.Region`:
Region instance
"""
return self.regionList.get(name, **kwargs)
[docs]
def reaction(self, reactants, products, rate, value=None, regions=None, **kwargs):
r"""Lookup/create reaction instance
The product/reactants can be specified as a string, Species
instance, list of strings, list of Species instances, or the
empty list, denoting no species. The reaction rate can be
created in place if a string is given for `rate` and the value
of the rate constant is specified by `value`. Rates must be
provided in standard chemical kinetic units, e.g. for a reaction
.. math:: \mathrm{A} + \mathrm{B} \overset{k}{\to} \mathrm{C},
the rate coefficent, :math:`k` has units of
:math:`\mathrm{M}^{-1}\mathrm{s}^{-1}`
Args:
reactants ([:py:class:`~jLM.Types.Species`]):
Species, or list of species acting as reactants
products ([:py:class:`~jLM.Types.Species`]):
Species, or list of species acting as products
rate ([:py:class:`~jLM.Types.RateConst`]):
Rate constant
value (float):
Optional value of rate constant
regions (py:class:`~jLM.Types.Region`]):
List of applicable regions
Returns:
:py:class:`~jLM.Types.Reaction`:
Reaction instance
"""
if not isinstance(rate, T.RateConst):
if isinstance(rate, str) and isinstance(value, float):
rate = self.rateConst(rate, value, len(reactants))
else:
raise TypeError("rate")
rxn = self.reactionList.get(reactants, products, rate, **kwargs)
if regions is not None:
if not isinstance(regions, list):
regions = [regions]
for reg in regions:
reg = self.region(reg) if isinstance(reg, str) else reg
self.assignReaction(rxn, reg)
return rxn
[docs]
def rateConst(self, rate, value, order, **kwargs):
"""Lookup/create reaction rate constant instance
Args:
rate (str):
Name of rate constant
value (float):
Value of rate constant
order (int):
Order of reaction
texRepr (str):
TeX math mode representation of the rate constant
annotation (str):
Description text
Returns:
:py:class:`~jLM.Types.RateConst`:
RateConst instance
"""
return self.rxnRateList.get(rate, value, order, **kwargs)
[docs]
def diffusionConst(self, rate, value, **kwargs):
"""Lookup/create diffusion constant instance
Args:
rate (str):
Name of diffusion constant
value (float):
Value of diffusion constant
texRepr (str):
TeX math mode representation of the rate constant
annotation (str):
Description text
Returns:
:py:class:`~jLM.Types.DiffusionConst`:
DiffusionConst instance
"""
return self.diffRateList.get(rate, value, **kwargs)
@property
def diffusionZero(self):
"""Lookup/create a zero-valued diffusion constant instance
Returns:
:py:class:`~jLM.Types.DiffusionConst`:
DiffusionConst instance
"""
return self.diffRateList.get('0', 0.0, texRepr=r'D_\varnothing')
[docs]
def maxDiffusionRate(self, latticeSpacing=None, dt=None):
"""Compute max allowed diffusion constant for the simulation
Args:
latticeSpacing (float):
Lattice spacing in meters
dt (float):
Timestep in seconds
Returns:
float:
Maximum diffusion constant in m^/s
"""
if dt is None:
dt = self.timestep
if latticeSpacing is None:
latticeSpacing = self.latticeSpacing
return self.latticeSpacing**2/6/dt
@property
def diffusionFast(self):
"""Lookup/create a maximum-valued diffusion constant instance
Returns:
:py:class:`~jLM.Types.DiffusionConst`:
DiffusionConst instance
"""
return self.diffRateList.get("fast", self.maxDiffusionRate(), texRepr=r'D_\infty')
[docs]
def setMaximumTimestep(self):
"""Set the simulation timestep using the fastest diffusion rate"""
v = 0
for r in self.diffRateList:
v = max(v, r.value)
if v <= 0:
raise RuntimeError("Define diffusion constants prior to calling")
else:
return (self.latticeSpacing)**2 / (6*v)
[docs]
def run(self, solver = None, replicate = 1, seed = None,
cudaDevices = None, checkpointInterval = 0,
sample_frame = False, max_frames = 100):
"""Run the RDME simulation
Args:
solver (:py:class:`lm.RDMESolver`):
Rdme solver
replicate (int):
Index of replicate
seed (int):
RNG seed
cudaDevices ([int]):
List of CUDA device indexes
checkpointInterval (int):
Number of seconds between checkpoints
sample_frame (bool):
Sample frames or not
max_frames (int):
Number of frames after sampling
Returns:
:py:class:`jLM.File`:
Simulation result
"""
import lm
solver = solver or lm.MpdRdmeSolver()
cudaDevices = cudaDevices or [0]
if seed is not None:
f = lm.simulationFile(self.filename)
f.setParameter("seed", str(seed))
f.close()
lm.runSolver(self.filename, replicate, solver, cudaDevices, int(checkpointInterval))
return File(self.filename,
latticeType = self.latticeType,
sample_frame = sample_frame,
max_frames = max_frames)
[docs]
class Sim(LM.LmWriteMixin, SpatialModel):
"""Define and run an RDME simulation"""
[docs]
@_copyDoc(AN.LatticeAnalysisMixin.particleStatistics)
def particleStatistics(self, particleLattice=None, siteLattice=None):
self._placeAllParticles()
return super().particleStatistics(particleLattice,siteLattice)
def __init__(self, name, filename, dimensions, latticeSpacing, regionName, latticeType=None, dt=None):
"""Create new RDME object
Args:
name (str):
Name of simulation
filename (str):
LM data filename
dimensions ((nx,ny,nz)):
Dimensions of lattice
latticeSpacing (float):
Lattice spacing in meters
regionName (str):
Name of the default region
latticeType (str):
"Byte" / "Int"
dt (float):
Timestep
"""
super().__init__(name, filename, dimensions, latticeSpacing, latticeType=latticeType)
self.speciesList = BT.SimObjs(self, T.BuilderSpecies, idbase=1) #: :py:class:`~jLM.BaseTypes.SimObjs`: List of species types
self.regionList = BT.SimObjs(self, T.BuilderRegion) #: :py:class:`~jLM.BaseTypes.SimObjs`: List of regions
self.reactionList = BT.SimObjs(self, T.BuilderReaction) #: :py:class:`~jLM.BaseTypes.SimObjs`: List of reactions
self.rxnRateList = BT.SimObjs(self, T.RateConst) #: :py:class:`~jLM.BaseTypes.SimObjs`: List of reaction rates
self.diffRateList = BT.SimObjs(self, T.DiffusionConst) #: :py:class:`~jLM.BaseTypes.SimObjs`: List of diffusion constants
self.sp = self.speciesList.getAutoNamespace() #: :py:class:`~jLM.BaseTypes.Namespace`: Convienient species access
self.reg = self.regionList.getAutoNamespace() #: :py:class:`~jLM.BaseTypes.Namespace`: Convienient region access
self.rc = self.rxnRateList.getAutoNamespace() #: :py:class:`~jLM.BaseTypes.Namespace`: Convienient reaction rate access
self.dc = self.diffRateList.getAutoNamespace() #: :py:class:`~jLM.BaseTypes.Namespace`: Convienient diffusion constant access
self._particlesPlaced = False
self.filename = filename
if latticeType == 'Int':
self.bytesPerParticle = 4
else:
self.bytesPerParticle = 1
self.region(regionName)
if dt is not None:
self.timestep = dt
[docs]
class File(AN.TrajAnalysisMixin, JD.FileJupyterMixin, LM.LmReadMixin, SpatialModel):
"""Load a previously defined simulation"""
def __init__(self, fname,
replicate = 1,
latticeType = None,
sample_frame = False,
max_frames = 100):
"""Load a RDME simulation file
Args:
fname (str):
LM data filename
replicate (int):
Replicate to load initially
latticeType (str):
"Byte" / "Int"
sample_frame (bool):
Sample frames or not
max_frames (int):
Number of frames after sampling
Note:
jLM includes some metadata in the Lattice Microbes HDF5 file which
is necessary to reload the model.
"""
# self.vmd_precomp(fname)
self.h5 = h5py.File(fname, "r")
name = self.h5['Model'].attrs['name'].decode("ascii")
latticeSpacing = float(self.h5['Model/Diffusion'].attrs['latticeSpacing'])
naturalDims = (int(self.h5['Model/Diffusion'].attrs['latticeZSize']),
int(self.h5['Model/Diffusion'].attrs['latticeYSize']),
int(self.h5['Model/Diffusion'].attrs['latticeXSize']))
super().__init__(name, fname, naturalDims, latticeSpacing, latticeType = latticeType)
self.speciesList = BT.SimObjs(self, T.TrajSpecies, idbase = 1)
self.regionList = BT.SimObjs(self, T.TrajRegion)
self.reactionList = BT.SimObjs(self, T.Reaction)
self.rxnRateList = BT.SimObjs(self, T.RateConst)
self.diffRateList = BT.SimObjs(self, T.DiffusionConst)
self.sp = self.speciesList.getAutoNamespace()
self.reg = self.regionList.getAutoNamespace()
self.rc = self.rxnRateList.getAutoNamespace()
self.dc = self.diffRateList.getAutoNamespace()
self.counts = dict()
self._loadFromH5(replicate)
if (sample_frame == True):
self.reduce_trajectory_file(fname, max_frames)
[docs]
def vmd_precomp(self, file_name):
hdf_file = None
with h5py.File(file_name, 'r') as traj_file:
numberSpecies = int(traj_file['Model']['Reaction'].attrs['numberSpecies'])
numberSiteTypes = int(traj_file['Model']['Diffusion'].attrs['numberSiteTypes'])
maxTime = float(traj_file['Parameters'].attrs['maxTime'])
timestep = float(traj_file['Parameters'].attrs['timestep'])
latticeWriteInterval = float(traj_file['Parameters'].attrs['latticeWriteInterval'])
frames = int(maxTime / (latticeWriteInterval * timestep))
siteTypesLattice = np.array(traj_file['Model']['Diffusion']['LatticeSites'])
maxSiteCounts = np.zeros(numberSiteTypes, dtype = int)
maxParticleCounts = np.zeros(numberSpecies + 1, dtype = int)
if frames > 1:
import multiprocessing as mp
num_cores = int(mp.cpu_count())
pool = mp.Pool(num_cores, initializer = self.init_worker, initargs = (file_name,))
process_frame_partial = partial(self.process_frame,
siteTypesLattice = siteTypesLattice,
numberSpecies = numberSpecies,
numberSiteTypes = numberSiteTypes)
if use_tqdm:
results = list(tqdm(pool.imap(process_frame_partial, range(frames)),
total = frames,
desc = "Processing frames",
unit = "frame"))
else:
results = pool.map(process_frame_partial, range(frames))
pool.close()
pool.join()
maxSiteCounts = np.max([r[1] for r in results], axis = 0)
maxParticleCounts = np.max([r[0] for r in results], axis = 0)
else:
traj_file = h5py.File(file_name, 'r+')
trajs = traj_file['Simulations']['0000001']['Lattice']
if use_tqdm:
frame_iterator = tqdm(range(frames),
desc = "Processing frames",
unit = "frame")
else:
frame_iterator = range(frames)
for i in frame_iterator:
frame_key = f"{i:010d}"
frameLattice = np.array(trajs[frame_key])
species_counts = np.bincount(frameLattice.ravel(), minlength = numberSpecies + 1)
maxParticleCounts = np.maximum(maxParticleCounts, species_counts)
for site_type in range(numberSiteTypes):
site_mask = (siteTypesLattice == site_type)
species_count = np.sum(site_mask)
maxSiteCounts[site_type] = max(maxSiteCounts[site_type], species_count)
if not use_tqdm and (i % max(1, frames // 20) == 0):
progress = (i + 1) / frames * 100
maxSiteCounts[0] = 0
maxParticleCounts[0] = 0
self.write_to_h5(file_name, maxParticleCounts, maxSiteCounts)
traj_file.close()
return
[docs]
def close_hdf_file(self):
global hdf_file
if hdf_file is not None:
hdf_file.close()
[docs]
def init_worker(self, file_name):
global hdf_file
# Open the trajectory file
hdf_file = h5py.File(file_name, 'r')
# Register a function to close the file when the worker exits
atexit.register(self.close_hdf_file)
[docs]
def process_frame(self, frame_index, siteTypesLattice, numberSpecies, numberSiteTypes):
global hdf_file
trajs = hdf_file['Simulations']['0000001']['Lattice']
frame_key = f"{frame_index:010d}"
frameLattice = np.array(trajs[frame_key])
# Calculate counts for each species
species_counts = np.bincount(frameLattice.ravel(), minlength = numberSpecies + 1)
# Calculate counts for each site type
site_counts = np.zeros(numberSiteTypes, dtype = int)
for site_type in range(numberSiteTypes):
site_mask = (siteTypesLattice == site_type)
site_counts[site_type] = np.sum(site_mask)
return species_counts, site_counts
[docs]
def write_to_h5(self, file_name, maxParticleCounts, maxSiteCounts):
with h5py.File(file_name, 'r+') as hdf_file:
traj_h5file = hdf_file['Simulations']['0000001']
try:
# Delete existing datasets
if 'MaxParticleCounts' in traj_h5file:
del traj_h5file['MaxParticleCounts']
if 'MaxSiteCounts' in traj_h5file:
del traj_h5file['MaxSiteCounts']
traj_h5file.create_dataset('MaxSiteCounts',
data = np.array(maxSiteCounts, dtype = np.uint32))
traj_h5file.create_dataset('MaxParticleCounts',
data = np.array(maxParticleCounts, dtype = np.uint32))
except Exception as e:
print(f"Error writing to HDF5 file: {e}")
return False
return
[docs]
def reduce_trajectory_file(self, file_name, max_frames):
with h5py.File(file_name, 'r') as traj_data:
num_frames = len(traj_data['Simulations']['0000001']['Lattice'])
stride = int(num_frames / max_frames)
output_file_name = f"reduced_{stride}_{file_name}"
# Create the output file with userblock
with h5py.File(output_file_name, 'w', userblock_size = 512) as output_file:
# Copy all groups and datasets
for key in traj_data.keys():
traj_data.copy(key, output_file)
# Clear existing lattice data
lattice_group = output_file['Simulations']['0000001']['Lattice']
for dataset_name in list(lattice_group.keys()):
del lattice_group[dataset_name]
# Copy frames with stride
new_index = 0
for i in range(0, num_frames, stride):
new_index_string = str(new_index).zfill(10)
old_index_string = str(i).zfill(10)
output_file['Simulations']['0000001']['Lattice'].create_dataset(
new_index_string,
data = traj_data['Simulations']['0000001']['Lattice'][old_index_string]
)
new_index += 1
# Write LMH5 header
with open(output_file_name, 'r+b') as f:
f.write(b'LMH5')
return