Source code for jLM.Analysis

# 
# University of Illinois Open Source License
# Copyright 2016-2018 Luthey-Schulten Group,
# All rights reserved.
# 
# Developed by: Luthey-Schulten Group
#                           University of Illinois at Urbana-Champaign
#                           http://www.scs.uiuc.edu/~schulten
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the Software), to deal with 
# the Software without restriction, including without limitation the rights to 
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 
# of the Software, and to permit persons to whom the Software is furnished to 
# do so, subject to the following conditions:
# 
# - Redistributions of source code must retain the above copyright notice, 
# this list of conditions and the following disclaimers.
# 
# - Redistributions in binary form must reproduce the above copyright notice, 
# this list of conditions and the following disclaimers in the documentation 
# and/or other materials provided with the distribution.
# 
# - Neither the names of the Luthey-Schulten Group, University of Illinois at
# Urbana-Champaign, nor the names of its contributors may be used to endorse or
# promote products derived from this Software without specific prior written
# permission.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL 
# THE CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR 
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, 
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR 
# OTHER DEALINGS WITH THE SOFTWARE.
# 
# Author(s): Tyler M. Earnest
# 

"""Simulation analysis mixins for SpatialModel"""
import os.path
import numpy as np
import warnings
from h5py import File as H5File
from . import Lattice
from . import Types

