# Gillespie dynamics base class # # Copyright (C) 2017--2019 Simon Dobson # # This file is part of epydemic, epidemic network simulations in Python. # # epydemic is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # epydemic is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with epydemic. If not, see . from epydemic import * import math import numpy import csv class HistoryStochasticDynamics(Dynamics): '''A dynamics that runs stochastically in :term:`continuous time`. This is a very efficient and statistically exact approach, but requires that the statistical properties of the events making up the process are known. :param p: the process to run :param g: prototype network to run the dynamics over (optional, can be provided later)''' def __init__( self, simulation_id, country_id, seeds_id, p, g = None): super(HistoryStochasticDynamics, self).__init__(p, g) self.id = simulation_id self.country_id = country_id self.seeds_id = seeds_id def do( self, params ): '''Run the simulation using Gillespie dynamics. :param params: the experimental parameters :returns: the experimental results dict''' folder = 'simulations/' with open(folder + 'sim_' + self.id + '_country_' + self.country_id + '_seeds_' + self.seeds_id + '_history.csv', mode='w') as file: writer = csv.writer(file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) header = ['time'] for n in self.network().nodes(): header.append(str(n)) writer.writerow(header) # run the dynamics proc = self.process() t = 0 events = 0 while not proc.atEquilibrium(t): snapshot = self.get_snapshot(t) writer.writerow(snapshot) # pull the transition dynamics at this timestep transitions = proc.eventRateDistribution(t) # compute the total rate of transitions for the entire network a = 0.0 for (_, r, _) in transitions: a = a + r if a == 0.0: break # no events with non-zero rates # shuffle the transitions #random.shuffle(transitions) # calculate the timestep delta r1 = numpy.random.random() dt = (1.0 / a) * math.log(1.0 / r1) # calculate which event happens (l, _, ef) = transitions[0] if len(transitions) > 1: # choose the rate threshold r2 = numpy.random.random() xc = r2 * a # find the largest event for which the cumulative rates # are less than the random threshold xs = 0 for v in range(0, len(transitions)): (l, xsp, ef) = transitions[v] if (xs + xsp) > xc: break else: xs = xs + xsp # increment the time t = t + dt # fire any events posted for at or before this time events = events + self.runPendingEvents(t) # it's possible that posted events have removed all elements # from the chosen locus, in which case we simply continue # with the next event selection # sd: is this correct? or does it mess up the statistics too much? if len(l) > 0: # draw a random element from the chosen locus e = l.draw() # perform the event by calling the event function, # passing the dynamics, event time, network, and element ef(t, e) # increment the event counter events = events + 1 # add some more metadata (self.metadata())[self.TIME] = t (self.metadata())[self.EVENTS] = events # report results rc = self.experimentalResults() return rc def get_snapshot(self, t): snapshot = [t] this_network = self.network() for k, v in this_network.nodes().items(): snapshot.append(v['compartment']) return snapshot