Source code for pyLM.RDME

# 
# University of Illinois Open Source License
# Copyright 2008-2018 Luthey-Schulten Group,
# All rights reserved.
# 
# Developed by: Luthey-Schulten Group
#                           University of Illinois at Urbana-Champaign
#                           http://www.scs.uiuc.edu/~schulten
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the Software), to deal with 
# the Software without restriction, including without limitation the rights to 
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 
# of the Software, and to permit persons to whom the Software is furnished to 
# do so, subject to the following conditions:
# 
# - Redistributions of source code must retain the above copyright notice, 
# this list of conditions and the following disclaimers.
# 
# - Redistributions in binary form must reproduce the above copyright notice, 
# this list of conditions and the following disclaimers in the documentation 
# and/or other materials provided with the distribution.
# 
# - Neither the names of the Luthey-Schulten Group, University of Illinois at
# Urbana-Champaign, nor the names of its contributors may be used to endorse or
# promote products derived from this Software without specific prior written
# permission.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL 
# THE CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR 
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, 
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR 
# OTHER DEALINGS WITH THE SOFTWARE.
# 
# Author(s): Michael J. Hallock and Joseph R. Peterson
# 
#

# import lm
# import math
# import os
# import h5py

import lm, math, os, h5py

from  .LMLogger import *
from  .ipyInterface import *
try:
	from tqdm import tqdm
except:
	def tqdm(x,ascii=False):
		return x

