#
# University of Illinois Open Source License
# Copyright 2016-2018 Luthey-Schulten Group,
# All rights reserved.
#
# Developed by: Luthey-Schulten Group
# University of Illinois at Urbana-Champaign
# http://www.scs.uiuc.edu/~schulten
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the Software), to deal with
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is furnished to
# do so, subject to the following conditions:
#
# - Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimers.
#
# - Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimers in the documentation
# and/or other materials provided with the distribution.
#
# - Neither the names of the Luthey-Schulten Group, University of Illinois at
# Urbana-Champaign, nor the names of its contributors may be used to endorse or
# promote products derived from this Software without specific prior written
# permission.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS WITH THE SOFTWARE.
#
# Author(s): Tyler M. Earnest
#
"""SpatialModel mixins for interaction with LM"""
import h5py
import numpy as np
import itertools, os
from . import Lattice
from contextlib import contextmanager
[docs]
class LmWriteMixin:
[docs]
@contextmanager
def h5(self):
with h5py.File(self.filename, "r+") as h5:
yield h5
[docs]
def finalize(self):
import lm
"""Write LM data file"""
try:
os.remove(self.filename)
except FileNotFoundError:
pass
lm.SimulationFile.create(self.filename)
lmFile = lm.SimulationFile(self.filename)
self.Nrxn=len(self.reactionList)
self.Nsps=len(self.speciesList)
self.Nsts=len(self.regionList)
self._placeAllParticles()
rm = self._reactionModel()
dm = self._diffusionModel()
sm = self._spatialModel()
lmFile.setReactionModel(rm)
lmFile.setDiffusionModel(dm)
lmFile.setSpatialModel(sm)
lmFile.setDiffusionModelLattice(dm, self.lattice)
lmFile.setParameter("writeInterval", "{:f}".format(self.speciesWriteInterval))
lmFile.setParameter("latticeWriteInterval", "{:f}".format(self.latticeWriteInterval))
lmFile.setParameter("perfPrintInterval", str(self.perfPrintInterval))
if self.hookInterval is not None:
lmFile.setParameter("hookInterval", "{:d}".format(self.hookInterval))
else:
lmFile.setParameter("hookInterval", "")
lmFile.setParameter("maxTime", "{:.10e}".format(self.simulationTime))
lmFile.setParameter("timestep", "{:.10e}".format(self.timestep))
lmFile.setParameter("bytesPerParticle", "{:d}".format(self.bytesPerParticle))
lmFile.setParameter("siteTypeNames", ",".join(s.name for s in sorted(self.regionList,key=lambda x:x.idx)))
if(type(self).__name__ == "Sim"):
self.noDiffRegionList = [i for i in range(len(self.regionList))]
# print( "nodiffregion ini:", self.noDiffRegionList)
# print(self._diffParamMatrix)
# element is diff id
# self.diffRateList[rateIdx].value
# diffParamMatrix[rFrom, rTo, sp] = rateIdx
for i in range(len(self._diffParamMatrix)):
for j in range(len(self._diffParamMatrix[i])):
for k in range(len(self._diffParamMatrix[i][j])):
# print(f"i: {i}, j: {j}, k: {k}, self._diffParamMatrix[i][j][k]: {self._diffParamMatrix[i][j][k]}")
if self.diffRateList[self._diffParamMatrix[i][j][k]].value != 0:
if i in self.noDiffRegionList:
self.noDiffRegionList.remove(i)
if j in self.noDiffRegionList:
self.noDiffRegionList.remove(j)
# self.noDiffRegionList.remove(i)
# self.noDiffRegionList.remove(j)
break
print( "nodiffregion in finalize:", self.noDiffRegionList)
lmFile.setParameter("noDiffRegionList", ",".join(str(x) for x in self.noDiffRegionList))
# lmFile.setParameter("noDiffRegionList", str(self.noDiffRegionList))
# with h5py.File(self.filename, 'r+') as simFile:
# Parameters = simFile.require_group('Parameters')
# Parameters.attrs['noDiffRegionList'] = np.array(self.noDiffRegionList)
# simFile.flush()
lmFile.close()
simFile = h5py.File(self.filename,'r+')
dt = h5py.special_dtype(vlen=str)
specNamesListAscii = [s.name.encode("ascii", "ignore") for s in sorted(self.speciesList,key=lambda x:x.idx)]
simFile.create_dataset('Parameters/SpeciesNames', (len(specNamesListAscii),1), dt, specNamesListAscii)
simFile.flush()
simFile.close()
self._storeParamData()
[docs]
def copyParticleLattice(self, fname, diffusion_model=False, replicate=1, frame=-1):
"""Copy particle lattice from existing simulation
Args:
fname (str):
LM simulation H5 file
Keyword Args:
diffusion_model (bool):
Use the initial condition particle lattice
replicate (int):
LM simulation replicate
frame (int):
LM simulation particle lattice frame. Negative values
index from end.
"""
with h5py.File(fname, "r") as h5:
if diffusion_model:
plattice = h5['/Model/Diffusion/Lattice'][...]
else:
ts = h5['/Simulations/{:07d}/LatticeTimes'.format(replicate)][...]
frame = (ts.size + frame)%ts.size
plattice = h5['/Simulations/{:07d}/Lattice/{:010d}'.format(replicate, frame)][...]
par=h5['Parameters']
if 'SpeciesNames' in par:
namesDS = par['SpeciesNames'][:]
names = namesDS.tolist()
inSps = [spec[0] for spec in names]
else:
# compatibility with JLM versions < 2.5
inSps = par.attrs['speciesNames'].decode().split(',')
spsTranslationMap = {inSps.index(sp.name)+1: sp.idx for sp in self.speciesList}
txMap = np.zeros(16384, dtype=np.uint32)
for iOld,iNew in spsTranslationMap.items():
txMap[iOld] = iNew
Lattice.hdf2lmRep(self.particleLattice, plattice, txMap)
self._particlesPlaced = True
spCts = np.zeros(len(self.speciesList), dtype=int)
pCount, sCount = Lattice.latticeStatsAll(self.particleLattice, self.siteLattice)
pCount = np.sum(pCount, axis=0)
spCts[...] = pCount[1:len(self.speciesList)+1]
self._spCts = spCts
self._particlePlacement = []
self._particleDistCount = []
self._particleDistConc = []
def _placeAllParticles(self):
if self._particlesPlaced:
return
self.particleLattice[...] = 0
particleDensity = np.zeros((len(self.regionList), len(self.speciesList)+1), dtype=np.double)
siteCounts = Lattice.countSites(self.siteLattice)
spCts = np.zeros(len(self.speciesList), dtype=int)
for regid, spid, count in self._particleDistCount:
spCts[spid-1] += count
v = 1.0*count/siteCounts[regid]/self.pps
particleDensity[regid,spid] += v
for regid, spid, conc in self._particleDistConc:
count = int(self.siteNAV*conc*siteCounts[regid])
spCts[spid-1] += count
v = 1.0*count/siteCounts[regid]/self.pps
particleDensity[regid,spid] += v
# particleDensity is the probability that a particle will be placed
# we need to compute the probability to place empty particles
particleDensity[:,0] = 1 - np.sum(particleDensity[:,1:], axis=1)
if np.any(particleDensity < 0):
raise RuntimeError("Lattice capacity exceeded")
Lattice.populateLattice(self.particleLattice, self.siteLattice, particleDensity)
for x,y,z,sp in self._particlePlacement:
spCts[sp-1] += 1
if Lattice.appendParticle(self.particleLattice, x, y, z, sp) == 0:
raise RuntimeError("Failed to add {} at {}: site full.".format(self.speciesList[sp].name, (x,y,z)))
self._spCts = spCts
def _buildReactionMatricies(self):
Nrxn, Nsps = self.Nrxn, self.Nsps
# Build reaction matricies
rxtypes = np.zeros(Nrxn, dtype=int)
rxconst = np.zeros(Nrxn, dtype=np.double)
stoich = np.zeros((Nrxn,Nsps), dtype=int)
depmat = np.zeros((Nrxn,Nsps), dtype=int)
rxnParamMatrix=np.zeros(Nrxn, dtype=int)
for rx in self.reactionList:
rxconst[rx.idx]=rx.rate._toLM()
rxnParamMatrix[rx.idx]=rx.rate.idx
if rx.order == 2: # Second Order Reaction
if rx.reactants[0] != rx.reactants[1]:
rxtypes[rx.idx]=2
for sr in rx.reactants:
depmat[rx.idx,sr.idx-1]=1
stoich[rx.idx,sr.idx-1] -= 1
else: # Second Order Self Reaction
rxtypes[rx.idx]=3
depmat[rx.idx,rx.reactants[0].idx-1]=1
stoich[rx.idx,rx.reactants[0].idx-1] -= 2
elif rx.order == 0: # zeroth order
rxtypes[rx.idx] = 0
elif rx.order == 1: # First order
rxtypes[rx.idx]=1
depmat[rx.idx, rx.reactants[0].idx-1]=1
stoich[rx.idx, rx.reactants[0].idx-1] -= 1
else:
raise NotImplementedError("{} order reations".format(rx.order))
for sp in rx.products:
stoich[rx.idx, sp.idx-1] += 1
return rxtypes, rxconst, stoich, depmat, rxnParamMatrix
def _reactionModel(self):
import lm
rm=lm.ReactionModel()
Nrxn, Nsps = self.Nrxn, self.Nsps
rm.set_number_reactions(Nrxn)
rm.set_number_species(Nsps)
for s in self._spCts:
rm.add_initial_species_count(int(s))
rxtypes, rxconst, stoich, depmat, rxnParamMatrix = self._buildReactionMatricies()
# Populate reaction object data
for r in range(Nrxn):
rm.add_reaction()
rm.mutable_reaction(r).set_type(int(rxtypes[r]))
rm.mutable_reaction(r).add_rate_constant(rxconst[r])
for s in range(Nsps):
for r in range(Nrxn):
rm.add_stoichiometric_matrix(int(stoich[r,s]))
for s in range(Nsps):
for r in range(Nrxn):
rm.add_dependency_matrix(int(depmat[r,s]))
self._rxnParamMatrix=rxnParamMatrix
return rm
def _buildReactionLocationMatrix(self):
RL=np.zeros((self.Nsts, self.Nrxn), dtype=int)
for rxid, regid in self._reactionLocations:
if regid is None:
RL[:,rxid]=1
else:
RL[regid,rxid]=1
return RL
def _buildDiffusionMatrix(self):
D=np.zeros((self.Nsts,self.Nsts,self.Nsps), dtype=np.double)
diffParamMatrix=np.zeros((self.Nsts,self.Nsts,self.Nsps), dtype=int)
D[:,:,:] = -1 # flag not visited
for sp, rFrom, rTo, rateIdx in self._transitionRates:
if sp is None:
sp = slice(None)
else:
sp = sp - 1
if rFrom is None:
rFrom = slice(None)
if rTo is None:
rTo = slice(None)
D[rFrom, rTo,sp] = self.diffRateList[rateIdx].value
diffParamMatrix[rFrom, rTo, sp] = rateIdx
if np.any(D<0):
raise RuntimeError("Diffusion table not complete, ensure that all "
"combinations of (species, fromRegion, toRegion) are defined")
self._diffParamMatrix = diffParamMatrix
return D
def _diffusionModel(self):
import lm
dm=lm.DiffusionModel()
Nsps, Nsts, Nrxn = self.Nsps, self.Nsts, self.Nrxn
dm.set_number_reactions(Nrxn)
dm.set_number_species(Nsps)
dm.set_number_site_types(Nsts)
dm.set_lattice_x_size(self.shape[0])
dm.set_lattice_y_size(self.shape[1])
dm.set_lattice_z_size(self.shape[2])
dm.set_lattice_spacing(self.latticeSpacing)
dm.set_particles_per_site(self.pps)
dm.set_bytes_per_particle(self.bytesPerParticle)
D = self._buildDiffusionMatrix()
RL = self._buildReactionLocationMatrix()
for via in range(Nsts):
for to in range(Nsts):
for p in range(Nsps):
dm.add_diffusion_matrix(D[via,to,p])
for r in range(Nrxn):
for s in range(Nsts):
dm.add_reaction_location_matrix(int(RL[s,r]))
return dm
def _spatialModel(self):
import lm
return lm.SpatialModel()
def _storeParamData(self):
# need to store mapping between parameter objects and ids
with h5py.File(self.filename, 'r+') as h5:
diffConstNames = ",".join(x.name for x in self.diffRateList)
h5['Parameters'].attrs['diffConstNames'] = np.string_(diffConstNames)
h5.create_dataset("Parameters/DiffusionParameterMatrix", data=self._diffParamMatrix)
diffParams = np.zeros(len(self.diffRateList), dtype=np.double)
for k in self.diffRateList:
diffParams[k.idx] = k.value
h5.create_dataset("Parameters/DiffusionParameterValues", data=diffParams)
rxnConstNames = ",".join(x.name for x in self.rxnRateList)
h5['Parameters'].attrs['reactionConstNames'] = np.string_(rxnConstNames)
h5.create_dataset("Parameters/ReactionParameterMatrix", data=self._rxnParamMatrix)
rateParams = np.zeros(len(self.rxnRateList), dtype=np.double)
for k in self.rxnRateList:
rateParams[k.idx] = k.value
h5.create_dataset("Parameters/ReactionParameterValues", data=rateParams)
rxnConstOrder = np.array([r.order for r in self.rxnRateList],dtype=int)
h5.create_dataset("Parameters/ReactionParameterOrder", data=rxnConstOrder)
h5['Model'].attrs['name'] = np.string_(self.name)
[docs]
class LmReadMixin:
def _loadFromH5(self, replicate):
self._initParameters()
self._initSpecies()
self._initRegions()
self._initDiffParams()
self._initRateParams()
self._readDiffusionTable()
if self.Nrxn > 0:
self._readReactions()
self._readReplicates()
self._initLattice()
if len(self.replicates) > 0:
self.setReplicate(replicate)
def _readReplicates(self):
self.replicates = [int(x) for x in self.h5['Simulations']]
[docs]
def setReplicate(self, replicate=1):
"""Select replicate from HDF5 file
Args:
replicate (int):
Replicate index (1-based)
"""
self.replicate = replicate
replicate = "{:07d}".format(replicate)
self.trajData = self.h5['Simulations'][replicate]
self._loadReplicate()
return self
def _initLattice(self):
self.siteLattice[...] = self.h5['/Model/Diffusion/LatticeSites'][...].transpose(2,1,0)
inputPl= np.array(self.h5['/Model/Diffusion/Lattice'], dtype=self.particleLattice.dtype)
Lattice.hdf2lmRep(self.particleLattice, inputPl)
def _loadReplicate(self):
self.speciesCountTimes = self.trajData['SpeciesCountTimes'][...]
cs = self.trajData['SpeciesCounts'][...]
self.speciesCounts = dict()
for sp in self.speciesList:
self.speciesCounts[sp] = cs[:,sp.idx-1]
self.latticeTimes = self.trajData['LatticeTimes'][...]
last = self.latticeTimes.size-1
inputPl= np.array(self.trajData['Lattice']['{:010d}'.format(last)], dtype=self.particleLattice.dtype)
Lattice.hdf2lmRep(self.particleLattice, inputPl)
def _initSpecies(self):
par = self.h5['Parameters']
if 'SpeciesNames' in par:
namesDS = par['SpeciesNames'][:]
names = namesDS.tolist()
speciesNames = [spec[0] for spec in names]
else:
# Fall back to old version, compatible with JLM versions < 2.5
speciesNames = par.attrs['speciesNames'].decode().split(',')
for s in speciesNames:
if isinstance(s, bytes):
s = s.decode()
self.speciesList.get(s)
self.speciesList.freeze()
assert self.Nsps == len(speciesNames)
def _initRegions(self):
try:
regionNames = self.h5['Parameters'].attrs['siteTypeNames'].decode().split(',')
except KeyError:
regionNames = ["site{:03d}".format(f) for f in range(int(self.h5['Model/Diffusion'].attrs['numberSiteTypes']))]
for r in regionNames:
self.regionList.get(r)
self.regionList.freeze()
assert self.Nsts == len(regionNames)
def _initParameters(self):
self.timestep = float(self.h5['Parameters'].attrs['timestep'])
self.latticeWriteInterval = float(self.h5['Parameters'].attrs['latticeWriteInterval'])
self.speciesWriteInterval = float(self.h5['Parameters'].attrs['writeInterval'])
p = self.h5['Parameters'].attrs.get('hookInterval', b"")
if p == b'':
self.hookInterval = 0.0
else:
self.hookInterval = float(p)
self.simulationTime = float(self.h5['Parameters'].attrs['maxTime'])
self.bytesPerParticle = int(self.h5['Model/Diffusion'].attrs['bytes_per_particle'])
self.pps = int(self.h5['Model/Diffusion'].attrs['particlesPerSite'])
self.Nsps = int(self.h5['Model/Diffusion'].attrs['numberSpecies'])
self.Nrxn = int(self.h5['Model/Diffusion'].attrs['numberReactions'])
self.Nsts = int(self.h5['Model/Diffusion'].attrs['numberSiteTypes'])
def _initDiffParams(self):
diffRateNames = self.h5['Parameters'].attrs['diffConstNames'].decode().split(',')
diffParams = self.h5["Parameters/DiffusionParameterValues"][...]
for i in range(len(diffRateNames)):
self.diffusionConst(diffRateNames[i], diffParams[i])
self.NdiffParams = len(self.diffRateList)
def _readDiffusionTable(self):
D = self.h5['Model/Diffusion/DiffusionMatrix'][...]
assert D.shape == (self.Nsts, self.Nsts, self.Nsps)
diffParamMap = self.h5["Parameters/DiffusionParameterMatrix"][...]
for rFrom, rTo, sp in itertools.product(self.regionList, self.regionList, self.speciesList):
v = D[rFrom.idx, rTo.idx, sp.idx-1]
rate = self.diffRateList[diffParamMap[rFrom.idx,rTo.idx, sp.idx-1]]
assert v==rate.value
self.transitionRate(sp, rFrom, rTo, rate)
def _initRateParams(self):
if self.Nrxn > 0 :
rxnOrder = self.h5["Parameters/ReactionParameterOrder"][...]
rxnRateNames = self.h5['Parameters'].attrs['reactionConstNames'].decode().split(',')
rxnParams = self.h5['Parameters/ReactionParameterValues'][...]
for i in range(len(rxnRateNames)):
self.rateConst(rxnRateNames[i], rxnParams[i], rxnOrder[i])
self.NrxnParams = len(self.rxnRateList)
else:
self.NrxnParams = 0
def _readReactions(self):
rxnParamMatrix = self.h5['Parameters/ReactionParameterMatrix'][...]
S = self.h5['Model/Reaction/StoichiometricMatrix'][...]
rMat = self.h5['Model/Reaction/DependencyMatrix'][...]
pMat = S+rMat
RL = self.h5['Model/Diffusion/ReactionLocationMatrix'][...]
rates = self.h5['Model/Reaction/ReactionRateConstants'][...]
Nsps, Nrxn = S.shape
_, Nsts = RL.shape
assert Nsps == self.Nsps
assert Nrxn == self.Nrxn
assert Nsts == self.Nsts
for xid in range(Nrxn):
ps, rs, regions = [], [], []
for sid in range(Nsps):
ct = rMat[sid, xid]
if ct > 0:
while ct != 0:
rs.append( self.speciesList[sid+1] )
ct -= 1
ct = pMat[sid, xid]
if ct > 0:
while ct != 0:
ps.append( self.speciesList[sid+1] )
ct -= 1
for rid in range(Nsts):
if RL[xid, rid] == 1:
regions.append( self.regionList[rid] )
v = rates[xid, 0]
rate = self.rxnRateList[rxnParamMatrix[xid]]
assert rate._toLM() == v
rxn = self.reaction(rs, ps, rate)
for r in regions:
self.assignReaction(rxn, r)