[docs] class LatticeAnalysisMixin:
[docs] def particleStatistics(self, particleLattice=None, siteLattice=None): """Compute properties of the particle lattice Dictionary keys countBySpeciesRegion Particle counts for each species type for each region. concBySpeciesRegion Concentration for each species type for each region. countByRegion Total particle counts for each region. concByRegion Total concentration for each region. countBySpecies Total particle counts for each species. concBySpecies Total concentration of each species, averaged over simulation volume count Total particle count conc Total concentration, averaged over simulation volume vol Total simulation volume siteCount Total number of subvolumes regionVol Volume of each region regionCounts Number of subvolumes in each region Returns: dict: Particle lattice statistics """ particleLattice = self.particleLattice if particleLattice is None else particleLattice siteLattice = self.siteLattice if siteLattice is None else siteLattice if particleLattice.ndim == 5: self._pCount, self._sCount = Lattice.latticeStatsAll(particleLattice, siteLattice) elif particleLattice.ndim == 4: self._pCount, self._sCount = Lattice.latticeStatsAll_h5fmt(particleLattice, siteLattice.T) else: raise ValueError("Wrong number of dimensions for particle lattice") regionCounts = {rt: self._sCount[rt.idx] for rt in self.regionList} regionVol = {rt: self.siteV*self._sCount[rt.idx] for rt in self.regionList} concScl = {rt: 1/self.NA/max(self.siteV,self.siteV*self._sCount[rt.idx]) for rt in self.regionList} siteCount = sum(regionCounts.values()) vol = self.siteV*siteCount countBySpeciesRegion = { sp:{ rt: self._pCount[rt.idx,sp.idx] for rt in self.regionList } for sp in self.speciesList} countBySpecies = { sp: np.sum(self._pCount[:,sp.idx]) for sp in self.speciesList } countByRegion = { rt: np.sum(self._pCount[rt.idx,1:]) for rt in self.regionList } count = np.sum(self._pCount[:,1:]) conc = count/(vol*self.NA) concBySpecies = {sp: ct/(vol*self.NA) for sp,ct in countBySpecies.items()} concByRegion = {rt: ct*concScl[rt] for rt,ct in countByRegion.items()} concBySpeciesRegion = { sp:{ rt: countBySpeciesRegion[sp][rt]*concScl[rt] for rt in self.regionList } for sp in self.speciesList} return dict(countBySpeciesRegion=countBySpeciesRegion, countByRegion=countByRegion, countBySpecies=countBySpecies, count=count, siteCount=siteCount, vol=vol, regionVol=regionVol, regionCounts=regionCounts, concBySpeciesRegion=concBySpeciesRegion, concByRegion=concByRegion, concBySpecies=concBySpecies, conc=conc)
[docs] class TrajAnalysisMixin: def _speciesIdParse(self, regex, species, startIndex=0): """ Parse and convert species identifiers to a list of integer indices. This function handles different ways of specifying species: 1. Using a regex pattern to match species names 2. A single Species object 3. A single species name as a string 4. A list of Species objects 5. A list of species names as strings 6. A list of integer indices The function returns a list of integer indices representing the species, adjusted by the startIndex. Args: regex (str): A regular expression to match species names species (Species, str, list): Species specification startIndex (int): An offset to add to the returned indices Returns: list: A list of integer indices representing the specified species Raises: RuntimeError: If both or neither regex and species are provided, or if the species specification is unrecognized """ if (regex is None) == (species is None): raise RuntimeError("`regex` and `species` are mutually exclusive") elif regex is not None: return [x.idx-1 + startIndex for x in self.speciesList.matchRegex(regex)] else: if isinstance(species, Types.Species): return [species.idx - 1 + startIndex] elif isinstance(species, str): return [self.speciesList[species].idx - 1 + startIndex] elif hasattr(species, "__len__"): if isinstance(species[0], Types.Species): return [x.idx-1 + startIndex for x in species] elif isinstance(species[0], str): return [self.speciesList[s].idx - 1 + startIndex for s in species] elif isinstance(species[0], int): return species raise RuntimeError("Unrecognized species specification") def _regionIdParse(self, regex, region): if (regex is None) == (region is None): raise RuntimeError("`regex` xor `region` must be defined") elif regex is not None: return [x.idx for x in self.regionList.matchRegex(regex)] else: if isinstance(region, Types.Region): return [region.idx] elif isinstance(region, str): return [self.regionList[region].idx] elif hasattr(region, "__len__"): if isinstance(region[0], Types.Region): return [x.idx for x in region] elif isinstance(region[0], str): return [self.speciesList[s].idx for s in region] elif isinstance(region[0], int): return region raise RuntimeError("Unrecognized region specification") @staticmethod def _frameParse(ts, frameStart, frameEnd, timeStart, timeEnd): if (frameStart is not None and timeStart is not None): RuntimeError("Specify either frameStart or timeStart") if (frameEnd is not None and timeEnd is not None): RuntimeError("Specify either frameStart or timeStart") if timeStart is not None: for i,t in enumerate(ts): if t>timeStart: frameStart = i-1 break if timeEnd is not None: for i,t in reversed(list(enumerate(ts))): if t<timeEnd: frameEnd = i break if frameStart is None: frameStart = 0 if frameEnd is None: frameEnd = -1 if frameStart < 0: frameStart += len(ts) if frameEnd < 0: frameEnd += len(ts) return frameStart, frameEnd+1 def _replicateParse(self,replicate): if replicate == 'all': return list(self.replicates) if replicate is None: replicate = self.replicate if isinstance(replicate, int): return [ replicate ] return [int(r) for r in replicate]
[docs] def getNumberTrajectory(self,regex=None, species=None, replicate=None,frameStart=None,frameEnd=None,timeStart=None,timeEnd=None): """Calculate particle number trajectories The time course of the particle numbers in the simulation can be queried with this function. Species can be selected individually with the `species` key. If a list of species are given instead the sum of the particle numbers for those species types will be returned. A regular expression can also be used to select the species to return. Species types or species names in the form of a string are both acceptible to use in the `species` key. The `regex` and `species` options are mutually exclusive. Instead of returning the entire trajectory, a segment can be selected either through the frame numbers (`frameStart`, `frameEnd`) or through the simulation time (`timeStart`, `timeEnd`). Args: regex (str): Regular expression matching species names. Multiple matches will be summed. species ([:py:class:`~jLM.Types.Species`]): Species replicate (int): Replicate to return, default is current replicate. frameStart (int): Starting frame frameEnd (int): Ending frame timeStart (int): Starting time timeEnd (int): Ending Time Returns: (:py:class:`~numpy.ndarray(shape=(nt,), dtype=float)`, :py:class:`~numpy.ndarray(shape=(nt,), dtype=float)`): The evaluation times and particle counts """ spIdList = self._speciesIdParse(regex, species, startIndex=0) replicate = self._replicateParse(replicate) if len(replicate) > 1: raise ValueError("only one replicate may be specified") replicate = "{:07d}".format(replicate[0]) ts = self.h5['Simulations'][replicate]['SpeciesCountTimes'][...] frameStart, frameEnd = self._frameParse(ts, frameStart, frameEnd, timeStart, timeEnd) ts = ts[frameStart:frameEnd] traj = np.zeros(ts.shape) ys = self.h5['Simulations'][replicate]['SpeciesCounts'][frameStart:frameEnd,...] for i in spIdList: traj += ys[:,i] return ts, traj
[docs] def getNumberTrajectoryFromRegion(self, spRegex=None, species=None, regRegex=None, region=None, replicate=None, frameStart=None, frameEnd=None, frameDownScale= None, timeStart=None, timeEnd=None, timeDownScale=None): """Calculate particle number trajectories for specific regions The time course of the particle numbers in the simulation within specific regions can be queried with this function. Species can be selected individually with the species key. If a list of species are given instead the sum of the particle numbers for those species types will be returned. A regular expression can also be used to select the species to return. Species types or species names in the form of a string are both acceptible to use in the `species` key. The `spRegex` and `species` options are mutually exclusive. Instead of returning the entire trajectory, a segment can be selected either through the frame numbers (`frameStart`, `frameEnd`) or through the simulation time (`timeStart`, `timeEnd`).If both given, the frame numbers will be used. The regions to compute particle numbers over are selected similar to the species through the options `regRegex` and `region`. Args: spRegex (str): Regular expression matching species names. Multiple matches will be summed. species ([:py:class:`~jLM.Types.Species`]): Species regRegex (str): Regular expression matching region names. Multiple matches will be summed. region ([:py:class:`~jLM.Types.Region`]): Region replicate (int): Replicate to return, default is current replicate. frameStart (int): Starting frame frameEnd (int): Ending frame timeStart (int): Starting time timeEnd (int): Ending Time Returns: (:py:class:`~numpy.ndarray(shape=(nt,), dtype=float)`, :py:class:`~numpy.ndarray(shape=(nt,), dtype=float)`): The evaluation times and particle counts Note: Since this calculation requires reading the entire particle lattice trajectory, it can be slow. """ spIdList = np.array(self._speciesIdParse(spRegex, species, startIndex=1), dtype=np.int32) regIdList = np.array(self._regionIdParse(regRegex, region), dtype=np.int32) replicate = self._replicateParse(replicate) if len(replicate) > 1: raise ValueError("only one replicate may be specified") replicate = "{:07d}".format(replicate[0]) # if input is time, convert to frame if frameStart is None and timeStart is not None and timeEnd is not None: dt = float(self.h5['Parameters'].attrs['timestep']) frameStart = int(timeStart / dt) frameEnd = int(timeEnd / dt) # here we support downscale for time if timeDownScale is not None: frameDownScale = int(timeDownScale / dt) elif frameStart is None and timeStart is None: raise ValueError("Either frameStart, frameEnd or timeStart, timeEnd must be specified") # Here we need to choose between frame or time steps if frameStart is not None and frameEnd is not None: # Check if the frameStart and frameEnd are within the bounds of the lattice times lattice_times_length = len(self.h5['Simulations'][replicate]['LatticeTimes']) if frameStart < 0: raise ValueError(f"frameStart ({frameStart}) is less than 0, which exceeds the lower boundary of the lattice times.") if frameEnd > lattice_times_length: raise ValueError(f"frameEnd ({frameEnd}) exceeds the upper boundary ({lattice_times_length}) of the lattice times.") # not exceed the bounds of the lattice times ts = self.h5['Simulations'][replicate]['LatticeTimes'][frameStart:frameEnd] if frameDownScale is not None and frameDownScale > 1 and frameDownScale < lattice_times_length: ts = ts[::frameDownScale] else: raise ValueError("Either you must give frameStart and frameEnd or timeStart and timeEnd") # get the region types in the lattice, and species types in the lattice max_regions = int(self.h5['Model/Diffusion'].attrs['numberSiteTypes']) max_species = int(self.h5['Model/Diffusion'].attrs['numberSpecies']) actual_regions = len(regIdList) actual_species = len(spIdList) cacheKey = "SiteParticleCount", replicate spc = self._cachedResult(cacheKey) if spc is None: sCount, spc = self._postprocess_siteparticlecounts(rep=replicate, reg_num=actual_regions, spe_num=actual_species, max_regions=max_regions, max_species=max_species) self._cachedResult(cacheKey, spc) self._cachedResult(("SiteCount", replicate), sCount) # cartesian product over sites and particles rs = np.tile(regIdList, len(spIdList)) ss = np.repeat(spIdList, len(regIdList)) if len(rs) == 1: traj = spc[:,rs[0],ss[0]] else: traj = np.sum(spc[:,np.tile(regIdList, len(spIdList)), np.repeat(spIdList, len(regIdList))], axis=-1) return ts, traj
[docs] def getLatticeTrajectory(self, regex=None, species=None, integrated="", replicate=None, frameStart=None, frameEnd=None, timeStart=None, timeEnd=None): """Calculate the spatial distribution of particles versus time Species can be selected individually with the species key. If a list of species are given instead the sum of the particle numbers for those species types will be returned. A regular expression can also be used to select the species to return. Species types or species names in the form of a string are both acceptible to use in the `species` key. The `regex` and `species` options are mutually exclusive. Instead of returning the entire trajectory, a segment can be selected either through the frame numbers (`frameStart`, `frameEnd`) or through the simulation time (`timeStart`, `timeEnd`). If the full 3-D lattice is not needed, any of the directions can be integrated out. Args: regex (str): Regular expression matching species names. Multiple matches will be summed. species ([:py:class:`~jLM.Types.Species`]): Species integrated (str): Combination of 'x', 'y', 'z' specifying the directions to integrate out replicate (int): Replicate to return, default is current replicate. frameStart (int): Starting frame frameEnd (int): Ending frame timeStart (int): Starting time timeEnd (int): Ending Time Returns: (:py:class:`~numpy.ndarray(shape=(nt,), dtype=float)`, :py:class:`~numpy.ndarray(dtype=float)`): The evaluation times and particle counts Note: Since this calculation requires reading the entire particle lattice trajectory, it can be slow. """ spIdList = self._speciesIdParse(regex, species, startIndex=1) replicateList = self._replicateParse(replicate) if len(replicateList) > 1: raise ValueError("only one replicate may be specified") replicate = "{:07d}".format(replicateList[0]) ts = self.h5['Simulations'][replicate]['LatticeTimes'][...] frameStart, frameEnd = self._frameParse(ts, frameStart, frameEnd, timeStart, timeEnd) def hdfLattice(x): return self.h5['Simulations'][replicate]['Lattice']['{:010d}'.format(x)] hdfData = hdfLattice(0) latticeDims = hdfData.shape nFrames = frameEnd-frameStart shape = [nFrames] integratedDims = [] for i,d in enumerate('xyz'): if d not in integrated: shape.append(latticeDims[i]) else: integratedDims.append(i) shape = tuple(shape) integratedDims.append(3) # integrate over the occupancy integratedDims.reverse() # integrate higher number axes first to ensure that lower axes do not shift cacheKey = "getLatticeTrajectory", replicate, spIdList, [frameStart, frameEnd], integratedDims traj = self._cachedResult(cacheKey) if traj is None: traj=[] def latticeGen(f0,f1): hdfData = hdfLattice(0) buf = np.zeros(hdfData.shape, dtype=hdfData.dtype) for i in range(f0,f1): hdfLattice(i).read_direct(buf) yield buf for lattice in latticeGen(frameStart, frameEnd): ind = np.zeros(latticeDims) for spId in spIdList: ind += lattice==spId for axis in integratedDims: ind = np.sum(ind,axis=axis) traj.append(ind.T) traj = np.array(traj) self._cachedResult(cacheKey, traj) return ts[frameStart:frameEnd], traj
[docs] def getLatticeHistogram(self, regex=None, species=None, integrated="xyz", replicate=None, frameStart=None, frameEnd=None, timeStart=None,timeEnd=None): """Calculate the spatial distribution of particles over an interval Species can be selected individually with the species key. If a list of species are given instead the sum of the particle numbers for those species types will be returned. A regular expression can also be used to select the species to return. Species types or species names in the form of a string are both acceptible to use in the `species` key. The `regex` and `species` options are mutually exclusive. The interval of time in which the histogram is computed can be selected either through the frame numbers (`frameStart`, `frameEnd`) or through the simulation time (`timeStart`, `timeEnd`). If the full 3-D lattice is not needed, any of the directions can be integrated out. Args: regex (str): Regular expression matching species names. Multiple matches will be summed. species ([:py:class:`~jLM.Types.Species`]): Species integrated (str): Combination of 'x', 'y', 'z' specifying the directions to integrate out replicate (int): Replicate to return, default is current replicate. frameStart (int): Starting frame frameEnd (int): Ending frame timeStart (int): Starting time timeEnd (int): Ending Time Returns: :py:class:`~numpy.ndarray(dtype=float)`: The average particle counts Note: Since this calculation requires reading the entire particle lattice trajectory, it can be slow. """ spIdList = np.array(self._speciesIdParse(regex, species, startIndex=1), dtype=np.int32) assert len(spIdList) > 0 replicateList = self._replicateParse(replicate) def hdfLattice(r,f): return self.h5['Simulations/{:07d}/Lattice/{:010d}'.format(r,f)] def hdfLatticeTimes(r): return self.h5['Simulations/{:07d}/LatticeTimes'.format(r)] integratedDims = ["xyz".index(a) for a in integrated] hist = None for r in replicateList: hdfData = hdfLatticeTimes(r) ts = np.zeros(hdfData.shape, dtype=hdfData.dtype) hdfData.read_direct(ts) frameStart, frameEnd = self._frameParse(ts, frameStart, frameEnd, timeStart, timeEnd) cacheKey = "getLatticeHistogram", "{:07d}".format(r), spIdList, [frameStart, frameEnd], integratedDims cachedHist = self._cachedResult(cacheKey) if cachedHist is None: for f in range(frameStart, frameEnd): lattice = hdfLattice(r,f) if hist is None: latticeTmp = np.zeros(lattice.shape, dtype=lattice.dtype) lattice.read_direct(latticeTmp) hist = Lattice.latticeHistogram(latticeTmp, spIdList, axes=integratedDims, target=None) else: lattice.read_direct(latticeTmp) Lattice.latticeHistogram(latticeTmp, spIdList, axes=integratedDims, target=hist) self._cachedResult(cacheKey, hist) else: if hist is None: hist = cachedHist else: hist += cachedHist return hist.T/(len(replicateList)*(frameEnd-frameStart))
def _postprocess_siteparticlecounts(self, rep, reg_num = 16384, spe_num = 16384, max_regions=16384, max_species=16384): ''' This function is used to generate the site-particle counts for a given replicate. It is used to cache the results of the site-particle counts so that they can be used to generate the site-particle counts for a given replicate. return: sCount: The site counts pCounts: The particle counts ''' traj = self.h5 siteLattice = traj['/Model/Diffusion/LatticeSites'][...] npl = sum(len(traj['/Simulations/'+rep+'/Lattice']) for rep in traj['/Simulations']) warnings.warn("Generating Site/Particle counts for {} frames".format(npl)) particleLattice = None frames = sorted(traj['Simulations/'+rep+"/Lattice"]) nframes = len(frames) pCounts = np.zeros((nframes, reg_num, spe_num), dtype=np.int32) for i,frame in enumerate(frames): if particleLattice is not None: traj['Simulations/'+rep+"/Lattice/"+frame].read_direct(particleLattice) else: particleLattice = traj['Simulations/'+rep+"/Lattice/"+frame][...] pCount, sCount = Lattice.latticeStatsAll_h5fmt(particleLattice, siteLattice) pCounts[i,:,:] = pCount[:reg_num,:spe_num] return sCount, pCounts def _cachedResult(self, key, val=None): call, rep, *rest = key base, _ = os.path.splitext(self.filename) cacheFile = base+"-jlmcache.h5" argKey = ":".join( ",".join(str(x) for x in xs) for xs in rest) if argKey: path = "/{}/{}/{}".format(rep, call, argKey) else: path = "/{}/{}".format(rep, call) with H5File(cacheFile, "a", libver='latest') as h5: if val is not None: if path in h5: h5[path][...] = val else: h5.create_dataset(path, data=val, compression='gzip', compression_opts=9) if not hasattr(self, "_postprocCache"): self._postprocCache = dict() if rep not in self._postprocCache: self._postprocCache[rep] = dict() if call not in self._postprocCache[rep]: self._postprocCache[rep][call] = dict() self._postprocCache[rep][call][argKey] = val else: if path in h5: return h5[path][...]