Source code for covid19_npis.modelParams

import datetime
import numpy as np
import pandas as pd
import tensorflow as tf
import logging
from scipy.interpolate import BSpline
import pprint

log = logging.getLogger(__name__)


class ModelParams:
    """
    This is a class for all model parameters. It is mainly used to provide
    convenient access to data and model-wide parameters, e.g. the start date
    of the simulation.

    This class also contains the data used for fitting. `dataframe` is the
    original dataframe. `data_tensor` is a tensor in the correct shape
    (time x countries x age) with values replaced by NaNs where no data is
    available.

    Parameters
    ----------
    countries : list of :py:class:`covid19_npis.data.Country`
        Data objects for multiple countries.
    """

    def __init__(
        self,
        countries,
        offset_sim_data=20,
        minimal_daily_cases=40,
        min_offset_sim_death_data=40,
        minimal_daily_deaths=10,
        spline_degree=3,
        spline_stride=7,
        dtype="float32",
    ):
        self._dtype = dtype
        self._offset_sim_data = offset_sim_data
        self._minimal_daily_cases = minimal_daily_cases
        self._min_offset_sim_death_data = min_offset_sim_death_data
        self._minimal_daily_deaths = minimal_daily_deaths
        self._spline_degree = spline_degree
        self._spline_stride = spline_stride
        self._indices_begin_data = None

        # Save data objects and calculate all other variables
        self.countries = countries

        # Make globally accessible, since only one instance should be active at any time
        globals()["modelParams"] = self
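    # Minimal construction sketch (hedged: the folder names below are
    # hypothetical placeholders, not part of this repository):
    #
    #   >>> from covid19_npis.data import Country
    #   >>> countries = [Country("data/Germany"), Country("data/France")]
    #   >>> params = ModelParams(countries, offset_sim_data=20, spline_stride=7)
    #   >>> params.num_countries  # derived from the joined dataframes
    #   2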
    @classmethod
    def from_folder(cls, fpath, **kwargs):
        """
        Create a ModelParams instance from a folder containing different
        regions or countries.
        """
        import os

        from .data import Country

        c = []
        for entry in os.scandir(fpath):
            if os.path.isdir(entry):
                c.append(Country(entry.path))

        return cls(countries=c, **kwargs)
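    # Usage sketch for `from_folder` (hedged: "data/" is an assumed layout with
    # one subdirectory per country, as expected by covid19_npis.data.Country):
    #
    #   >>> params = ModelParams.from_folder("data/", spline_degree=3)
    #   >>> sorted(params.data_summary["countries"])  # one entry per subfolder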
    @property
    def countries(self):
        """
        Data objects for each country.

        Returns
        -------
        list
            List of all country objects.
        """
        return self._countries

    @countries.setter
    def countries(self, countries):
        """
        Every time the countries are set we also want to update every other
        data variable, i.e. dataframes, data summary and data tensors.
        This is done here!
        """

        def join_dataframes(key, check_dict, attribute_name):
            """Joins the dataframes for each country if the key exists in a
            given dictionary.

            Parameters
            ----------
            key: str
            check_dict: dict
            attribute_name: str

            Returns
            -------
            df (joined)
            """
            if not check_dict[key]:
                return None
            for i, country in enumerate(self._countries):
                if i > 0:
                    df = df.join(getattr(country, attribute_name))
                else:
                    df = getattr(country, attribute_name)
            return df

        self._countries = countries

        # Create dictionary and add existing csv files
        check = self._countries[0].exist
        for i, country in enumerate(self._countries):
            for key in country.exist:
                check[key] &= country.exist[key]
        self._check = check  # Save for data summary

        """ Update dataframes """
        # Positive tests
        self._dataframe_new_cases = join_dataframes(
            key="/new_cases.csv", check_dict=check, attribute_name="data_new_cases"
        )
        # Total tests
        self._dataframe_total_tests = join_dataframes(
            key="/tests.csv", check_dict=check, attribute_name="data_total_tests"
        )
        # Deaths
        self._dataframe_deaths = join_dataframes(
            key="/deaths.csv", check_dict=check, attribute_name="data_deaths"
        )
        # Population
        self._dataframe_population = join_dataframes(
            key="/population.csv", check_dict=check, attribute_name="data_population"
        )
        # Interventions
        self._dataframe_interventions = join_dataframes(
            key="/interventions.csv",
            check_dict=check,
            attribute_name="data_interventions",
        )

        """ Update data summary """
        self._update_data_summary()

        """ Calculate positive tests data tensor (tensorflow)
        Sets the data tensor, replaces values smaller than 40 by NaNs.
        """
        self.pos_tests_data_tensor = self._dataframe_new_cases  # Uses setter below!

        """ Calculate total tests data tensor (tensorflow)
        Sets the data tensor, replaces values smaller than 40 by NaNs.
        """
        self.total_tests_data_tensor = self._dataframe_total_tests  # Uses setter below!

        """ Update deaths data tensor
        Sets the data tensor, replaces values smaller than 10 by NaNs.
        """
        self.deaths_data_tensor = self._dataframe_deaths  # Uses setter below!

        """ Update interventions data tensors """
        self.date_data_tensor = self._dataframe_interventions  # Uses setter below!
        self.gamma_data_tensor = self._dataframe_interventions

    # ------------------------------------------------------------------------------ #
    # Data Summary
    # ------------------------------------------------------------------------------ #
    def _update_data_summary(self):
        """Update data summary"""
        data = {
            # Is set on init
            "data begin": self.date_data_begin,
            "data end": self.date_data_end,
            "sim begin": self.date_data_begin
            - datetime.timedelta(days=self._offset_sim_data),
            "sim end": self.date_data_end,
            "age_groups": [],
            "countries": [],
            "interventions": [],
            "files": self._check,
        }

        # Create countries lookup list dynamically from the data dataframe
        for country_name in self.pos_tests_dataframe.columns.get_level_values(
            level="country"
        ).unique():
            data["countries"].append(country_name)

        # Create age group list dynamically from the data dataframe
        for age_group_name in self.pos_tests_dataframe.columns.get_level_values(
            level="age_group"
        ).unique():
            data["age_groups"].append(age_group_name)

        # Create interventions list dynamically from the interventions dataframe
        for i in self._dataframe_interventions.columns.get_level_values(
            level="intervention"
        ).unique():
            data["interventions"].append(i)

        self._data_summary = data
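    # Printing a ModelParams instance pretty-prints this summary dict via
    # __str__ (defined below); illustrative output for some instance `params`,
    # keys taken from the dict built above, values elided:
    #
    #   >>> print(params)
    #   {'age_groups': [...],
    #    'countries': [...],
    #    'data begin': ...,
    #    'data end': ...,
    #    'files': {...},
    #    'interventions': [...],
    #    'sim begin': ...,
    #    'sim end': ...}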
    @property
    def data_summary(self):
        """
        Data summary for the modelParams object.
        """
        return self._data_summary

    def __str__(self):
        """
        Nicely formatted string of the data summary if the object is printed.
        """
        return pprint.pformat(self.data_summary)

    def __repr__(self):
        return self.__str__()

    # ------------------------------------------------------------------------------ #
    # Interventions
    # ------------------------------------------------------------------------------ #

    @property
    def date_data_tensor(self):
        """
        Creates a tensor with dimensions (intervention, country, change_points),
        padded with 0.0 for non-existing change points.
        """
        return self._date_data_tensor

    @property
    def gamma_data_tensor(self):
        """
        Creates a ragged tensor with dimensions (intervention, country, change_points).
        The change points dimension can have different sizes.
        """
        return self._gamma_data_tensor

    @gamma_data_tensor.setter
    def gamma_data_tensor(self, df):
        max_num_cp = self.max_num_cp
        data = []
        for i, intervention in enumerate(
            self.countries[0].interventions
        ):  # Should be the same across all countries -> 0
            d_c = []
            for c, country in enumerate(self.countries):
                d_cp = []
                for p, cp in enumerate(country.change_points[intervention.name]):
                    d_cp.append(cp.gamma_max)
                if len(d_cp) < max_num_cp:
                    for x in range(max_num_cp - len(d_cp)):
                        d_cp.append(0.0)
                d_c.append(d_cp)
            data.append(d_c)
        self._gamma_data_tensor = tf.constant(data, dtype=self.dtype)

    @date_data_tensor.setter
    def date_data_tensor(self, df):
        max_num_cp = self.max_num_cp
        data = []
        for i, intervention in enumerate(
            self.countries[0].interventions
        ):  # Should be the same across all countries -> 0
            d_c = []
            for c, country in enumerate(self.countries):
                d_cp = []
                for p, cp in enumerate(country.change_points[intervention.name]):
                    d_cp.append(self.date_to_index(cp.date_data))
                if len(d_cp) < max_num_cp:
                    for x in range(max_num_cp - len(d_cp)):
                        d_cp.append(0.0)
                d_c.append(d_cp)
            data.append(d_c)
        self._date_data_tensor = tf.constant(data, dtype=self.dtype)

    # ------------------------------------------------------------------------------ #
    # Positive tests
    # ------------------------------------------------------------------------------ #

    @property
    def pos_tests_dataframe(self):
        """
        New cases as a MultiColumn dataframe with
        level 0 = country/region and level 1 = age group.
        """
        return self._dataframe_new_cases

    @property
    def pos_tests_data_tensor(self):
        """
        Tensor of daily new cases / positive tests for countries/regions and
        age groups.

        Returns
        -------
        tf.Tensor
            |shape| time, country, agegroup
        """
        return self._tensor_pos_tests

    @property
    def pos_tests_data_array(self):
        """
        Numpy array of daily new cases / positive tests for countries/regions
        and age groups.

        Returns
        -------
        np.ndarray
            |shape| time, country, agegroup
        """
        return self._array_pos_tests.astype(self.dtype)

    @pos_tests_data_tensor.setter
    def pos_tests_data_tensor(self, df):
        """
        Setter for the data tensor.

        Parameters
        ----------
        df: pd.DataFrame
            Positive tests dataframe
        """
        new_cases_tensor = (
            df.to_numpy()
            .astype(self.dtype)
            .reshape((-1, len(self.countries), len(self.age_groups)))
        )
        new_cases_tensor = np.concatenate(
            [
                np.zeros(
                    (self._offset_sim_data, len(self.countries), len(self.age_groups))
                ),
                new_cases_tensor,
            ]
        )

        i_data_begin_list = []
        for c in range(new_cases_tensor.shape[1]):
            mask = (
                np.sum(new_cases_tensor[:, c, :], axis=-1) > self._minimal_daily_cases
            )
            if mask.sum() == 0:  # [False, False, False]
                i_data_begin = len(mask) - 1
            else:
                i_data_begin = np.min(np.nonzero(mask)[0])
            i_data_begin_list.append(i_data_begin)
        i_data_begin_list = np.array(i_data_begin_list)
        # i_data_begin_list = np.maximum(i_data_begin_list, self._offset_sim_data)
        self._indices_begin_data = i_data_begin_list

        for c, i in enumerate(self.indices_begin_data):
            new_cases_tensor[:i, c, :] = np.nan

        self._array_pos_tests = new_cases_tensor
        self._tensor_pos_tests = tf.constant(new_cases_tensor, dtype=self.dtype)

    # ------------------------------------------------------------------------------ #
    # Total tests
    # ------------------------------------------------------------------------------ #

    @property
    def total_tests_dataframe(self):
        """
        Dataframe of total tests in all countries.
        Datetime index and country columns as MultiIndex.
        """
        return self._dataframe_total_tests

    @property
    def total_tests_data_tensor(self):
        """
        Returns
        -------
        tf.Tensor
            |shape| time, country
        """
        return self._tensor_total_tests

    @total_tests_data_tensor.setter
    def total_tests_data_tensor(self, df):
        """
        Setter for the total tests data tensor.

        Parameters
        ----------
        df: pd.DataFrame
            Total tests dataframe
        """
        if not self.countries[0].exist["/tests.csv"]:
            self._tensor_total_tests = None
            return

        total_tests_tensor = (
            self._dataframe_total_tests.to_numpy()
            .astype(self.dtype)
            .reshape((-1, len(self.countries)))
        )
        total_tests_tensor = np.concatenate(
            [
                np.zeros((self._offset_sim_data, len(self.countries))),
                total_tests_tensor,
            ]
        )

        for c, i in enumerate(self.indices_begin_data):
            total_tests_tensor[:i, c] = np.nan

        self._tensor_total_tests = tf.constant(total_tests_tensor, dtype=self.dtype)
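    # Sketch of the NaN-masking performed by the setters above (hedged, toy
    # numbers): days before a country first exceeds `minimal_daily_cases`
    # (summed over age groups) are set to NaN so they are ignored in the fit:
    #
    #   >>> import numpy as np
    #   >>> cases = np.array([0.0, 5.0, 60.0, 80.0])  # one country, summed over ages
    #   >>> mask = cases > 40                          # minimal_daily_cases = 40
    #   >>> i_begin = np.min(np.nonzero(mask)[0])
    #   >>> cases[:i_begin] = np.nan
    #   >>> cases
    #   array([nan, nan, 60., 80.])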
""" return self._dataframe_deaths @property def deaths_data_tensor(self): """ Returns ------- tf.Tensor: |shape| time, country """ return self._tensor_deaths @deaths_data_tensor.setter def deaths_data_tensor(self, df): """ Setter for the deaths data tensor Parameters ---------- df: pd.DataFrame Deaths tests dataframe """ if len(df.columns.names) == 1: deaths_tensor = ( df.to_numpy() .astype(self.dtype) .reshape((-1, len(self.countries))) ## assumes non-age-stratified data ) deaths_tensor = np.concatenate( [ np.zeros((self._offset_sim_data, len(self.countries),)), deaths_tensor, ] ) if len(df.columns.names) == 2: deaths_tensor = ( df.to_numpy() .T.astype(self.dtype) .reshape( (-1, len(self.countries), len(self.age_groups)) ) ## assumes non-age-stratified data ) deaths_tensor = np.concatenate( [ np.zeros( ( self._offset_sim_data, len(self.countries), len(self.age_groups), ) ), deaths_tensor, ] ) i_data_begin_list = [] for c in range(deaths_tensor.shape[1]): mask = deaths_tensor[:, c] > self._minimal_daily_deaths if mask.sum() == 0: # [False,False,False] i_data_begin = len(mask) - 1 else: i_data_begin = np.min(np.nonzero(mask)[0]) i_data_begin_list.append(i_data_begin) i_data_begin_list = np.array(i_data_begin_list) i_data_begin_list = np.maximum( i_data_begin_list, self._min_offset_sim_death_data ) self._indices_begin_data_deaths = np.maximum( i_data_begin_list, self.indices_begin_data ) for c, i in enumerate(self._indices_begin_data_deaths): deaths_tensor[:i, c] = np.nan self._tensor_deaths = tf.constant(deaths_tensor, dtype=self.dtype) # ------------------------------------------------------------------------------ # # Population # ------------------------------------------------------------------------------ # @property def N_dataframe(self): """ Dataframe of population in all countries. Datetime index and country columns as Multiindex. """ return self._dataframe_population @property def N_data_tensor(self): """ Creates the population tensor with automatically calculated age strata/brackets. |shape| country, age_groups """ data = [] for c, country in enumerate(self.countries): d_c = [] # Get real age groups from country config age_dict = country.age_groups for age_group in self.age_groups: # Select age range from config and sum over it lower, upper = age_dict[age_group] d_c.append(country.data_population[lower:upper].sum().values[0]) data.append(d_c) return tf.constant(data, dtype="float32") @property def N_data_tensor_total(self): """ Creates the population tensor for every age. |shape| country, age """ data = [] for c, country in enumerate(self.countries): data.append(country.data_population.values[:, 0].tolist()) return tf.constant(data, dtype="float32", shape=[self.num_countries, 101]) # ------------------------------------------------------------------------------ # # Additional properties # ------------------------------------------------------------------------------ # @property def dtype(self): return self._dtype @property def offset_sim_data(self): return self._offset_sim_data @property def age_groups(self): return self.data_summary["age_groups"] @property def num_age_groups(self): return len(self.data_summary["age_groups"]) @property def num_countries(self): return len(self.data_summary["countries"]) @property def num_interventions(self): return len(self.data_summary["interventions"]) @property def num_splines(self): return self.spline_basis.shape[1] @property def indices_begin_data(self): """ Returns the index of every country when the first case is reported. 
    @property
    def indices_begin_data(self):
        """
        Returns the index of every country at which the first case is reported.
        It could be that for some countries this index is later than
        ``self.offset_sim_data``.
        """
        if self._indices_begin_data is None:
            self.pos_tests_data_tensor = self.pos_tests_dataframe
        return self._indices_begin_data

    @property
    def length_data(self):
        """
        Returns
        -------
        number
            Length of the inserted/loaded data in days.
        """
        return len(self._dataframe_new_cases)

    @property
    def length_sim(self):
        """
        Returns
        -------
        number
            Length of the simulation in days.
        """
        return len(self._tensor_pos_tests)

    @property
    def max_num_cp(self):
        data = []
        for i, intervention in enumerate(
            self.countries[0].interventions
        ):  # Should be the same across all countries -> 0
            for c, country in enumerate(self.countries):
                index = 0
                for p, cp in enumerate(country.change_points[intervention.name]):
                    index = index + 1
                data.append(index)
        return max(data)

    @property
    def date_data_begin(self):
        return self.pos_tests_dataframe.index.min()

    @property
    def date_sim_begin(self):
        return self.pos_tests_dataframe.index.min() - datetime.timedelta(
            days=self._offset_sim_data
        )

    @property
    def date_data_end(self):
        return self.pos_tests_dataframe.index.max()

    @property
    def spline_basis(self):
        """
        Calculates the B-spline basis.

        Returns
        -------
        |shape| modelParams.length_sim, modelParams.num_splines
        """
        stride = self._spline_stride
        degree = self._spline_degree

        knots = np.arange(
            self.length_sim + degree * stride, 0 - (degree + 1) * stride, -stride
        )
        knots = knots[::-1]
        num_splines = len(knots) - 2 * (degree - 1)
        spl = BSpline(knots, np.eye(num_splines), degree, extrapolate=False)
        spline_basis = spl(np.arange(0, self.length_sim))

        return spline_basis

    # ------------------------------------------------------------------------------ #
    # Other Methods
    # ------------------------------------------------------------------------------ #

    def date_to_index(self, date):
        return (date - self.date_data_begin).days + self.offset_sim_data

    def get_weekdays(self):
        # Weekday (0 = Monday) for every day of the data range. Note: the end
        # of the range was originally `date_data_begin`, which yields a
        # single-day range; `date_data_end` covers the full data range.
        self._weekdays_data_tensor = tf.constant(
            pd.date_range(start=self.date_data_begin, end=self.date_data_end).weekday,
            dtype=tf.float32,
        )
        return self._weekdays_data_tensor
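    # Standalone sketch of the `spline_basis` property above (assumed toy
    # values: length_sim=100, stride=7, degree=3); knots are spaced `stride`
    # days apart and extended beyond both ends so every simulated day is
    # covered by degree + 1 cubic B-splines:
    #
    #   >>> import numpy as np
    #   >>> from scipy.interpolate import BSpline
    #   >>> length_sim, stride, degree = 100, 7, 3
    #   >>> knots = np.arange(
    #   ...     length_sim + degree * stride, -(degree + 1) * stride, -stride
    #   ... )[::-1]
    #   >>> num_splines = len(knots) - 2 * (degree - 1)
    #   >>> basis = BSpline(knots, np.eye(num_splines), degree, extrapolate=False)
    #   >>> basis(np.arange(length_sim)).shape  # (length_sim, num_splines)
    #   (100, 18)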
    def _make_global(self):
        """
        Run once if you want to make the modelParams global.
        Used in plotting.
        """
        globals()["modelParams"] = self
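
# Minimal end-to-end sketch (hedged: assumes a "data/" folder with one
# subdirectory per country in the layout expected by covid19_npis.data.Country);
# constructing the instance also registers it as the module-global `modelParams`:
if __name__ == "__main__":
    params = ModelParams.from_folder("data/")
    log.info(
        "Simulating %s days for %s countries",
        params.length_sim,
        params.num_countries,
    )
    print(params)  # pretty-printed data summary via __str__ / pprint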