## @class RDMERegion
[docs] class RDMERegion: """A class that represents a type region of an RDME simulation. Reactions may be specified that live within a region. In addition a specie's diffusion constant is region dependent and diffusion between regions must be specified. For example: cytosol, membrane, extracellular, nucleoid, etc. Args: name: The name of the region (e.g. cytosol) """ def __init__(self, name): self.name=name self.reactions=[] self.defaultDiffusionRate=0.0 self.diffusionRate={}
[docs] def addReaction(self, reactant, product, rate): """Adds a 0th, 1st or 2nd order reaction that can occur in the region Reaction rates are specified as *stochastic* rates: i.e. in units of :math:`1/\mathrm{s}`. Args: reactant: A set of reactants either as a singleton or a list product: A set of products either as a singeton or a list rate: The *stochastic* rate of the reaction. Returns: RDMERegion """ if rate <= 0.0: LMLogger.error("In RDME.addReaction, rate must be positive") self.reactions.append((reactant,product,rate)) return self
[docs] def setDefaultDiffusionRate(self, rate): """Specifies the default diffusion rate of all particles in the region Args: rate: The rate of diffusion in um^2/s or um/s for 3D or 2D diffusion Returns: RDMERegion """ self.defaultDiffusionRate=rate return self
[docs] def setDiffusionRate(self, species, rate): """Specify the diffusion rate for a particular particle type Args: species: The particle type rate: The rate of diffusion in um^2/s or um/s for 3D or 2D diffusion Returns: RDMERegion """ self.diffusionRate[species]=rate return self
[docs] def getReactionCount(self): """Return the number of reactions defined in this region Returns: Get the number of reactions in the region """ return len(self.reactions)
## Create a representation that is loadable in iPython # @return A string containing an HTML object that can be displayed in the Jupyter notebook def _repr_html_(self): # Write out major parameters s = "" s += "<h1> Region: %s </h1>"%self.name s += "Default Diffusion Rate: %f<br/>"%(self.defaultDiffusionRate) s += "<h2> Reactions </h2>" s += writeTable(["Reaction","Rate","Units"], [getReactionString(rxn[0], rxn[1], rxn[2]) for rxn in self.reactions]) s += "<h2> Diffusion </h2>" s += writeTable(["Specie","Diffusion Rate"], [(specie,rate) for specie, rate in self.diffusionRate.items()]) return s
[docs] class RDMESimulation: """A class that contains all regions, reactions, diffusions and rules for a RDME simulation Specify a cuboid region that represents the extents to the reaction region as well as the lattice spacing Args: dimensions: A list of [x,y,z] spacing: Lattice spacing name: The name of the RDME simulation; default: "unnamed" defaultRegion: The name of the region that is associated with the lattice sites before any other regions are added; default:"default" """ def __init__(self, dimensions, spacing, name="unnamed", defaultRegion="default"): # site type dictionary (necessary?) self.siteTypes={} # particleMap self.particleMap={} self.customAddedParticleList=[] # Particles that were added at specific locations self.species_id=[] self.regions={} self.transitionRates=[] self.initial_counts={} self.parameters={} self.continousDimensions=dimensions self.latticeSpacing=spacing self.bytesPerParticle=1 self.addRegion(defaultRegion) self.lm_builder=lm.LatticeBuilder(dimensions[0], dimensions[1], dimensions[2], spacing, 1, 0) extraCellular=lm.Cuboid(lm.point(0.0,0.0,0.0),lm.point(*self.continousDimensions),self.siteTypes[defaultRegion]) self.lm_builder.addRegion(extraCellular) extraCellular.thisown=0 # PREVENT GC sm=lm.SpatialModel() self.lm_builder.getSpatialModel(sm) lx,ly,lz = [int(round(cDim/self.latticeSpacing)) for cDim in self.continousDimensions] #self.lattice = lm.ByteLattice(lx, ly, lz, spacing, lm.getCompiledLatticeMaxOccupancy()) self.hasBeenDiscretized = False # Metadata for Jupyter self.name = name self.replicates = [] self.filename = ""
[docs] def addRegion(self, region): """Add a region to the simulation Args: region: The region to add to the simulation Returns: region just added """ if region not in self.siteTypes: typeid=len(self.siteTypes) self.siteTypes[region]=typeid self.regions[region]=RDMERegion(region) return self.regions[region]
[docs] def addCuboidRegion(self, name,a,b): """Add a cuboid to the builder Args: name: Name of the site type for this region a: tuple for the first corner in continous space b: tuple for the second corner in continous space """ if name not in self.siteTypes: raise Exception("Cannot add cuboid of an undefined region (%s)" % name) pa=lm.point(*a) pb=lm.point(*b) shape=lm.Cuboid(pa, pb, self.siteTypes[name]) self.lm_builder.addRegion(shape) shape.thisown=0
[docs] def addShape(self, shape): """Add a Shape to the builder Args: shape: The Shape to add to the builder """ self.lm_builder.addRegion(shape) shape.thisown=0
[docs] def modifyRegion(self, region): """Return a pointer to a region so that it may be modified Args: region: Get a region that is attached to the simulation for modification Returns: The region to modify """ return self.regions[region]
[docs] def defineSpecies(self, species): """Define a specie/s of particles that exist in the simulation Args: species: A list of species to add to the simulation Returns: The simulation object """ for s in species: if s not in self.species_id: self.species_id.append(s) self.particleMap[s]=len(self.species_id) self.initial_counts[s]=0 return self
[docs] def siteVolume(self): """Get the actual volume of a specific site in L Returns: The volume of the site in L """ return self.latticeSpacing * self.latticeSpacing * self.latticeSpacing * 1000
[docs] def buildCapsidCell(self, length, diameter, membraneThickness, points = False): """Build a capsule based shell in this RDMESimulation centered within the simulation domain that includes a membrane and cytoplasm Args: length: The length of the capsule from one sphere origin to the other diameter The diameter of the cell membraneThickness: The thickness of the membrane points: OPTIONAL: List of lists containing the coordinates of the centers of the sphereoids that cap the capsid cell, e.g. [[x1, y1, z1], [x2, y2, z2]]. If not given, cell is centered in the volume and aligned in the z-direction Returns: The simulation object """ self.addRegion('cytoplasm') self.addRegion('membrane') r = diameter / 2.0 if (points != False): #test if the points lie outside the box if (min(points[0][0], points[1][0]) - r < 0) or (min(points[0][1], points[1][1]) - r < 0) or (min(points[0][2], points[1][2]) - r < 0): LMLogger.info("cell is outside the lattice volume") return if (max(points[0][0], points[1][0]) + r > self.continuousDimensions[0]) or (max(points[0][1], points[1][1]) + r > self.continuousDimensions[1]) or (max(points[0][2], points[1][2])+ r > self.continuousDimensions[2]): LMLogger.info("cell is outside the lattice volume") return p1 = lm.point(*points[0]) p2 = lm.point(*points[1]) dx,dy,dz = [points[1][i] - points[0][i] for i in range(3)] l = math.sqrt(dx*dx + dy*dy + dz*dz) if (abs(length - l) / l > 1e-2): LMLogger.warning("Cell length given does not match length defined by points! Using points!") length = l LMLogger.info("Creating capsid with length=%g, diameter=%g, and thickness=%g", length, diameter, membraneThickness) membrane = lm.CapsuleShell(p1, p2, r - membraneThickness, r, self.siteTypes['membrane']) cytoplasm = lm.Capsule(p1, p2, r - membraneThickness, self.siteTypes['cytoplasm']) self.lm_builder.addRegion(membrane) self.lm_builder.addRegion(cytoplasm) membrane.thisown = 0 # PREVENT GC cytoplasm.thisown = 0 # PREVENT GC else: x=self.continousDimensions[0]/2.0 y=self.continousDimensions[1]/2.0 z=(self.continousDimensions[2]-length)/2.0 LMLogger.info("Creating capsid with length=%g, diameter=%g, and thickness=%g", length, diameter, membraneThickness) p1=lm.point(x,y,z+r) p2=lm.point(x,y,z+length-r) membrane =lm.CapsuleShell(p1, p2, r-membraneThickness, r, self.siteTypes['membrane']) cytoplasm=lm.Capsule (p1, p2, r-membraneThickness, self.siteTypes['cytoplasm']) self.lm_builder.addRegion(membrane) self.lm_builder.addRegion(cytoplasm) membrane.thisown=0 # PREVENT GC cytoplasm.thisown=0 # PREVENT GC return self
[docs] def buildSphericalCell(self, diameter, membraneThickness, point = False): """Build a spherical based shell in this RDMESimulation centered within the simulation domain that includes a membrane and cytoplasm Args: diameter: The diameter of the cell membraneThickness: The thickness of the membrane point: The center of the spherical cell Returns: The simulation object """ self.addRegion('cytoplasm') self.addRegion('membrane') r=diameter/2.0 if (point != False): #test if the point is inside the sim volume: if (point[0] - r < 0) or (point[0] + r > self.continuousDimensions[0]) or (point[1] - r < 0) or (point[1] + r > self.continuousDimensions[1]) or (point[2] - r < 0) or (point[2] + r > self.continuousDimensions[2]): LMLogger.info("cell is outside the lattice volume") return p1 = lm.point(*point) LMLogger.info("Creating spheroid with diameter=%g, and thickness=%g", diameter, membraneThickness) membrane = lm.Sphere(p1, r, self.siteTypes['membrane']) cytoplasm = lm.Sphere(p1, r - membraneThickness, self.siteTypes['cytoplasm']) self.lm_builder.addRegion(membrane) self.lm_builder.addRegion(cytoplasm) membrane.thisown = 0 # PREVENT GC cytoplasm.thisown = 0 # PREVENT GC else: x,y,z = [cdim/2.0 for cdim in self.continousDimensions] LMLogger.info("Creating spherid with diameter=%g, and thickness=%g", diameter, membraneThickness) p1=lm.point(x,y,z) membrane =lm.Sphere(p1, r, self.siteTypes['membrane']) cytoplasm=lm.Sphere(p1, r-membraneThickness, self.siteTypes['cytoplasm']) self.lm_builder.addRegion(membrane) self.lm_builder.addRegion(cytoplasm) membrane.thisown = 0 # PREVENT GC cytoplasm.thisown = 0 # PREVENT GC return self
[docs] def addParticles(self, species='unknown', region='default', count=1): """Add a specified number of particles of the specified type to the specified region Args: species: The species to add to the region region: The region to add particles to count: Number of particles to add (default: 1) Returns: The simulation object """ try: particleNum=self.particleMap[species] except KeyError: particleNum=1 siteTypeNum=self.siteTypes[region] self.initial_counts[species]+=count self.lm_builder.addParticles(particleNum-1, siteTypeNum, count) return self
[docs] def packRegion(self, region, radius, percentage, obstacleID): """Add nonmoving obstacles to a particular region Args: region: The name of the region in which to add particles to radius: The radius of the particles percentage: The percentage of the total region volume that should be packed obstacleID: an identifier for the obstacle Returns: The simulation object """ siteTypeNum=self.siteTypes[region] self.lm_builder.fillWithRandomSpheres(percentage/100.0, radius, 255-obstacleID, siteTypeNum) return self
[docs] def setTransitionRate(self, species, via, to, rate): """Specify the diffusion rate between species; this is a one directional rate e.g. membrane->cytosol or extracellular->membrane Args: species: The specie that can transition between regions via: From this region to: To this region rate: Diffusion rate between regions Returns: The simulation object """ self.transitionRates.append((species, via, to, rate)) return self
[docs] def setTwoWayTransitionRate(self, species, one, two, rate): """Specify the diffusion rate between species; this is a two directional rate Args: species: The specie that can transition between regions one: A region two: The other region rate: Diffusion rate between regions Returns: The simulation object """ self.setTransitionRate(species, one, two, rate) self.setTransitionRate(species, two, one, rate) return self
[docs] def buildDiffusionModel(self): """Return the Lattice Microbes DiffusionModel object for fine-tuning Returns: Simulation diffusion model """ dm=lm.DiffusionModel() numReactions=0 for r in self.regions: numReactions += self.regions[r].getReactionCount() LMLogger.info("number of reactions = %d", numReactions) dm.set_number_reactions(numReactions) numSpecies=len(self.particleMap) LMLogger.info("number of species = %d", numSpecies) dm.set_number_species(numSpecies) numSiteTypes=len(self.siteTypes) LMLogger.info("number of sitetypes = %d", numSiteTypes) dm.set_number_site_types(numSiteTypes) lx,ly,lz = [int(round(cDim / self.latticeSpacing)) for cDim in self.continousDimensions] particlesPerSite = lm.getCompiledLatticeMaxOccupancy(); LMLogger.info("Lattice is %d x %d x %d with %g nm spacing and %d particles per site", lx, ly, lz, self.latticeSpacing*1e9, particlesPerSite) dm.set_lattice_x_size(lx) dm.set_lattice_y_size(ly) dm.set_lattice_z_size(lz) dm.set_lattice_spacing(self.latticeSpacing) dm.set_particles_per_site(particlesPerSite) dm.set_bytes_per_particle(self.bytesPerParticle) D=[0] * numSpecies * numSiteTypes * numSiteTypes RL=[0] * numSiteTypes * numReactions rxnum=0 # This is about to get messy fast. Use a function to determine the # proper offset in the diffusion matrix given the site types and species dix=lambda _t1,_t2,_p: _t1*(numSiteTypes*numSpecies)+_t2*(numSpecies)+_p # Get unique reactions and the locations where they occur uniqueReactions = [] uniqueReactionsLoc = [] for region in self.regions: for rx in self.regions[region].reactions: rxnHash = (rx[0],rx[1],rx[2]) if rxnHash in uniqueReactions: idx = uniqueReactions.index(rxnHash) uniqueReactionsLoc[idx].append(region) else: uniqueReactions.append(rxnHash) uniqueReactionsLoc.append( [region] ) for k in self.regions: r=self.regions[k] LMLogger.debug("D for %s (%d)",k,self.siteTypes[k]) for s in range(numSpecies): LMLogger.debug("\tD[%d][%d][%d] ([%d]) = %g", self.siteTypes[k],self.siteTypes[k],s, dix(self.siteTypes[k],self.siteTypes[k],s), r.defaultDiffusionRate) #D[offset+s]=r.defaultDiffusionRate D[dix(self.siteTypes[k],self.siteTypes[k],s)]=r.defaultDiffusionRate for s in r.diffusionRate: LMLogger.debug("\tD[%d][%d][%d] ([%d]) = %g", self.siteTypes[k],self.siteTypes[k],self.particleMap[s]-1, dix(self.siteTypes[k],self.siteTypes[k],self.particleMap[s]-1), r.diffusionRate[s]) D[dix(self.siteTypes[k], self.siteTypes[k], (self.particleMap[s]-1))]=r.diffusionRate[s] for i, rx in enumerate(uniqueReactions): for region in uniqueReactionsLoc[i]: loc = self.siteTypes[region] LMLogger.debug("RL[%d][%d] is being set cell %d in %s",rxnum, loc, rxnum * numSiteTypes + loc, region) RL[rxnum * numSiteTypes + loc]=1 rxnum+=1 LMLogger.debug("Transitions:") for t in self.transitionRates: s=t[0] via=t[1] to=t[2] rate=t[3] LMLogger.debug("\tD[%d][%d][%d] ([%d]) = %g", self.siteTypes[via],self.siteTypes[to],self.particleMap[s]-1, dix(self.siteTypes[via],self.siteTypes[to],self.particleMap[s]-1), rate) D[dix(self.siteTypes[via],self.siteTypes[to],self.particleMap[s]-1)]=rate LMLogger.debug("D: %s", D) LMLogger.debug("RL: %s", RL) for d in range(len(D)): dm.add_diffusion_matrix(D[d]) for rx in range(len(RL)): dm.add_reaction_location_matrix(RL[rx]) return dm
[docs] def buildReactionModel(self): """Return the Lattice Microbes ReactionModel object for fine-tuning Returns: The reaction model for this simulation """ rm=lm.ReactionModel() numReactions=0 for r in self.regions: numReactions += self.regions[r].getReactionCount() LMLogger.info("number of reactions = %d", numReactions) rm.set_number_reactions(numReactions) numSpecies=len(self.species_id) LMLogger.info("number of species = %d", numSpecies) rm.set_number_species(numSpecies) for s in self.species_id: LMLogger.debug("\t set initial count of %s (id %d) to %d", s, self.particleMap[s], self.initial_counts[s]) rm.add_initial_species_count(self.initial_counts[s]) # Build reaction matricies rxtypes = [0] * numReactions; rxconst = [0] * numReactions; stoich = [0] * numSpecies * numReactions; depmat = [0] * numSpecies * numReactions; # Get unique reactions and the locations where they occur uniqueReactions = [] for region in self.regions: for rx in self.regions[region].reactions: rxnHash = (rx[0],rx[1],rx[2]) if rxnHash in uniqueReactions: idx = uniqueReactions.index(rxnHash) else: uniqueReactions.append(rxnHash) rnum=0 for rx in uniqueReactions: # Define Reaction in stiochiometry, dependency and rate matricies reactant=rx[0] product=rx[1] rate=rx[2] LMLogger.debug("\t rx %d is %s -> %s at rate %g", rnum, reactant, product, rate) rxconst[rnum]=rate if(isinstance(reactant, tuple)): # Second Order if reactant[0] != reactant[1]: rxtypes[rnum]=2 for sr in reactant: depmat[rnum + numReactions*(self.particleMap[sr]-1)]=1 stoich[rnum + numReactions*(self.particleMap[sr]-1)] -= 1 elif reactant[0] == reactant[1]: # Second Order Self Reaction rxtypes[rnum]=3 depmat[rnum + numReactions*(self.particleMap[reactant[0]]-1)]=1 stoich[rnum + numReactions*(self.particleMap[reactant[0]]-1)] -= 2 else: # Zeroth order if reactant == '': rxtypes[rnum] = 0 #stoich[rnum + numReactions*(self.particleMap[reactant]-1)] += 0 else: # First order rxtypes[rnum]=1 depmat[rnum + numReactions*(self.particleMap[reactant]-1)]=1 stoich[rnum + numReactions*(self.particleMap[reactant]-1)] -= 1 if(isinstance(product, tuple)): for sp in product: stoich[rnum + numReactions*(self.particleMap[sp]-1)] += 1 else: if product != '': stoich[rnum + numReactions*(self.particleMap[product]-1)] += 1 rnum += 1 LMLogger.debug("rxtypes: %s", rxtypes) LMLogger.debug("rxconst: %s", rxconst) # Populate reaction object data for r in range(numReactions): rm.add_reaction() rm.mutable_reaction(r).set_type(rxtypes[r]) rm.mutable_reaction(r).add_rate_constant(rxconst[r]) LMLogger.debug("stoich: %s", stoich) for x in range(len(stoich)): rm.add_stoichiometric_matrix(stoich[x]) LMLogger.debug("depmat: %s", depmat) for x in range(len(depmat)): rm.add_dependency_matrix(depmat[x]) return rm
[docs] def buildSpatialModel(self): """Return the Lattice Microbes SpatialModel object for fine-tuning Returns: The spatial model (i.e. obstacles, sites, etc.) for this simulation """ # Retreive the spatial model from the builder sm=lm.SpatialModel() self.lm_builder.getSpatialModel(sm) return sm
[docs] def setWriteInterval(self, time): """Set the time interval to write data at during simulation Args: time The length of time between data writes """ self.parameters['writeInterval']=str(time)
[docs] def setLatticeWriteInterval(self, time): """Set the time interval to write latticedata at during simulation Args: time: The length of time between lattice data writes """ self.parameters['latticeWriteInterval']=str(time)
[docs] def setSimulationTime(self, time): """Set the simulation time length Args: time: The length of simulation time """ self.parameters['maxTime']=str(time)
[docs] def setTimestep(self, time): """Set the simulation time step Args: time: The length of simulation timestep for RDME """ self.parameters['timestep']=str(time)
[docs] def setHookInterval(self, time): """Set the hook interval Args: time: The interval to call hookSimulation """ self.parameters['hookInterval']=str(time)
[docs] def setRandomSeed(self, seed): """Set a known random seed Args: seed: The seed value """ self.parameters['seed']=str(seed)
[docs] def setOverflowHandler(self, mode): self.parameters['rdme.mpd.overflowhandler']=str(mode)
# ## Check that each simulation parameter is reasonable, (e.g. the diffusion rates are good, the particle numbers aren't too high, the grid isn't too fine or coarse, etc.) # # @param self # def check(self): # pass # ########################### # Lattice Modification Code # # ###########################
[docs] def getLattice(self, force=False): """Get a discretized version of the simulation domain. Call this after building all spherical and capsule cells Args: force: Force a rediscretization of the lattice. This will throw away any additional changes made after the first call. Returns: A lattice object. This function should usually only be called once. """ if (not self.hasBeenDiscretized) or force: self.hasBeenDiscretized=True self.lattice=self.allocateLattice() self.lm_builder.discretizeTo(self.lattice, 0, 0.0) return self.lattice
[docs] def allocateLattice(self): lx,ly,lz = [int(round(cDim/self.latticeSpacing)) for cDim in self.continousDimensions] return lm.ByteLattice(lx, ly, lz, self.latticeSpacing, lm.getCompiledLatticeMaxOccupancy())
[docs] def setLatticeSite(self, index, siteType): """Set a particular lattice site type Args: index (x,y,z): A list of coordinates siteType: The type to set the lattice point to. This would be the name of a region that has previously been performed """ # Bulletproofing if not self.hasBeenDiscretized: LMLogger.error('Trying to set a lattice site before calling getLattice()') return if len(index) != 3: LMLogger.error('Must pass a list of form (x,y,z) that has 3 elements, given: %s',index) return siteNum = self.siteTypes[siteType] self.lattice.setSiteType(index[0], index[1], index[2], siteNum)
[docs] def getLatticeSite(self, index): """Get a particular lattice site Args: index (x,y,z): a list of coordinates Returns: The type of the lattice site (string) """ # Bulletproofing if not self.hasBeenDiscretized: LMLogger.error('Trying to set a lattice site before calling getLattice()') return if len(index) != 3: LMLogger.error('Must pass a list of form (x,y,z) that has 3 elements, given: %s',index) return siteType = self.lattice.getSiteType(index[0], index[1], index[2]) for i in self.siteTypes: if siteType == i[1]: return i[0] LMLogger.warning('Unknown site type!') return "unknownSiteType"
[docs] def addParticleAt(self, index, particleType): """Add a particle/ to a particular site Args: index (x,y,z): a list of spatial location specie: The specie type to add """ # Bulletproofing if not self.hasBeenDiscretized: LMLogger.error('Trying to set a lattice site before calling getLattice()') return if len(index) != 3: LMLogger.error('Must pass a list of form (x,y,z) that has 3 elements, given: %s',index) return # Check bounds and convert to indices ix,iy,iz = [int(round(i/self.latticeSpacing)) for i in index] if ix < 0 or iy < 0 or iz < 0: LMLogger.error('All indices must be greater than 0') return lx,ly,lz = [int(round(cDim/self.latticeSpacing)) for cDim in self.continousDimensions] if ix > lx or iy > ly or iz > lz: LMLogger.error('All indices must be less than domain extents') return particleNum=self.particleMap[particleType] self.initial_counts[particleType]+=1 # TODO: add try/catch self.lattice.addParticle(ix, iy, iz, particleNum) self.customAddedParticleList.append((particleType,1)) # This list is consumed in the "save" function in the "Custom" section
## Create an HDF5 version of the simulation amenable for later running or stand-alone running # @param self # @param filename A file to write the simulation to
[docs] def addParticleAtIdx(self, index, particleType): """Add a particle/ to a particular site Args: index (x,y,z): a list of lattice site indices specie: The specie type to add """ # Bulletproofing if not self.hasBeenDiscretized: LMLogger.error('Trying to set a lattice site before calling getLattice()') return if len(index) != 3: LMLogger.error('Must pass a list of form (x,y,z) that has 3 elements, given: %s',index) return # Check bounds if index[0] < 0 or index[1] < 0 or index[2] < 0: LMLogger.error('All indices must be greater than 0') return lx,ly,lz = [int(round(cDim/self.latticeSpacing)) for cDim in self.continousDimensions] if index[0] > lx or index[1] > ly or index[2] > lz: LMLogger.error('All indices must be less than domain extents') return particleNum=self.particleMap[particleType] self.initial_counts[particleType]+=1 # TODO: add try/catch self.lattice.addParticle(index[0], index[1], index[2], particleNum) self.customAddedParticleList.append((particleType,1)) # This list is consumed in the "save" function in the "Custom" section
[docs] def save(self, filename): """Create an HDF5 version of the simulation amenable for later running or stand-alone running Args: filename: A file to write the simulation to """ lm.SimulationFile.create(filename) f=lm.SimulationFile(filename) rm=self.buildReactionModel() f.setReactionModel(rm) sm=self.buildSpatialModel() f.setSpatialModel(sm) dm=self.buildDiffusionModel() f.setDiffusionModel(dm) if not self.hasBeenDiscretized: llattice=self.allocateLattice() self.lm_builder.discretizeTo(llattice, 0, 0.0) f.setDiffusionModelLattice(dm, llattice) self.hasBeenDiscretized = True else: f.setDiffusionModelLattice(dm, self.lattice) for key in self.parameters: LMLogger.debug("Setting parameter %s = %s",key, self.parameters[key]) f.setParameter(key, self.parameters[key]) LMLogger.debug("speciesNames: %s", ",".join(self.species_id)) # f.setParameter("speciesNames", ",".join(self.species_id)) typenames=",".join(sorted(self.siteTypes, key=self.siteTypes.get)) LMLogger.debug("siteTypeNames: %s", typenames) f.setParameter("siteTypeNames", typenames) f.close() import h5py simFile = h5py.File(filename,'r+') dt = h5py.special_dtype(vlen=str) specNamesListAscii = [n.encode("ascii", "ignore") for n in self.species_id] simFile.create_dataset('Parameters/SpeciesNames', (len(specNamesListAscii),1), dt, specNamesListAscii) simFile.flush() simFile.close() # Custom # This section performs any cleanup for custom setup the user has specified # i.e. adding particles at specific locations # Update custom added particle counts if len(self.customAddedParticleList) > 0: try: import h5py lmFile = h5py.File(filename, 'r+') for i in self.customAddedParticleList: particleNumber = self.particleMap[i[0]] - 1 lmFile['Model']['Reaction']['InitialSpeciesCounts'][particleNumber] += i[1] lmFile.close() # Clean up after ourselves except ImportError: LMLogger.error("h5py is required for addParticleAt, but is missing.")
[docs] def run(self, filename, method, replicates=1, seed=None, cudaDevices=None, checkpointInterval=0): """Run the simulation using the specified solver (e.g. NextSubvolume, MultiParticleDiffusion, etc.) for the specified amount of time Args: filename: The HDF file to write to method: The class name for the solver to use (e.g., lm::rdme::MpdRdmeSolver") replicates: The number of replicates to serially run seed: A seed for the random number generator to use when running the simulation; None indicates default """ if cudaDevices is None: cudaDevices = [0] if seed is not None: f = lm.SimulationFile(filename) f.setParameter("seed",str(seed)) f.close() self.filename = filename for r in tqdm(range(1, replicates+1),ascii=True): lm.runSimulation(filename, r, method, cudaDevices, checkpointInterval) # update internal state with replicates that have been run self.replicates.append(r) # Update the filename self.filename = filename
[docs] def runMPI(self, filename, method, replicates=1, driver="mpirun", ppe=1, seed=None): """Run the simulation using a call to mpirun with the given options Args: filename: The HDF file to write to method: The class name for the solver to use (e.g., lm::cme::GillespieDSolver") replicates: The number of replicates to serially run driver: The program to execute the parallel run, e.g. "mpirun", "aprun", "ibrun", etc. ppe: The number of processing elements to use (e.g. number of nodes) seed: A seed for the random number generator to use when running the simulation; None indicates default """ if seed is not None: f = lm.SimulationFile(filename) f.setParameter("seed",str(seed)) f.close() # Write out system command repStr = "1" if replicates != 1: repStr = "1-%d"%replicates cmdStr = '%s -np %d lm -sp -r %s -sl "%s" -f %s'%(driver, ppe, repStr, method, filename) LMLogger.debug("Running MPI LM job with command:\n\t%s"%(cmdStr)) # Actually run command and check return code status = os.system(cmdStr) # Check that LM executed correctly if status != 0: LMLogger.error("Failed running MPI (status: %d) job running command:\n\t%s\n"%(status, cmdStr)) # Update the internal state for replicates that have been run for r in range(1,replicates): self.replicates.append(r) # Update the filename self.filename = filename
[docs] def runSolver(self, filename, solver, replicates=1, cudaDevices=None, checkpointInterval=0): """Run a simulation with a given solver Args: filename: The HDF file to write to solver: An MESolver object replicates: The number of replicates to serially run """ if cudaDevices is None: cudaDevices = [0] LMLogger.debug("Running Custom RDMESolver: %s"%(solver)) for r in tqdm(range(1, replicates+1),ascii=True): LMLogger.debug(" Running replicate: %d"%(r)) lm.runSolver(filename, r, solver, cudaDevices, checkpointInterval) # update internal state with replicates that have been run self.replicates.append(r) # Update the filename self.filename = filename
[docs] def buildVector(self, arg1,arg2,arg3): v = lm.vector(arg1,arg2,arg3) v.thisown=0 return v
[docs] def buildPoint(self, arg1,arg2,arg3): p = lm.point(arg1,arg2,arg3) p.thisown=0 return p
[docs] def buildSphere(self, arg1,arg2,arg3): s = lm.Sphere(arg1,arg2,arg3) s.thisown=0 return s
[docs] def buildEllipse(self, arg1,arg2,arg3,arg4,arg5,arg6,arg7): e = lm.Ellipse(arg1,arg2,arg3,arg4,arg5,arg6,arg7) e.thisown=0 return e
[docs] def buildDifference(self, arg1,arg2,arg3): d = lm.Difference(arg1,arg2,arg3) d.thisown=0 return d
def _repr_html_(self): header = '<td style="text-align:%s"><b>%s</b></td>' row = '<td style="text-align:%s">%s</td>' # Write out major parameters s = "" s += "<h1> RDME Simulation: %s</h1>"%(self.name) s += "<h2> Simulation Parameters </h2>" s += "Lattice Spacing: %f<br/>"%float(self.latticeSpacing) s += "Dimensions: (%f,%f,%f)<br/>"%(self.continousDimensions[0], self.continousDimensions[1], self.continousDimensions[2]) for k,v in self.parameters.items(): s += "%s: %s<br/>"%(str(k), str(v)) # Write completed simulations if len(self.replicates) > 0: s += "<h2> Simulations </h2>" s += "Filename: " + str(self.filename) + "<br/>" s += "Completed replicates: " + str(self.replicates) + "<br/>" # Write out regions if len(self.regions.keys()) > 0: s += "<h2> Regions </h2>" for k, v in self.regions.items(): s += "%s<br/>"%(k) # Write out species if len(self.species_id) > 0: s += "<h2> Species </h2>" s += writeTable(["Specie","Particle ID","Initial Count"], [(specie,self.particleMap[specie],self.initial_counts[specie]) for specie in self.species_id]) # Write out reactions if sum([len(reg.reactions) for reg in self.regions.values()]) > 0: s += "<h2> Reaction Model </h2>" rows = [] for name, reg in self.regions.items(): for rxn in reg.reactions: rxnstr, rate, units = getReactionString(rxn[0],rxn[1],rxn[2]) rows.append( (rxnstr,name,rate,units) ) s += writeTable(["Reaction","Region","Rate","Units"], rows) # Write out diffusion if len(self.regions) > 0: s += "<h2> Diffusion Model </h2>" s += "<h3> Default Rates </h3>" s += writeTable(["Region","Diffusion Rate"], [(name, reg.defaultDiffusionRate) for name,reg in self.regions.items()]) s += "<h3> Species Rates </h3>" rows = [] for name, reg in self.regions.items(): for specie, diffCoeff in reg.diffusionRate.items(): rows.append( (name, specie, diffCoeff) ) s += writeTable(["Region","Species","Diffusion Rate"], rows) if len(self.transitionRates) > 0: s += "<h3> Transition Rates </h3>" s += writeTable(["Specie","From Region", "To Region", "Diffusion Rate"], [(tr[0],tr[1],tr[2],tr[3]) for tr in self.transitionRates]) s += '<table>' # HTML string for representing the model return s
[docs] class IntRDMESimulation(RDMESimulation): def __init__(self, dimensions, spacing, name="unnamed", defaultRegion="default"): RDMESimulation.__init__(self, dimensions, spacing, name, defaultRegion) self.bytesPerParticle=4;
[docs] def allocateLattice(self): lx,ly,lz = [int(round(cDim/self.latticeSpacing)) for cDim in self.continousDimensions] return lm.IntLattice(lx, ly, lz, self.latticeSpacing, lm.getCompiledLatticeMaxOccupancy())