# -*- coding: utf-8 -*-
"""
Created on Mon Sep 02 19:41:11 2013
This module creates the **model** class (:class:`model`), which is the main class for
managing models.
To make life easy the model class is constructed through a number of mixin classes.
Each of these handles a set of related tasks.
- :class:`BaseModel`
- :class:`Solver_Mixin`
- :class:`Zip_Mixin`
- :class:`Json_Mixin`
- :class:`Model_help_Mixin`
- :class:`Display_Mixin`
- :class:`Graph_Draw_Mixin`
- :class:`Graph_Mixin`
- :class:`Dekomp_Mixin`
- :class:`Org_model_Mixin`
- :class:`Description_Mixin`
- :class:`Excel_Mixin`
- :class:`Dash_Mixin`
- :class:`Modify_Mixin`
- :class:`Fix_Mixin`
- :class:`Stability_Mixin`
- :class:`Report_Mixin`
In addition the :class:`upd` class is defined. It implements a pandas dataframe extension
which allows the user to use the upd methods on dataframes.
The core function of **model** is :func:`BaseModel.analyzemodelnew`, which first
tokenizes a model specification and then analyzes the model. The model can then be solved
by one of the methods in the :class:`Solver_Mixin` class.
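A minimal usage sketch (the equation, variable names and data are illustrative)::

    import pandas as pd
    from modelclass import model   # this module

    msmall = model.from_eq('FRML <> Y = C + I $')
    df = pd.DataFrame({'C': [100., 110.], 'I': [20., 25.]}, index=[2022, 2023])
    result = msmall(df)    # solve the model; result contains the calculated Y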
@author: Ib
"""
try:
# model_Excel may not be available on all platforms (e.g. on Linux)
import model_Excel as me
except ImportError:
...
from pathlib import Path
from io import StringIO
import json
from collections import defaultdict, namedtuple
from itertools import groupby, chain, zip_longest,accumulate
import re
import pandas as pd
import sys
import networkx as nx
import fnmatch
import numpy as np
from collections import Counter
import time
from contextlib import contextmanager
import os
from subprocess import run
import webbrowser as wb
import importlib
import gc
import copy
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import zipfile
from functools import partial,cached_property,lru_cache
from tqdm.auto import tqdm
import operator
from urllib.request import urlopen
from io import BytesIO
from matplotlib import rcParams
rcParams.update({'figure.autolayout': True})
rcParams.update({'figure.max_open_warning': 50})
import seaborn as sns
from IPython.display import SVG, display, Image, IFrame, HTML
import ipywidgets as ip
try:
from numba import jit, njit
except ImportError:
pass
try:
import xlwings as xw
except ImportError:
...
import modelmanipulation as mp
import modelvis as mv
import modelpattern as pt
from modelnet import draw_adjacency_matrix
from modelnewton import newton_diff
import modeljupyter as mj
from modelhelp import cutout, update_var
from modelnormalize import normal
from modeldekom import totdif
# functions used in BL language
from scipy.stats import norm
from math import isclose, sqrt, erf
from scipy.special import erfinv, ndtri
import gzip
np.seterr(all='ignore')
class BaseModel():
"""Class which defines a model from equations
In itself the BaseModel is of no use.
The **model** class enriches BaseModel with additional
Mixin classes which has additional methods and properties.
A model instance has a number of properties among which theese can be particular useful:
:allvar: Information regarding all variables
:basedf: A dataframe with first result created with this model instance
:lastdf: A dataframe with the last result created with this model instance
The two result dataframes are used for comparision and visualisation. The user can set both basedf and altdf.
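A small sketch of the comparison workflow (names are illustrative)::

    res1 = mmodel(df1)                    # first solution -> mmodel.basedf
    res2 = mmodel(df2)                    # latest solution -> mmodel.lastdf
    dif = mmodel.lastdf - mmodel.basedf   # compare the two runs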
"""
def __init__(self, i_eq='', modelname='testmodel', silent=False, straight=False, funks=[],
tabcomplete=True, previousbase=False, use_preorder=True, normalized=True,safeorder= False,
var_description={}, model_description = '',
var_groups = {}, reports = {}, equations_latex='', eviews_dict = {},use_fbmin= True,
**kwargs):
''' initialize a model'''
if i_eq != '':
self.funks = funks
self.equations = i_eq if '$' in i_eq else mp.tofrml(i_eq, sep='\n')
self.name = modelname
# if True the dependency graph will not be used and calculation will be done in input sequence
self.straight = straight
self.safeorder= safeorder
self.save = True # saves the dataframe in self.basedf, self.lastdf
self.use_fbmin = use_fbmin
self.analyzemodelnew(silent) # on board the model equations
self.maxstart = 0
self.genrcolumns = []
# do we want tab completion (slows down input for large models)
self.tabcomplete = tabcomplete
# set basedf to the previous run instead of the first run
self.previousbase = previousbase
if not self.istopo or self.straight:
self.use_preorder = use_preorder # if prolog is used in sim2d
else:
self.use_preorder = False
self.keep_solutions = {}
self.group_dict = {}
self.var_description = var_description
self.var_groups = var_groups
self.reports = reports
self.model_description= model_description
self.eviews_dict = eviews_dict
self.equations_latex = equations_latex
return
@classmethod
def from_eq(cls, equations, modelname='testmodel', silent=False, straight=False, funks=[],
params={}, tabcomplete=True, previousbase=False, normalized=True,
norm=True, sym=False, sep='\n', **kwargs):
"""
Creates a model from the macro Business logic language.
That is, the model specification is first exploded.
:parameter equations: The model
:parameter modelname: Name of the model. Defaults to 'testmodel'.
:parameter silent: Suppress messages. Defaults to False.
:parameter straight: Don't reorder the model. Defaults to False.
:parameter funks: Functions incorporated in the model specification. Defaults to [].
:parameter params: For later use. Defaults to {}.
:parameter tabcomplete: Allow tab completion in the editor; time consuming for large models. Defaults to True.
:parameter previousbase: Use the previous run as basedf, not the first. Defaults to False.
:parameter norm: Normalize the model. Defaults to True.
:parameter sym: If normalizing, do it symbolically. Defaults to False.
:parameter sep: Separator between the equations. Defaults to newline.
:return: A model instance
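Example (a small two-equation model, purely illustrative)::

    eq = 'frml <> y = c + i $\nfrml <> c = 0.8 * y(-1) $'
    mmodel = model.from_eq(eq)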
"""
udrullet = mp.explode(equations, norm=norm,
sym=sym, funks=funks, sep=sep)
pt.check_syntax_model(udrullet)
mmodel = cls(udrullet, modelname, silent=silent, straight=straight, funks=funks,
tabcomplete=tabcomplete, previousbase=previousbase, normalized=normalized, **kwargs)
mmodel.equations_original = equations
return mmodel
def get_histmodel(self):
""" return a model instance with a model which generates historic values for equations
marked by a frml name I or IDENT
Uses :any:`find_hist_model`
"""
hist_eq = mp.find_hist_model(self.equations)
return type(self)(hist_eq, funks=self.funks)
def analyzemodelnew(self, silent):
''' Analyze a model
The function creates **self.allvar**, a dictionary with an entry for every variable in the model;
the key is the variable name.
For each endogenous variable there is a dictionary with these keys:
:maxlag: The max lag for this variable
:maxlead: The max lead for this variable
:endo: 1 if the variable is endogenous (ie. on the left hand side of =)
:frml: String with the formula for this variable
:frmlnumber: The number of the formula
:varnr: Number of this variable
:terms: The frml for this variable translated to terms
:frmlname: The frmlname for this variable
:startnr: Start of this variable in the Gauss-Seidel solution vector :Advanced:
:matrix: This lhs element is a matrix
:dropfrml: If this frml should be excluded from the evaluation.
In addition these properties will be created:
:endogene: Set of endogenous variables in the model
:exogene: Set of exogenous variables in the model
:maxnavlen: The longest variable name
:blank: An empty string which can contain the longest variable name
:solveorder: The order in which the model is solved - initially the order of the equations in the model
:normalized: This model is normalized
:endogene_true: Set of endogenous variables in the model if normalized, else the set of declared endogenous variables
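A small illustration of the created properties (equation and names are illustrative)::

    mmodel = model('frml <> y = c + y(-1) $')
    mmodel.endogene                 # {'Y'}
    mmodel.exogene                  # {'C'}
    mmodel.allvar['Y']['maxlag']    # -1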
'''
gc.disable()
mega_all = pt.model_parse(self.equations, self.funks)
# mega = mega_all
# breakpoint()
# now separate a model for calculating add_factor after the model is
# run # the add factor model has a frmlname of CALC_ADD_FACTOR
self.split_calc_add_factor = any( [pt.kw_frml_name(f.frmlname, 'CALC_ADD_FACTOR') for f,nt in mega_all])
if self.split_calc_add_factor:
# breakpoint()
mega = [(f,nt) for f,nt in mega_all if not pt.kw_frml_name(f.frmlname, 'CALC_ADD_FACTOR')]
mega_calc_add_factor = [(f,nt) for f,nt in mega_all if pt.kw_frml_name(f.frmlname, 'CALC_ADD_FACTOR')]
calc_add_factor_frml = [f'FRML <CALC> {f.expression}' for f,nt in mega_calc_add_factor]
self.calc_add_factor_model = model(' '.join(calc_add_factor_frml),funks=self.funks,modelname='Calculate add factors')
if not self.calc_add_factor_model.istopo:
raise Exception('The add factor calculation model should be recursive')
else:
mega = mega_all
termswithvar = {t for (f, nt) in mega for t in nt if t.var}
# varnames = list({t.var for t in termswithvar})
termswithlag = sorted([(t.var, '0' if t.lag == '' else t.lag)
for t in termswithvar], key=lambda x: x[0]) # sorted by varname and lag
groupedvars = groupby(termswithlag, key=lambda x: x[0])
varmaxlag = {varandlags[0]: (
min([int(t[1]) for t in list(varandlags[1])])) for varandlags in groupedvars}
groupedvars = groupby(termswithlag, key=lambda x: x[0])
varmaxlead = {varandlags[0]: (
max([int(t[1]) for t in list(varandlags[1])])) for varandlags in groupedvars}
# self.maxlag = min(varmaxlag[v] for v in varmaxlag.keys())
self.maxlag = min(v for k, v in varmaxlag.items())
self.maxlead = max(v for k, v in varmaxlead.items())
self.allvar = {name: {
'maxlag': varmaxlag[name],
'maxlead': varmaxlead[name],
'matrix': 0,
# 'startnr' : 0,
'endo': 0} for name in {t.var for t in termswithvar}}
# self.aequalterm = ('','','=','','') # this is how a term with = looks like
# this is how a term with = looks like
self.aequalterm = ('', '=', '', '')
for frmlnumber, ((frml, fr, n, udtryk), nt) in enumerate(mega):
# find the position of =
assigpos = nt.index(self.aequalterm)
# variables to the left of the =
zendovar = [t.var for t in nt[:assigpos] if t.var]
# does this formula define a matrix on the left of =
boolmatrix = pt.kw_frml_name(n, 'MATRIX')
for pos, endo in enumerate(zendovar):
if self.allvar[endo]['endo']:
print(' **** On the left hand side several times: ', endo)
self.allvar[endo]['dropfrml'] = (1 <= pos)
self.allvar[endo]['endo'] = 1
self.allvar[endo]['frmlnumber'] = frmlnumber
self.allvar[endo]['frml'] = frml
self.allvar[endo]['terms'] = nt[:]
self.allvar[endo]['frmlname'] = n
self.allvar[endo]['matrix'] = boolmatrix
self.allvar[endo]['assigpos'] = assigpos
# finished looping over all the equations
self.endogene = {
x for x in self.allvar.keys() if self.allvar[x]['endo']}
self.exogene = {
x for x in self.allvar.keys() if not self.allvar[x]['endo']}
self.allvar_set = self.endogene | self.exogene
self.exogene_true = {
v for v in self.exogene if not v+'___RES' in self.endogene}
# breakpoint()
self.normalized = not all((v.endswith('___RES')
for v in self.endogene))
self.endogene_true = self.endogene if self.normalized else {
v for v in self.exogene if v+'___RES' in self.endogene}
# dummy variables for variables for which to calculate adjustment variables
self.fix_dummy = sorted(vx+'_D' for vx in self.endogene
if vx+'_X' in self.exogene and vx+'_D' in self.exogene )
self.fix_add_factor = [v[:-2]+'_A' for v in self.fix_dummy]
self.fix_value = [v[:-2]+'_X' for v in self.fix_dummy]
self.fix_endo = [v[:-2] for v in self.fix_dummy]
# # the order as in the equations
# for iz, a in enumerate(sorted(self.allvar)):
# self.allvar[a]['varnr'] = iz
self.v_nr = sorted([(v, self.allvar[v]['frmlnumber'])
for v in self.endogene], key=lambda x: x[1])
self.nrorder = [v[0] for v in self.v_nr]
if self.straight: # no sequencing
self.istopo = False
self.solveorder = self.nrorder
else:
try:
self.topo = list(nx.topological_sort(self.endograph))
self.solveorder = self.topo
self.istopo = True
self.solveorder = self.topo
# check if there are formulas with several left hand side variables
# this is a little tricky
dropvar = [(v, self.topo.index(v), self.allvar[v]['frmlnumber']) for v in self.topo
if self.allvar[v]['dropfrml']] # all dropped vars and their index in topo and frmlnumber
if len(dropvar):
# all multi-lhs formulars
multiendofrml = {frmlnr for (
var, toposort, frmlnr) in dropvar}
dropthisvar = [v for v in self.endogene # these should also be dropped, now all are dropped
if self.allvar[v]['frmlnumber'] in multiendofrml
and not self.allvar[v]['dropfrml']]
for var in dropthisvar:
self.allvar[var]['dropfrml'] = True
# now find the first lhs variable in the topo for each formula. It must not be dropped
# this means that they should be evaluated first
keepthisvarnr = [min([topoindex for (var, topoindex, frmlnr) in dropvar if frmlnr == thisfrml])
for thisfrml in multiendofrml]
keepthisvar = [self.topo[nr] for nr in keepthisvarnr]
for var in keepthisvar:
self.allvar[var]['dropfrml'] = False
except Exception:
# print('This model has simultaneous elements or cyclical elements.')
self.istopo = False
self.solveorder = self.nrorder
# for pretty printing of variables
self.maxnavlen = max([len(a) for a in self.allvar.keys()])
self.blank = ' ' * (self.maxnavlen + 9) # a blank of the right length
gc.enable()
# gc.collect()
return
def smpl(self, start='', end='', df=None):
''' Defines model.current_per which is used as the calculation period/index.
When no parameters are given, the current period is returned \n
Either none or all parameters have to be provided '''
df_ = self.basedf if df is None else df
if start == '' and end == '':
# if first invocation just use the max slice
if not hasattr(self, 'current_per'):
istart, iend = df_.index.slice_locs(
df_.index[0-self.maxlag], df_.index[-1-self.maxlead])
self.current_per = df_.index[istart:iend]
else:
istart, iend = df_.index.slice_locs(start, end)
per = df_.index[istart:iend]
self.current_per = per
self.old_current_per = copy.deepcopy(self.current_per)
return self.current_per
def check_sim_smpl(self, databank):
"""Checks if the current period (the SMPL) is can contain the lags and the leads"""
# breakpoint()
if 0 > databank.index.get_loc(self.current_per[0])+self.maxlag:
war = 'You are trying to solve the model before all lags are available'
raise Exception(f'{war}\nMaxlag:{self.maxlag}, First solveperiod:{self.current_per[0]}, First dataframe index:{databank.index[0]}')
if len(databank.index) <= databank.index.get_loc(self.current_per[-1])+self.maxlead:
war = 'You are trying to solve the model after all leads are available'
raise Exception(f'{war}\nMaxlead:{self.maxlead}, Last solveperiod:{self.current_per[-1]}, Last dataframe index:{databank.index[-1]}')
@contextmanager
def set_smpl(self, start='', end='', df=None):
"""
Sets the scope for the model's time range, and restores it afterwards
Args:
start : Start time. Defaults to ''.
end : End time. Defaults to ''.
df (Dataframe, optional): Used on a dataframe not self.basedf. Defaults to None.
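Example (periods are illustrative)::

    with mmodel.set_smpl(2025, 2030):
        res = mmodel(df)     # solved for 2025-2030 only
    # the previous current_per is restored here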
"""
if hasattr(self, 'current_per'):
old_current_per = self.current_per.copy()
_ = self.smpl(start, end, df)
else:
_ = self.smpl(start, end, df)
old_current_per = self.current_per.copy()
# yield
# self.current_per = old_current_per
try:
yield
finally:
self.current_per = old_current_per
@contextmanager
def set_smpl_relative(self, start_ofset=0, end_ofset=0):
''' Sets the scope for the model's time range relative to the current one, and restores it afterwards'''
old_current_per = self.current_per.copy()
old_start, old_end = self.basedf.index.slice_locs(
old_current_per[0], old_current_per[-1])
new_start = max(0, old_start+start_ofset)
new_end = min(len(self.basedf.index), old_end+end_ofset)
self.current_per = self.basedf.index[new_start:new_end]
try:
yield
finally:
self.current_per = old_current_per
@contextmanager
def keepswitch(self,switch=False,base_last=False,scenarios='*'):
"""
Temporarily places basedf and lastdf in keep_solutions.
Scenario names in scenarios are separated by spaces if the pattern contains * or ?, otherwise by |.
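Example::

    with mmodel.keepswitch(base_last=True):
        ...   # keep_solutions temporarily holds basedf and lastdf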
"""
from copy import deepcopy
# old_keep_solutions = {k:v.copy() for k,v in self.keep_solutions.items() }
old_keep_solutions = self.keep_solutions
if switch or base_last or scenarios in {'base_last'} :
basename = self.basename if hasattr(self, 'basename') else 'Baseline solution'
lastname = self.lastname if hasattr(self, 'lastname') else 'Last solution'
self.keep_solutions = {basename:self.basedf.copy() , lastname:self.lastdf.copy()}
else:
scenariolist = [k for k in self.keep_solutions.keys()]
if scenarios == '*':
selected = scenariolist
else:
sep= ' ' if '*' in scenarios or '?' in scenarios else '|'
selected = [v for up in scenarios.split(sep) for v in sorted(fnmatch.filter(scenariolist, up))]
# print(f'{selected=}')
# print(f'{scenariolist=}')
if len(selected):
self.keep_solutions = {v:self.keep_solutions[v] for v in selected}
else:
print(f'No scenarios match the scenarios: {scenarios}')
self.keep_solutions = old_keep_solutions
# yield
# self.keep_solutions = old_keep_solutions
try:
yield
finally:
self.keep_solutions = old_keep_solutions
@property
def endograph(self):
''' Dependency graph for the current period's endogenous variables, used to reorder the equations.
If self.safeorder is true, feedback for all lags is included.
safeorder was a fix to handle lag = -0, which unexpectedly was used in WB models. It is now handled in modelpattern.
'''
if not hasattr(self, '_endograph'):
terms = ((var, inf['terms'])
for var, inf in self.allvar.items() if inf['endo'])
rhss = ((var, term[self.allvar[var]['assigpos']:])
for var, term in terms)
if self.safeorder:
# print('safeorder')
rhsvar = ((var, {v.var for v in rhs if v.var and v.var in self.endogene and v.var !=
var }) for var, rhs in rhss)
else:
rhsvar = ((var, {v.var for v in rhs if v.var and v.var in self.endogene and v.var !=
var and not v.lag}) for var, rhs in rhss)
edges = ((v, e) for e, rhs in rhsvar for v in rhs)
self._endograph = nx.DiGraph(edges)
self._endograph.add_nodes_from(self.endogene)
# print(self._endograph)
return self._endograph
@property
def calculate_freq(self):
''' The number of operators in the model '''
if not hasattr(self, '_calculate_freq'):
operators = (t.op for v in self.endogene for t in self.allvar[v]['terms'] if (
not self.allvar[v]['dropfrml']) and t.op and t.op not in '$,()=[]')
res = Counter(operators).most_common()
total = sum((n[1] for n in res))
self._calculate_freq = res+[('Total', total)]
return self._calculate_freq
@cached_property
def flop_get(self):
''' The number of operators in the model prolog, core and epilog'''
res = {
'prolog':self.calculate_freq_list(self.preorder),
'core':self.calculate_freq_list(self.coreorder),
'epilog':self.calculate_freq_list(self.epiorder)
}
return res
def calculate_freq_list(self,varlist):
'''
Counts the operations used in the equations for a list of variables.
Args:
varlist (list): the variables whose equations are inspected.
Returns:
calculate_freq (list): (operator, count) pairs plus a final ('Total', n) entry.
'''
operators = (t.op for v in varlist for t in self.allvar[v]['terms'] if (
not self.allvar[v]['dropfrml']) and t.op and t.op not in '$,()=[]')
res = Counter(operators).most_common()
total = sum((n[1] for n in res))
calculate_freq = res+[('Total', total)]
return calculate_freq
def get_columnsnr(self, df):
''' Returns a dict with the databank's variables as keys and column numbers as values;
used for fast getting and setting of variable values in the dataframe'''
return {v: i for i, v in enumerate(df.columns)}
def outeval(self, databank):
''' Takes the model's terms and translates them to an evaluator function called los.
The model accesses the data through Dataframe.values[rowindex+lag, columnindex], which is very efficient.
'''
short, long, longer = 4*' ', 8*' ', 12 * ' '
def totext(t):
''' This function returns a python representation of a term'''
if t.op:
return '\n' if (t.op == '$') else t.op.lower()
elif t.number:
return t.number
elif t.var:
return 'values[row'+t.lag+','+str(columnsnr[t.var])+']'
columnsnr = self.get_columnsnr(databank)
fib1 = ['def make_los(funks=[]):\n']
fib1.append(short + 'from modeluserfunk import ' +
(', '.join(pt.userfunk)).lower()+'\n')
fib1.append(short + 'from modelBLfunk import ' +
(', '.join(pt.BLfunk)).lower()+'\n')
funktext = [short+f.__name__ + ' = funks[' +
str(i)+']\n' for i, f in enumerate(self.funks)]
fib1.extend(funktext)
fib1.append(short + 'def los(values,row,solveorder, allvar):\n')
fib1.append(long+'try :\n')
startline = len(fib1)+1
content = (longer + ('pass # '+v + '\n' if self.allvar[v]['dropfrml']
else ''.join((totext(t) for t in self.allvar[v]['terms'])))
for v in self.solveorder)
fib2 = [long + 'except :\n']
fib2.append(
longer + 'print("Error in",allvar[solveorder[sys.exc_info()[2].tb_lineno-'+str(startline)+']]["frml"])\n')
fib2.append(longer + 'raise\n')
fib2.append(long + 'return \n')
fib2.append(short + 'return los\n')
return ''.join(chain(fib1, content, fib2))
# def errfunk(self,linenr,startlines=4):
# ''' developement function
# to handle run time errors in model calculations'''
# winsound.Beep(500,1000)
# print('>> Error in :',self.name)
# print('>> At :',self._per)
# self.errdump = pd.DataFrame(self.values,columns=self.currentdf.columns, index= self.currentdf.index)
# outeq = self.currentmodel[linenr-startlines]
# varout = sorted(list({(var,lag) for (var,lag) in re.findall(self.ypat,outeq) if var not in self.funk}))
# print('>> Equation :',outeq)
# maxlen = str(3+max([len(var) for (var,lag) in varout]))
# fmt = '{:>'+maxlen+'} {:>3} {:>20} '
# print('>>',fmt.format('Var','Lag','Value'))
# for var,lag in varout:
# lagout = 0 if lag =='' else int(lag)
# print('>>',('{:>'+maxlen+'} {:>3} {:>20} ').format(var,lagout,self.errdump.loc[self._per+lagout,var]))
# print('A snapshot of the data at the error point is at .errdump ')
def eqcolumns(self, a, b):
''' compares two lists'''
if len(a) != len(b):
return False
else:
return all(a == b)
def xgenr(self, databank, start='', end='', silent=0, samedata=1, **kwargs):
'''Evaluates this model on a databank from start to end.
First it finds the values in the Dataframe, then creates the evaluator function through the *outeval* function
(:func:`modelclass.model.outeval`),
then it evaluates the function and returns the values in a Dataframe.
The text for the evaluator function is placed in the model property **make_los_text**,
where it can be inspected in case of problems.
'''
sol_periode = self.smpl(start, end, databank)
self.check_sim_smpl(databank)
if not silent:
print('Will start calculating: ' + self.name)
if (not samedata) or (not hasattr(self, 'solve_dag')) or (not self.eqcolumns(self.genrcolumns, databank.columns)):
# fill all Missing value with 0.0
databank = insertModelVar(databank, self)
for i in [j for j in self.allvar.keys() if self.allvar[j]['matrix']]:
# Make sure columns with matrices are of object type
databank.loc[:, i] = databank.loc[:, i].astype('O')
self.genrcolumns = databank.columns.copy()
make_los_text = self.outeval(databank)
self.make_los_text = make_los_text
exec(make_los_text, globals()) # creates the los function
self.solve_dag = make_los(self.funks)
values = databank.values.copy() #
for periode in sol_periode:
row = databank.index.get_loc(periode)
self.solve_dag(values, row, self.solveorder, self.allvar)
if not silent:
print(periode, ' solved')
outdf = pd.DataFrame(values, index=databank.index,
columns=databank.columns)
if not silent:
print(self.name + ' calculated ')
return outdf
def findpos(self):
''' Finds a start position in the calculation array for a model.
Places the start position for each variable in model.allvar[variable]['startnr'].
Places the max start position in model.maxstart '''
if self.maxstart == 0:
variabler = (x for x in sorted(self.allvar.keys()))
start = 0
for v, m in ((v, self.allvar[v]['maxlag']) for v in variabler):
self.allvar[v]['startnr'] = start
start = start+(-int(m))+1
# print(v.ljust(self.maxnavlen),str(m).rjust(6),str(self.allvar[v]['start']).rju
self.maxstart = start
def make_gaussline(self, vx, nodamp=False):
''' Takes a list of terms and translates them to a line in a Gauss-Seidel solver for
simultaneous models.
The variables are mapped to positions in a vector which has all relevant variables lagged;
this is in order to provide an opportunity to optimise data and solving.
New version which handles several lhs variables. Damping is not allowed for
these, but could easily be implemented by making a function to multiply tuples.
'''
termer = self.allvar[vx]['terms']
assigpos = self.allvar[vx]['assigpos']
if nodamp:
ldamp = False
else:
# convention for damping equations
if 'Z' in self.allvar[vx]['frmlname'] or pt.kw_frml_name(self.allvar[vx]['frmlname'], 'DAMP'):
assert assigpos == 1, 'You can not dampen equations with several left hand sides:'+vx
endovar = [t.op if t.op else ('a['+str(self.allvar[t.var]['startnr'])+']')
for j, t in enumerate(termer) if j <= assigpos-1]
# to implement damping of the solution
damp = '(1-alfa)*('+''.join(endovar)+')+alfa*('
ldamp = True
else:
ldamp = False
out = []
for i, t in enumerate(termer[:-1]): # drop the trailing $
if t.op:
out.append(t.op.lower())
if i == assigpos and ldamp:
out.append(damp)
if t.number:
out.append(t.number)
elif t.var:
lag = int(t.lag) if t.lag else 0
out.append('a['+str(self.allvar[t.var]['startnr']-lag)+']')
if ldamp:
out.append(')') # the last ) in the dampening
res = ''.join(out)
return res
def make_resline(self, vx):
''' Takes a list of terms and translates them to a residual-calculating line.
'''
termer = self.allvar[vx]['terms']
assigpos = self.allvar[vx]['assigpos']
out = []
for i, t in enumerate(termer[:-1]): # drop the trailing $
if t.op:
out.append(t.op.lower())
if t.number:
out.append(t.number)
elif t.var:
lag = int(t.lag) if t.lag else 0
if i < assigpos:
out.append('b['+str(self.allvar[t.var]['startnr']-lag)+']')
else:
out.append('a['+str(self.allvar[t.var]['startnr']-lag)+']')
res = ''.join(out)
return res
def createstuff3(self, dfxx):
''' Connects a dataframe with the solution vector used by the iterative sim2 solver.
Returns functions to place data in the solution vector and to retrieve it again. '''
columsnr = {v: i for i, v in enumerate(dfxx.columns)}
pos0 = sorted([(self.allvar[var]['startnr']-lag, (var, lag, columsnr[var]))
for var in self.allvar for lag in range(0, -1+int(self.allvar[var]['maxlag']), -1)])
# if problems, check whether findpos has been calculated
posrow = np.array([lag for (startpos, (var, lag, colpos)) in pos0])
poscol = np.array([colpos for (startpos, (var, lag, colpos)) in pos0])
poscolendo = [columsnr[var] for var in self.endogene]
posstartendo = [self.allvar[var]['startnr'] for var in self.endogene]
def stuff3(values, row, ljit=False):
'''Fills a calculation vector with data,
sped up by using dataframe.values '''
if ljit:
# a = np.array(values[posrow+row,poscol],dtype=np.dtype('f8'))
# a = np.ascontiguousarray(values[posrow+row,poscol],dtype=np.dtype('f8'))
a = np.ascontiguousarray(
values[posrow+row, poscol], dtype=np.dtype('f8'))
else:
# a = values[posrow+row,poscol]
# a = np.array(values[posrow+row,poscol],dtype=np.dtype('f8'))
a = np.ascontiguousarray(
values[posrow+row, poscol], dtype=np.dtype('f8'))
return a
def saveeval3(values, row, vector):
values[row, poscolendo] = vector[posstartendo]
return stuff3, saveeval3
def outsolve(self, order='', exclude=[]):
''' Returns a string with a function which calculates a
Gauss-Seidel iteration of a model.
exclude is a list of endogenous variables not to be solved.
Uses:
model.solveorder - the order in which the variables are calculated
model.allvar[v]["gauss"] - the calculation
'''
short, long, longer = 4*' ', 8*' ', 12 * ' '
solveorder = order if order else self.solveorder
fib1 = ['def make(funks=[]):']
fib1.append(short + 'from modeluserfunk import ' +
(', '.join(pt.userfunk)).lower())
fib1.append(short + 'from modelBLfunk import ' +
(', '.join(pt.BLfunk)).lower())
funktext = [short+f.__name__ + ' = funks[' +
str(i)+']' for i, f in enumerate(self.funks)]
fib1.extend(funktext)
fib1.append(short + 'def los(a,alfa):')
f2 = (long + self.make_gaussline(v) for v in solveorder
if (v not in exclude) and (not self.allvar[v]['dropfrml']))
fib2 = [long + 'return a ']
fib2.append(short+'return los')
out = '\n'.join(chain(fib1, f2, fib2))
return out
def make_solver(self, ljit=False, order='', exclude=[], cache=False):
''' Makes a function which performs a Gauss-Seidel iteration.
If ljit=True a jitted function will also be created.
The functions will be placed in:
model.solve
model.solve_jit '''
a = self.outsolve(order, exclude) # find the text of the solve
exec(a, globals()) # make the factory defines
# using the factory create the function
self.solve = make(funks=self.funks)
if ljit:
print('Time for a cup of coffee')
self.solve_jit = jit(
"f8[:](f8[:],f8)", cache=cache, fastmath=True)(self.solve)
return
def base_sim(self, databank, start='', end='', max_iterations=1, first_test=1, ljit=False, exclude=[], silent=False, new=False,
conv=[], samedata=True, dumpvar=[], ldumpvar=False,
dumpwith=15, dumpdecimal=5, lcython=False, setbase=False,
setlast=True, alfa=0.2, sim=True, absconv=0.01, relconv=0.00001,
debug=False, stats=False, **kwargs):
''' Solves a model with data from a databank; if the model has a solve function it is used, else it will be created.
The default options are reasonable for most use:
:parameter start,end: Start and end of simulation; defaults to as much as possible taking max lag into account
:parameter max_iterations: Max iterations
:parameter first_test: First iteration where convergence is tested
:parameter ljit: If True Numba is used to compile just in time - takes time but speeds up solving
:parameter new: Force creation of a new version of the solver (for testing)
:parameter exclude: Don't use these formulas
:parameter silent: Suppress solving information
:parameter conv: Variables on which to measure if convergence has been achieved
:parameter samedata: If False force a remap of the dataframe to the solving vector (for testing)
:parameter dumpvar: Variables to dump
:parameter ldumpvar: Toggles dumping of dumpvar
:parameter dumpwith: Width of dumps
:parameter dumpdecimal: Decimals in dumps
:parameter lcython: Use Cython to compile the model (experimental)
:parameter alfa: Damping of formulas marked for damping (<Z> in frml name)
:parameter sim: For later use
:parameter absconv: Threshold for applying relconv to test convergence
:parameter relconv: Relative convergence criterion
:parameter debug: Output debug information
:parameter stats: Output solving statistics
:return outdf: A dataframe with the solution
'''
sol_periode = self.smpl(start, end, databank)
self.check_sim_smpl(databank)
self.findpos()
# fill all Missing value with 0.0
databank = insertModelVar(databank, self)
with self.timer('create stuffer and gauss lines ', debug) as t:
if (not hasattr(self, 'stuff3')) or (not self.eqcolumns(self.simcolumns, databank.columns)):
self.stuff3, self.saveeval3 = self.createstuff3(databank)
self.simcolumns = databank.columns.copy()
with self.timer('Create solver function', debug) as t:
if ljit:
if not hasattr(self, 'solve_jit'):
self.make_solver(ljit=True, exclude=exclude)
this_solve = self.solve_jit
else:
if not hasattr(self, 'solve'):
self.make_solver(exclude=exclude)
this_solve = self.solve
values = databank.values.copy()
# columsnr=self.get_columnsnr(databank)
ittotal = 0 # total iteration counter
# convvar = [conv] if isinstance(conv,str) else conv if conv != [] else list(self.endogene)
convvar = self.list_names(self.endogene, conv)
convplace = [self.allvar[c]['startnr']
for c in convvar] # this is how convergence is measured
if ldumpvar:
dump = convvar if dumpvar == [] else self.vlist(dumpvar)
dumpplac = [self.allvar[v]['startnr'] for v in dump]
dumphead = ' '.join(
[('{:>'+str(dumpwith)+'}').format(d) for d in dump])
starttime = time.time()
for periode in sol_periode:
row = databank.index.get_loc(periode)
with self.timer('stuffing', debug) as tt:
a = self.stuff3(values, row, ljit)
# b=self.stuff2(values,row,columsnr)
# assert all(a == b)
if ldumpvar:
print('\nStart solving', periode)
print(' '+dumphead)
print('Start '+' '.join(
[('{:>'+str(dumpwith)+',.'+str(dumpdecimal)+'f}').format(a[p]) for p in dumpplac]))
jjj = 0
for j in range(max_iterations):
jjj = j+1
if debug:
print('iteration :', j)
with self.timer('iteration '+str(jjj), debug) as tttt:
itbefore = a[convplace].copy()
a = this_solve(a, alfa)
if ldumpvar:
print('Iteration {:>3}'.format(
j)+' '.join([('{:>'+str(dumpwith)+',.'+str(dumpdecimal)+'f}').format(a[p]) for p in dumpplac]))
if j > first_test:
itafter = a[convplace].copy()
convergence = True
for after, before in zip(itafter, itbefore):
# print(before,after)
if before > absconv and abs(after-before)/abs(before) > relconv:
convergence = False
break
if convergence:
if not silent:
print(periode, 'Solved in ', j, 'iterations')
break
else:
itbefore = itafter.copy()
else:
print('No convergence ', periode, ' after', jjj, ' iterations')
with self.timer('saving', debug) as t:
# self.saveeval2(values,row,columsnr,a) # save the result
self.saveeval3(values, row, a) # save the result
ittotal = ittotal+jjj
if not silent:
print(self.name, ': Solving finish from ',
sol_periode[0], 'to', sol_periode[-1])
outdf = pd.DataFrame(values, index=databank.index,
columns=databank.columns)
if stats:
numberfloats = self.calculate_freq[-1][1]*ittotal
endtime = time.time()
simtime = endtime-starttime
print('{:<40}: {:>15,}'.format(
'Floating point operations :', self.calculate_freq[-1][1]))
print('{:<40}: {:>15,}'.format('Total iterations :', ittotal))
print('{:<40}: {:>15,}'.format(
'Total floating point operations', numberfloats))
print('{:<40}: {:>15,.2f}'.format(
'Simulation time (seconds) ', simtime))
if simtime > 0.0:
print('{:<40}: {:>15,.0f}'.format(
'Floating point operations per second', numberfloats/simtime))
return outdf
def outres(self, order='', exclude=[]):
''' Returns a string with a function which performs
the calculations needed for a residual check.
exclude is a list of endogenous variables not to be solved.
Uses:
model.solveorder - the order in which the variables are calculated
'''
short, long, longer = 4*' ', 8*' ', 12 * ' '
solveorder = order if order else self.solveorder
fib1 = ['def make(funks=[]):']
fib1.append(short + 'from modeluserfunk import ' +
(', '.join(pt.userfunk)).lower())
fib1.append(short + 'from modelBLfunk import ' +
(', '.join(pt.BLfunk)).lower())
fib1.append(short + 'from numpy import zeros,float64')
funktext = [short+f.__name__ + ' = funks[' +
str(i)+']' for i, f in enumerate(self.funks)]
fib1.extend(funktext)
fib1.append(short + 'def los(a):')
fib1.append(long+'b=zeros(len(a),dtype=float64)\n')
f2 = [long + self.make_resline(v) for v in solveorder
if (v not in exclude) and (not self.allvar[v]['dropfrml'])]
fib2 = [long + 'return b ']
fib2.append(short+'return los')
out = '\n'.join(chain(fib1, f2, fib2))
return out
def make_res(self, order='', exclude=[]):
r''' Makes a function which calculates the right hand side of each equation,
used for a residual check (see :func:`outres`).
The resulting function is returned to the caller.
'''
xxx = self.outres(order, exclude) # find the text of the solve
exec(xxx, globals()) # make the factory defines
# using the factory create the function
res_calc = make(funks=self.funks)
return res_calc
def base_res(self, databank, start='', end='', silent=1, **kwargs):
''' Calculates the model with data from a databank.
Used to check whether each equation gives the same result as in the original databank.
'''
if not hasattr(self, 'res_calc'):
self.findpos()
self.res_calc = self.make_res()
databank = insertModelVar(databank, self)  # is this allowed here? Ib
values = databank.values
bvalues = values.copy()
sol_periode = self.smpl(start, end, databank)
stuff3, saveeval3 = self.createstuff3(databank)
for per in sol_periode:
row = databank.index.get_loc(per)
aaaa = stuff3(values, row)
b = self.res_calc(aaaa)
if not silent:
print(per, 'Calculated')
saveeval3(bvalues, row, b)
xxxx = pd.DataFrame(bvalues, index=databank.index,
columns=databank.columns)
if not silent:
print(self.name, ': Res calculation finish from ',
sol_periode[0], 'to', sol_periode[-1])
return xxxx
def __len__(self):
return len(self.endogene)
def __repr__(self):
fmt = '{:40}: {:>20} \n'
out = fmt.format('Model name', self.name)
out += fmt.format('Model structure ',
'Recursive' if self.istopo else 'Simultaneous')
out += fmt.format('Number of variables ', len(self.allvar))
out += fmt.format('Number of exogenous variables ',
len(self.exogene))
out += fmt.format('Number of endogenous variables ',
len(self.endogene))
return '<\n'+out+'>'
#
class Org_model_Mixin():
''' This mixin enriches the BaseModel class with different utility methods.
'''
@property
def lister(self):
'''
lists used in the equations
Returns:
dict: Dictionary of lists defined in the input equations.
'''
return pt.list_extract(self.equations) # lists used in the equations
@property
def listud(self):
'''Returns a string of the model's list definitions \n
used when creating (small) models based on this model '''
udlist = []
for l in self.lister:
udlist.append('list '+l+' = ')
for j, sl in enumerate(self.lister[l]):
lensl = str(max(30, len(sl)))
if j >= 1:
udlist.append(' / ')
udlist.append(('{:<'+lensl+'}').format('\n '+sl+' '))
udlist.append(':')
for i in self.lister[l][sl]:
udlist.append(' '+i)
udlist.append(' $ \n')
return ''.join(udlist)
# @lru_cache(maxsize=2048)
def vlist(self, pat):
'''
Returns a list of variables in the model matching the pattern; the pattern can also be a list of patterns.
Args:
pat (string or list of strings): One or more patterns separated by spaces; wildcards * and ?; special pattern: #ENDO
Returns:
out (list): list of variable names matching pat.
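Example (variable names are illustrative)::

    mmodel.vlist('YD*')      # all variables starting with YD
    mmodel.vlist('YD* C?')   # two patterns separated by a space
    mmodel.vlist('#ENDO')    # all endogenous variables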
'''
if isinstance(pat, list):
upat = pat
else:
upat = [pat]
try:
if pat.upper() == '#ENDO':
out = sorted(self.endogene)
return out
except:
...
if pat.startswith('#'):
if pat[1:] in self.var_groups.keys():
upat = [self.var_groups[pat[1:]]]
else:
lf='\n'
print(f'No grouping like this. Select from: {lf}{lf.join(self.var_groups.keys())}')
raise Exception('Try with a correct group name')
ipat = upat
# breakpoint()
patlist = [apat.upper() for p in ipat for apat in p.split('|' if p.startswith('!') else ' ' )]
# patlist = [self.var_groups[apat[1:]] if apat.startswith('#') else apat for apat in patlist0]
# print(f'{patlist=}')
try:
out = [v for up in patlist for v in sorted(
self.deslist(up[1:] ) if up.startswith('!')
else
fnmatch.filter(self.allvar.keys(), up.upper())
)]
except:
''' in case the model instance is an empty instance around dataframes, typical for visualization'''
out = [v for p in ipat for up in p.split() for v in sorted(
fnmatch.filter(self.lastdf.columns, up.upper())
)
]
return out
def vlist_names(self,input, pat):
'''returns a list of variables in input matching the pattern'''
gross_list = self.vlist(pat)
out = [v for v in gross_list if v in input]
return out
@staticmethod
def list_names(input, pat, sort=True):
'''returns a list of variables in input matching the pattern'''
if sort:
out = [v for up in pat.split()
for v in sorted(fnmatch.filter(input, up.upper()))]
else:
out = [v for up in pat.split()
for v in fnmatch.filter(input, up.upper())]
return out
def exodif(self, a=None, b=None):
'''
Finds the differences between two dataframes in the exogenous variables of the model.
Defaults to the two dataframes (basedf and lastdf) internal to the model instance.
Exogenous variables with a name ending in ___RES are not included, as they are part of an un-normalized model.
Args:
a (DataFrame, optional): Defaults to None; if None, model.basedf will be used.
b (DataFrame, optional): Defaults to None; if None, model.lastdf will be used.
Returns:
DataFrame: the difference between the model's exogenous variables in a and b.
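Example::

    dif = mmodel.exodif()          # compare basedf and lastdf
    dif = mmodel.exodif(df1, df2)  # compare two explicit dataframes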
'''
selected = sorted(self.exogene_true)
aexo = a.loc[:, selected] if isinstance(
a, pd.DataFrame) else self.basedf.loc[:,selected]
bexo = b.loc[:, selected] if isinstance(
b, pd.DataFrame) else self.lastdf.loc[:, selected]
diff = pd.eval('bexo-aexo')
out2 = diff.loc[(diff != 0.0).any(axis=1), (diff != 0.0).any(axis=0)]
return out2.T.sort_index(axis=0).T
def get_eq_values(self, varnavn, last=True, databank=None, nolag=False, per=None, showvar=False, alsoendo=False):
''' Returns a dataframe with values from the frml determining a variable
options:
:last: if True the lastdf is used, else the basedf
:nolag: only one line for each variable '''
if varnavn in self.endogene:
if type(databank) == type(None):
df = self.lastdf if last else self.basedf
else:
df = databank
if per == None:
current_per = self.current_per
else:
current_per = per
varterms = [(term.var, int(term.lag) if term.lag else 0)
# for term in self.allvar[varnavn.upper()]['terms'] if term.var]
for term in self.allvar[varnavn.upper()]['terms'] if term.var and not (term.var == varnavn.upper() and term.lag == '')]
# now we have dropped duplicate terms and sorted
sterms = sorted(set(varterms), key=lambda x: (x[0], -x[1]))
if nolag:
sterms = sorted({(v, 0) for v, l in sterms})
if showvar:
sterms = [(varnavn, 0)]+sterms
lines = [[get_a_value(df, p, v, lag)
for p in current_per] for v, lag in sterms]
out = pd.DataFrame(lines, columns=current_per,
index=[r[0]+(f'({str(r[1])})' if r[1] else '') for r in sterms])
return out
else:
return None
def get_eq_dif(self, varnavn, filter=False, nolag=False, showvar=False):
''' returns a dataframe with difference of values from formula'''
out0 = (self.get_eq_values(varnavn, last=True, nolag=nolag, showvar=showvar) -
self.get_eq_values(varnavn, last=False, nolag=nolag, showvar=showvar))
if filter:
mask = out0.abs() >= 0.00000001
out = out0.loc[mask]
else:
out = out0
return out
def get_var_growth(self,varname,showname= False,diff=False):
'''Returns the growth rate of this variable in the base and the last dataframe'''
with self.set_smpl_relative(-1, 0):
basevalues = self.basedf.loc[self.current_per, varname].pct_change()
basevalues.name = f'{varname+" " if showname else ""}Base growth'
lastvalues = self.lastdf.loc[self.current_per, varname].pct_change()
lastvalues.name = f'{varname +" " if showname else ""}Last growth'
diffvalues = lastvalues-basevalues
diffvalues.name = f'{varname +" " if showname else ""}Diff growth'
out = pd.DataFrame([basevalues,lastvalues,diffvalues]) if diff else pd.DataFrame([basevalues,lastvalues])
return out.loc[:,self.current_per]*100.
def get_values(self, v, pct=False):
''' returns a dataframe with the data points for a node, including lags '''
t = pt.udtryk_parse(v, funks=[])
var = t[0].var
lag = int(t[0].lag) if t[0].lag else 0
bvalues = [float(get_a_value(self.basedf, per, var, lag))
for per in self.current_per]
lvalues = [float(get_a_value(self.lastdf, per, var, lag))
for per in self.current_per]
dvalues = [float(get_a_value(self.lastdf, per, var, lag) -
get_a_value(self.basedf, per, var, lag)) for per in self.current_per]
if pct:
pvalues = [(100*d/b if b != 0.0 else np.nan)
for b, d in zip(bvalues, dvalues)]
df = pd.DataFrame([bvalues, lvalues, dvalues, pvalues], index=[
'Base', 'Last', 'Diff', 'Pct'], columns=self.current_per)
else:
df = pd.DataFrame([bvalues, lvalues, dvalues], index=[
'Base', 'Last', 'Diff'], columns=self.current_per)
return df
def __getitem__(self, name):
'''
To execute the index operator []
Uses the :any:`modelvis.vis` operator
'''
try:
a = self.vis(name)
return a
except Exception as e:
# This will catch any exception
print(f"Error: {e}")
a = mv.DummyVis()
return a
def __getattr__(self, name):
'''To execute the . operator
'''
if name.startswith('model_var_'):
wishfor= name[len('model_var_'):]
return(self.var_with_frmlname(wishfor))
elif name.startswith('model_eq_'):
wishfor= name[len('model_eq_'):]
return(self.frml_with_frmlname(wishfor))
else:
if name.upper() in self.allvar_set:
return mv.varvis(model=self, var=name.upper())
else:
raise AttributeError(f'The specification:"{name}" did not match a method, property or variable name')
def __dir__(self):
if self.tabcomplete:
if not hasattr(self, '_res'):
# self._res = sorted(list(self.allvar.keys()) + list(self.__dict__.keys()) + list(type(self).__dict__.keys()))
self._res = sorted(list(self.allvar.keys(
)) + list(self.__dict__.keys()) + list(type(self).__dict__.keys()))
return self. _res
else:
res = list(self.__dict__.keys())
return res
def todynare(self, paravars=[], paravalues=[]):
''' This is a function which converts a Modelflow model instance to Dynare .mod format
'''
def totext(t):
if t.op:
return ';\n' if (t.op == '$') else t.op.lower()
elif t.number:
return t.number
elif t.var:
return t.var+(('('+t.lag+')') if t.lag else '')
content = ('// Dropped '+v + '\n' if self.allvar[v]['dropfrml']
else ''.join((totext(t) for t in self.allvar[v]['terms']))
for v in self.solveorder)
paraout = ('parameters \n ' + '\n '.join(v for v in sorted(paravars)) + ';\n\n' +
';\n'.join(v for v in paravalues)+';\n')
# print(paraout)
out = ('\n'.join(['@#define '+k+' = [' + ' , '.join
(['"'+d+'"' for d in kdic])+']' for l, dic in self.lister.items() for k, kdic in dic.items()]) + '\n' +
'var \n ' + '\n '.join((v for v in sorted(self.endogene)))+';\n' +
'varexo \n ' + '\n '.join(sorted(self.exogene-set(paravars))) + ';\n' +
paraout +
'model; \n ' + ' '.join(content).replace('**', '^') + ' \n end; \n')
return out
class Model_help_Mixin():
''' Helpers to model'''
def var_with_frmlname(self,kwname=''):
''' returns endogenous variables where the frmlname contains kwname'''
return {v for v in self.endogene if pt.kw_frml_name(self.allvar[v]['frmlname'],kwname)}
def frml_with_frmlname(self,kwname=''):
''' returns equations where the frmlname contains kwname'''
relevant_var = self.var_with_frmlname(kwname)
out = {v:self.allvar[v]['frml'] for v in relevant_var}
return out
@property
def model_exogene(self):
''' True model exogenous
- all exogenous variables excluding the _D, _A and _X fixing variables
'''
out = self.exogene-set(self.fix_dummy + self.fix_add_factor + self.fix_value)
return out
@property
def model_endogene(self):
'''True endogenous for normalized models:
all endogenous variables minus fitted variables
'''
out = self.endogene - self.var_with_frmlname('fit')
return out
@staticmethod
@contextmanager
def timer(input='test', show=True, short=True):
'''
A timer context manager, implemented using a
generator function. This one will report the time even if an exception occurs.
The time in seconds can be retrieved as <return value>.seconds
Parameters
----------
input : string, optional
a name. The default is 'test'.
show : bool, optional
show the results. The default is True.
short : bool, optional
use the short output format. The default is True.
Returns
-------
None.
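Example::

    with mmodel.timer('solve') as t:
        res = mmodel(df)
    print(t.seconds)    # elapsed time in seconds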
'''
import time
class xtime():
'''A help class to retrieve the time outside the with statement'''
def __init__(self, input):
self.seconds = 0.0
self.input = input
def show(self):
print(self.__repr__(), flush=True)
def __repr__(self):
minutes = self.seconds/60.
seconds = self.seconds
if minutes < 2.:
afterdec = 1 if seconds >= 10 else (
3 if seconds >= 1.01 else 10)
width = (10 + afterdec)
return f'{self.input} took : {seconds:>{width},.{afterdec}f} Seconds'
else:
afterdec = 1 if minutes >= 10 else 4
width = (10 + afterdec)
return f'{self.input} took : {minutes:>{width},.{afterdec}f} Minutes'
cxtime = xtime(input)
start = time.time()
if show and not short:
print(
f'{input} started at : {time.strftime("%H:%M:%S"):>{15}} ', flush=True)
try:
error = False
yield cxtime
except BaseException as e:
print(f'Failed: {e}:')
error = e
finally:
end = time.time()
cxtime.seconds = (end - start)
if show:
cxtime.show()
if error:
raise error
return
@staticmethod
def update_from_list(indf, basis, lprint=False):
df = indf.copy(deep=True)
for l in basis.split('\n'):
if len(l.strip()) == 0:
continue
# print(f'line:{l}:')
var, op, value, *arg = l.split()
if len(arg) == 0:
arg = df.index[0], df.index[-1]
elif len(arg) == 1:
arg = type(df.index[0])(arg[0]), type(df.index[0])(arg[0])
else:
arg = type(df.index[0])(arg[0]), type(df.index[0])(arg[1])
# print(var,op,value,arg,sep='|')
update_var(df, var.upper(), op, value, *
arg, create=True, lprint=lprint)
return df
@staticmethod
def update_from_list_new(indf, basis, lprint=False):
df = indf.copy(deep=True)
for l in basis.split('\n'):
if len(l.strip()) == 0 or l.strip().startswith('#'):
continue
# print(f'line:{l}:')
var, op, *value, arg1, arg2 = l.split()
istart, iend = df.index.slice_locs(arg1, arg2)
arg = df.index[istart], df.index[iend-1]  # slice_locs returns an exclusive end position
# print(var,op,value,arg,sep='|')
update_var(df, var.upper(), op, value, *
arg, create=True, lprint=lprint)
return df
@staticmethod
def update_old(indf, updates, lprint=False,scale = 1.0,create=True,keep_growth=False,start='',end='' ):
'''
Updates a dataframe and returns the updated dataframe.
Args:
indf (DataFrame): input dataframe.
updates (string): lines with variable updates, see below.
lprint (bool, optional): if True each update is printed. Defaults to False.
scale (float, optional): A multiplier used on all update input. Defaults to 1.0.
create (bool, optional): Creates variables not already in the dataframe. Defaults to True.
keep_growth (bool, optional): Keep the growth rate after the update time frame. Defaults to False.
start (string, optional): Global start
end (string, optional): Global end
Returns:
df (DataFrame): the updated dataframe.
A line in updates looks like this:
<[[start] end]> <var> <=|+|*|%|=growth|+growth|=diff> <value>... [/ [[start] end] [--keep_growth_rate|--no_keep_growth_rate]]
'''
# start experiments for including pattern and wildcards in an updateline to update several variables in one line
# operatorpat = '('+ ('|'.join(f' {o} ' for o in r'\=|\+|\*|%|\=growth|\+growth|\=diff'.split('|'))) + ')'
# testline = ' PAKGGREVCO2CE* PAKGGREVCO2CER PAKGGREVCO2CER = 29 '
# re.split(operatorpat,testline)
df = indf.copy(deep=True)
for whole_line in updates.split('\n'):
stripped0 = whole_line.strip()
stripped = stripped0.split('#')[0]
if len(stripped) == 0 or stripped.startswith('#'):
continue
if r'/' in stripped:
...
elif '--' in stripped:
stripped = stripped.replace('--','/ --',1)
else:
stripped = stripped+r'/'
l0,loptions = stripped.upper().split(r'/')
options = loptions.split()
time_options = [o for o in options if not o.startswith('--')]
other_options = [o for o in options if o.startswith('--')]
if len(other_options) > 1:
raise Exception(f'Too many options\nOffending:"{whole_line}"')
if other_options: other_options[0] = '--KEEP_GROWTH' if other_options[0] == '--KG' else other_options[0]
# print(f'{time_options=}')
l=(timesplit := l0.rsplit('>'))[-1]
if len(timesplit) == 2:
if len(time_options):
print(whole_line)
raise Exception(f'Do not specify time in both ends of update line\nOffending:"{whole_line}"')
time_options = timesplit[0].replace('<','').replace(',',' ').replace(':',' ').split()
if len(time_options)==1:
start = time_options[0]
end = ''
elif len(time_options)==2:
start = time_options[0]
end = time_options[1]
else:
raise Exception(f'Too many times\nOffending:"{whole_line}"')
if len(l.strip())==0: # if we only want to set the time
continue
# breakpoint()
# breakpoint()
if len(time_options)==2:
arg1,arg2 = time_options
elif len(time_options)==1:
arg1 = arg2 = time_options[0]
elif start != '' and end != '':
arg1=start
arg2=end
elif (start != '' and end == ''):
arg1 = arg2 = start
elif (start == '' and end != ''):
raise Exception(f'When end is set start should also be set \nOffending:"{whole_line}"')
elif len(time_options)==0:
arg1=df.index[0]
arg2=df.index[-1]
else:
raise Exception(f'Problems with timesetting\nOffending:"{whole_line}"')
# print(f'{start=},{end=}')
# print(f'line:{l}:{time_options=} :{arg1=} {arg2=}')
if '--' in str(arg1) or '--' in str(arg2):
raise Exception(f'Probably no blank after time \nOffending:"{whole_line}"')
istart, iend = df.index.slice_locs(arg1, arg2)
current = df.index[istart:iend]
time1,time2 = current[0],current[-1]
var, op, *value = l.split()
update_growth = False
if (keep_growth or '--KEEP_GROWTH' in other_options) and not '--NO_KEEP_GROWTH' in other_options:
if var not in df.columns:
raise Exception(f'Can not keep growth for created variable\nOffending:"{whole_line}"')
resttime = df.index[iend:]
if len(resttime):
update_growth = True
growth_rate = (df.loc[:,var].pct_change())[resttime].to_list()
multiplier = list(accumulate([(1+i) for i in growth_rate],operator.mul))
# print(var,op,value,arg,sep='|')
update_var(df, var.upper(), op, value,time1,time2 ,
create=create, lprint=lprint,scale = scale)
if update_growth:
lastvalue = df.loc[time2,var]
df.loc[resttime,var]= [lastvalue * m for m in multiplier]
# breakpoint()
return df
@staticmethod
def update(indf, updates, lprint=False,scale = 1.0,create=True,keep_growth=False,):
'''
Updates a dataframe and returns a dataframe
Args:
indf (DataFrame): input dataframe.
updates (string): lines with variable updates, see below.
lprint (bool, optional): if True each update is printed. Defaults to False.
scale (float, optional): A multiplier used on all update input. Defaults to 1.0.
create (bool, optional): Creates variables not already in the dataframe. Defaults to True.
keep_growth (bool, optional): Keep the growth rate after the update time frame. Defaults to False.
Returns:
df (DataFrame): the updated dataframe.
A line in updates looks like this::
[<[[start] end]>] <var> <=|+|*|%|=growth|+growth|=diff> <value>... [--keep_growth_rate|--kg|--no_keep_growth_rate|--nkg]
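Example (variable names are illustrative)::

    upd = ('<2025 2027> GOVSPEND + 100\n'
           'TAXRATE = 0.25 --kg')
    df2 = model.update(df, upd, lprint=True)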
'''
# start experiments for including pattern and wildcards in an updateline to update several variables in one line
operatorpat = '('+ ('|'.join(f' {o} ' for o in r'=|\+|\*|%|=growth|\+growth|=diff'.upper().split('|'))) + ')'
# testline = ' PAKGGREVCO2CE* PAKGGREVCO2CER PAKGGREVCO2CER = 29 '
# re.split(operatorpat,testline)
legal_options = {'--KEEP_GROWTH','--NO_KEEP_GROWTH','--KG','--NKG'}
start = '-0' # start of dataframe
end = '-1' # end of dataframe
df = indf.copy(deep=True)
for whole_line in updates.split('\n'):
stripped0 = whole_line.strip()
stripped = stripped0.split('#')[0]
if len(stripped) == 0 :
continue
if '--' in stripped:
stripped = stripped.replace('--','/ --',1) # The first option
else:
stripped = stripped+r'/'
try:
l0,loptions = stripped.upper().split(r'/')
except:
raise Exception(f'Probably too many /\nOffending:"{whole_line}"')
options = loptions.split()
if len(options) > 1:
raise Exception(f'Too many options :{options}')
for o in options:
if o not in legal_options:
raise Exception(f'Illegal option:{o}, legal is:{legal_options}')
l=(timesplit := l0.rsplit('>'))[-1]
if len(timesplit) == 2:
time_options = timesplit[0].replace('<','').replace(',',' ').replace(':',' ').replace('/',' ').split()
if len(time_options)==1:
start = time_options[0]
end = time_options[0]
elif len(time_options)==2:
start,end = time_options
else:
raise Exception(f'Too many times \nOffending:"{whole_line}"')
if len(l.strip())==0: # if we only want to set the time
continue
# breakpoint()
arg1=df.index[0] if start == '-0' else start
arg2=df.index[-1] if end == '-1' else end
# print(f'{start=},{end=}')
# print(f'line:{l}:{time_options=} :{arg1=} {arg2=}')
istart, iend = df.index.slice_locs(arg1, arg2)
current = df.index[istart:iend]
# print(f'{current=}')
time1,time2 = current[0],current[-1]
varstring,opstring,valuestring =re.split(operatorpat,l)
value=valuestring.split()
op= opstring.strip()
varlist = varstring.split()
if not len(varlist):
raise Exception(f'No variables to update \nOffending:"{whole_line}"')
for varname0 in varlist:
varname = varname0.strip()
if not len(varname):
raise Exception(f'No variable name \nOffending:"{whole_line}"')
update_growth = False
if (keep_growth or '--KEEP_GROWTH' in options or '--KG' in options) and \
not ('--NO_KEEP_GROWTH' in options or '--NKG' in options):
if varname not in df.columns:
raise Exception(f'Can not keep growth for created variable\nOffending:"{whole_line}"')
resttime = df.index[iend:]
if len(resttime):
update_growth = True
growth_rate = (df.loc[:,varname].pct_change())[resttime].to_list()
multiplier = list(accumulate([(1+i) for i in growth_rate],operator.mul))
# print(varname,op,value,arg,sep='|')
update_var(df, varname.upper(), op, value,time1,time2 ,
create=create, lprint=lprint,scale = scale)
if update_growth:
lastvalue = df.loc[time2,varname]
# assert 1==2
df.loc[resttime,varname]= [lastvalue * float(m) for m in multiplier]
# breakpoint()
return df
def insertModelVar(self, dataframe, addmodel=[]):
"""Inserts all variables from this model, not already in the dataframe.
If model is specified, the dataframw will contain all variables from this and model models.
also located at the module level for backward compability
"""
if isinstance(addmodel, list):
moremodels = addmodel
else:
moremodels = [addmodel]
imodel = [self]+moremodels
myList = []
for item in imodel:
myList.extend(item.allvar.keys())
manglervars = list(set(myList)-set(dataframe.columns))
if len(manglervars):
extradf = pd.DataFrame(
0.0, index=dataframe.index, columns=manglervars).astype('float64')
data = pd.concat([dataframe, extradf], axis=1)
return data
else:
return dataframe
@staticmethod
def in_notebook():
try:
from IPython import get_ipython
if 'IPythonKernel' not in str(get_ipython().kernel): # pragma: no cover
return False
except ImportError:
return False
return True
@staticmethod
def is_running_in_jupyter():
try:
# Check if the 'get_ipython' function is available
shell = get_ipython().__class__.__name__
if shell == 'ZMQInteractiveShell':
# Running in Jupyter Notebook or JupyterLab
return True
except NameError:
# 'get_ipython' function not found, not running in Jupyter Notebook
pass
return False
class defsub(dict):
'''A subclass of dict.
If a *defsub* is indexed by a nonexisting key, it just returns the key.'''
def __missing__(self, key):
return key
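# Behaviour sketch: missing keys fall back to the key itself, so variable
# descriptions default to the variable name (hypothetical entries):
#     d = model.defsub({'FY': 'Real GDP'})
#     d['FY']    # -> 'Real GDP'
#     d['DUMMY'] # -> 'DUMMY'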
def test_model(self, base_input, start=None, end=None, maxvar=1_000_000, maxerr=100, tol=0.0001, showall=False, dec=8, width=30, ref_df=None):
'''
Compares a straight calculation with the input dataframe.
Shows which variables don't have the same value.
Very useful when implementing a model where the results are known.
Args:
df (DataFrame): dataframe to run.
start (index, optional): start period. Defaults to None.
end (index, optional): end period. Defaults to None.
maxvar (int, optional): how many variables are to be checked. Defaults to 1_000_000.
maxerr (int, optional): how many errors to show. Defaults to 100.
tol (float, optional): check for absolute value of difference. Defaults to 0.0001.
showall (bool, optional): show more detail. Defaults to False.
ref_df (DataFrame, optional): this dataframe is used as reference, used if add_factors have been calculated
Returns:
None.
'''
_start = start if start else self.current_per[0]
_end = end if end else self.current_per[-1]
self.basedf = ref_df if isinstance(ref_df, pd.DataFrame) else base_input
resresult = self(base_input, _start, _end,
reset_options=True, solver='base_res')
pd.options.display.float_format = f'{{:.{dec}f}}'.format
err = 0
print(f'\nChecking residuals for {self.name} {_start} to {_end}')
for i, v in enumerate(self.solveorder):
if i > maxvar:
break
if err > maxerr:
break
check = self.get_values(v, pct=True).T
check.columns = ['Before check',
'After calculation', 'Difference', 'Pct']
# breakpoint()
if (check.Difference.abs() >= tol).any():
err = err+1
maxdiff = check.Difference.abs().max()
maxpct = check.Pct.abs().max()
# breakpoint()
print(f"{v}, Max difference:{maxdiff:{width}.{dec}f} Max Pct {maxpct:{width}.{dec}f}% It is number {i} in the solveorder and error number {err}")
if showall:
print(f'\n{self.allvar[v]["frml"]}')
print(f'{check}')
print(f'\nValues: \n {self.get_eq_values(v,showvar=1)} \n')
self.oldkwargs = {}
@property
def print_model(self):
print(self.equations)
@property
def print_model_latex(self):
mj.get_frml_latex(self)
@staticmethod
def get_a_repo_yaml(owner: str = "IbHansen",
repo_name: str = "modelflow-manual",
branch: str = 'main',
file: str = r'papers/mfbook/repo_def.yaml',
**kwargs):
"""
Retrieve and load a YAML file from a specified GitHub repository.
args:
owner (str): The owner of the GitHub repository. Default is "IbHansen".
repo_name (str): The name of the GitHub repository. Default is "modelflow-manual".
branch (str): The branch where the file is located. Default is "main".
file (str): The path to the file within the GitHub repository. Default is "papers/mfbook/repo_def.yaml".
Returns:
dict or list: A dictionary or list containing the loaded YAML data.
Exceptions:
Raises an exception if there is an error retrieving or loading the file,
with the exception message printed to the console.
"""
from urllib.request import urlopen
import yaml
url = Path(rf"https://raw.githubusercontent.com/{owner}/{repo_name}/{branch}/{file}").as_posix().replace('https:/','https://')
# print(f'Open file from URL: {url}')
try:
with urlopen(url) as f:
res = yaml.safe_load(f)
except Exception as e:
print(f'{e}')
return res
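# Usage sketch (requires network access; the defaults point at the modelflow-manual repo):
#     repo_def = model.get_a_repo_yaml()
#     repo_def = model.get_a_repo_yaml(owner='IbHansen', repo_name='modelflow-manual',
#                                      branch='main', file='papers/mfbook/repo_def.yaml')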
@staticmethod
def download_github_repo_old(owner: str = "IbHansen",
repo_name: str = 'wb-repos',
branch: str = 'main',
destination = '.',
go = True,
silent=False,
reset = False ,
**kwargs,
):
"""
Download an entire GitHub repository and extract it to a specified location.
args:
owner: The owner of the GitHub repository.
repo_name: The name of the repository.
branch: The branch to download.
destination: The local path where the repository should be extracted.
go: display toc of notebooks
silent: keep silent
description:optional description which will be used as folder name
Returns:
A message indicating whether the download was successful or not.
"""
# Construct the URL to download the zip file of the entire repository
import shutil
url = f'https://github.com/{owner}/{repo_name}/archive/refs/heads/{branch}.zip'
try:
extract_to = Path(destination) # Using pathlib.Path here
extract_to.mkdir(parents=True, exist_ok=True) # Create the directory if it doesn't exist
except Exception as e:
return f"An error occurred creating {destination}: {e}"
try:
# breakpoint()
# urllib.request.urlopen(urlfile) as f
with urlopen(url) as f:
with zipfile.ZipFile(BytesIO(f.read())) as z:
# Extract all contents of the zip file to the specified location
location = extract_to / f'{repo_name}-{branch}'
new_location = extract_to / f'{kwargs.get("description",repo_name)}'
if reset:
if new_location.exists() and new_location.is_dir():
shutil.rmtree(new_location)
print( f'"{new_location}" is reset, that is deletet ' )
if new_location.exists():
if go:
print(f"'{new_location}' already exist. Can't download to the same location twice. ")
print("Consider reset=1. ")
print(f"** Warning reset=1 deletes all file in '{new_location}' ")
model.display_toc('**Avaiable notebooks**',folder=new_location)
return
else:
raise Exception( f"Can't download to the same location twice, consider reset=1 when retrying :\n{new_location}\n")
z.extractall(extract_to)
try:
location.rename(new_location)
except:
print(f"Can't rename {location} to '{new_location}' you are probably using the folder just now")
if not silent:
print(f'Repo downloaded to "{new_location}"' )
if go:
model.display_toc('**Available notebooks**',folder=new_location)
return
except Exception as e:
raise Exception( f'No download of {kwargs.get("description",repo_name)}:\n{e} ')
return f"An error occurred: {e}"
@staticmethod
def download_github_repo(owner: str = "IbHansen",
repo_name: str = 'wb-repos',
branch: str = 'main',
destination = './wb-repos',
go = True,
colab=False,
silent=True,
replace = False,
):
"""
Download an entire GitHub repository and extract it to a specified location.
Parameters:
owner: The owner of the GitHub repository.
repo_name: The name of the repository.
branch: The branch to download.
destination: The local path where the repository should be extracted.
go: display toc of notebooks
silent: keep silent
replace: if True replace existing files with the files from repo
Returns:
A message indicating whether the download was successful or not.
"""
# Construct the URL to download the zip file of the entire repository
import shutil
import tempfile
def copy_new_files_only(src, dst):
"""
Copy files from src to dst if they do not already exist in dst.
src and dst are Path objects or strings representing the source and destination directories.
"""
src_path = Path(src)
dst_path = Path(dst)
for item in src_path.glob('**/*'): # **/* means all files and directories recursively
if item.is_file(): # Only proceed if it's a file
relative_path = item.relative_to(src_path)
relative_path_parts_except_first = relative_path.parts[1:]
relative_path_except_first = Path(*relative_path_parts_except_first)
destination_file = dst_path / relative_path_except_first
# print(' ')
# print(f"{item=} to \n{destination_file=}")
# print(f"{relative_path=}\n{relative_path_except_first=}")
if not destination_file.exists():
destination_file.parent.mkdir(parents=True, exist_ok=True) # Ensure the destination directory exists
shutil.copy2(item, destination_file)
if not silent: print(f"Copied : {relative_path_except_first}")
else:
if replace:
shutil.copy2(item, destination_file)
if not silent: print(f"Copied file overwritten: {relative_path_except_first}")
else:
if not silent: print(f"Already exists, skipped: {relative_path_except_first}")
url = f'https://github.com/{owner}/{repo_name}/archive/refs/heads/{branch}.zip'
try:
extract_to =Path(tempfile.mkdtemp()) # Using pathlib.Path here
if not silent: print(f'Temporary directory created at: {extract_to}')
except Exception as e:
return f"An error occurred creating temporary download location: {e}"
try:
with urlopen(url) as f:
with zipfile.ZipFile(BytesIO(f.read())) as z:
# Extract all contents of the zip file to the specified location
z.extractall(extract_to)
if not silent: print(f'Repo downloaded to "{extract_to}"' )
except Exception as e:
raise Exception( f'No download of {repo_name}:\n{e} ')
new_location =Path(destination)
copy_new_files_only(extract_to,new_location)
shutil.rmtree(extract_to)
if go:
if colab:
model.display_toc_github('**Available notebooks on github**',folder=new_location,
colab=colab,
owner = owner,
repo_name = repo_name,
branch = branch,
)
else:
model.display_toc_github('**Available notebooks**',folder=new_location,colab=False)
return
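# Usage sketch (requires network access; downloads the zip of the branch into a temporary
# directory and copies only files missing from `destination` unless replace=True):
#     model.download_github_repo(owner='IbHansen', repo_name='wb-repos',
#                                destination='./wb-repos', go=True, silent=True)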
@staticmethod
def display_toc_github(text='**Jupyter notebooks**',folder='.',all=False,nocwd=False,
colab=True,
owner: str = "IbHansen",
repo_name: str = 'wb-repos',
branch: str = 'main',
destination = './wb-repos',
):
'''In a jupyter notebook this function displays a clickable table of contents of all
jupyter notebooks in this folder and its subfolders'''
from IPython.display import display, Markdown, HTML
from pathlib import Path
display(Markdown(text))
for dir in sorted(Path(folder).glob('**')):
# print(f'{dir=} {nocwd=}')
if len(dir.parts) and str(dir.parts[-1]).startswith('.'):
continue
if dir == Path('.') and nocwd :
continue
filelist = (list(dir.glob('*readme.ipynb'))
+ [f for f in sorted(dir.glob('*.ipynb'))
if not f.stem.endswith('readme')])
for i, notebook in enumerate(filelist):
# print(notebook)
if (not all) and (notebook.name.startswith('test') or notebook.name.startswith('Overview')):
continue
if i == 0:
blanks = ''.join(
[' ']*len(dir.parts))
if len(dir.parts):
thisdir = Path(*dir.parts[1:])
display(HTML(f'{blanks}<b>{str(thisdir)}</b>'))
else:
display(
HTML(f'{blanks}<b>{str(Path.cwd().parts[-1])} (.)</b>'))
name = notebook.name.split('.')[0]
# print(notebook)
notebookshort = Path(*notebook.parts[1:])
# print(notebookshort.as_posix())
if colab:
githubname = f'https://colab.research.google.com/github/{owner}/{repo_name}/blob/{branch}/{notebookshort.as_posix()}'
# print(githubname)
display(HTML(
f' {blanks} <a href="{githubname}" target="_blank">{name}</a>'))
else:
display(HTML(
f' {blanks} <a href="{notebook}" target="_blank">{name}</a>'))
@staticmethod
def Worldbank_Models(owner: str = "worldbank",
repo_name: str = 'MFMod-ModelFlow',
branch: str = 'main',
destination = './WorldbankModels',
go = True,
silent=True,
replace = False ,
):
"""
Download an entire GitHub repository and extract it to a specified location.
Parameters:
owner: The owner of the GitHub repository.
repo_name: The name of the repository.
branch: The branch to download.
destination: The local path where the repository should be extracted.
go: display toc of notebooks
silent: keep silent
Returns:
A message indicating whether the download was successful or not.
"""
return model.download_github_repo(owner = owner,
repo_name = repo_name,
branch = branch,
destination = destination ,
go = go ,
silent=silent,
replace = replace
)
class Dekomp_Mixin():
'''This class defines methods and properties related to equation attribution analyses (dekomp)
'''
@lru_cache(maxsize=2048)
def dekomp(self, varnavn, start='', end='', basedf=None, altdf=None, lprint=True,time_att=False):
'''Prints all variables that determine the input variable (varnavn).
Optionally, enter the period and databank to get variable values for the chosen period.
Parameters
----------
varnavn : str
Input variable name.
start : str, optional
Start period for retrieving variable values (default is '').
end : str, optional
End period for retrieving variable values (default is '').
basedf : pd.DataFrame, optional
Base dataframe to use (default is None).
altdf : pd.DataFrame, optional
Alternative dataframe to use (default is None).
lprint : bool, optional
Flag to print the results (default is True).
time_att : bool, optional
Flag to do a timewise attribution (default is False).
Returns
-------
namedtuple
A named tuple containing the following decomposition results:
- diff_level : pd.DataFrame
DataFrame with level differences.
- att_level : pd.DataFrame
DataFrame with attributions to the level difference.
- att_pct : pd.DataFrame
DataFrame with the share of attributions to the difference in level.
- diff_growth : pd.DataFrame
DataFrame with differences in growth rate.
- att_growth : pd.DataFrame
DataFrame with attributions to the difference in growth rate.
'''
basedf_ = basedf if isinstance(basedf, pd.DataFrame) else self.basedf
altdf_ = altdf if isinstance(altdf, pd.DataFrame) else self.lastdf
start_ = start if start != '' else self.current_per[0]
end_ = end if end != '' else self.current_per[-1]
mfrml = model(self.allvar[varnavn]['frml'],
funks=self.funks) # model to calculate the formula
print_per = mfrml.smpl(start_, end_, altdf_)
vars = mfrml.allvar.keys()
varterms = [(term.var, int(term.lag) if term.lag else 0)
for term in mfrml.allvar[varnavn]['terms'] if term.var and not (term.var == varnavn and term.lag == '')]
# now we have dropped duplicate terms and sorted
sterms = sorted(set(varterms), key=lambda x: varterms.index(x))
# find all the experiments to be performed
eksperiments = [(vt, t) for vt in sterms for t in print_per]
if time_att:
...
smallalt = altdf_.loc[:, vars].copy(deep=True) # for speed
smallbase = smallalt.shift().copy() # for speed
else:
smallalt = altdf_.loc[:, vars].copy(deep=True) # for speed
smallbase = basedf_.loc[:, vars].copy(deep=True) # for speed
# make a dataframe for each experiment
alldf = {e: smallalt.copy() for e in eksperiments}
for e in eksperiments:
(var_, lag_), per_ = e
set_a_value(alldf[e], per_, var_, lag_,
get_a_value(smallbase, per_, var_, lag_))
# alldf[e].loc[e[1]+e[0][1],e[0][0]] = smallbase.loc[e[1]+e[0][1],e[0][0]] # update the variable in each eksperiment
# to inspect the updates
difdf = {e: smallalt - alldf[e] for e in eksperiments}
# allres = {e : mfrml.xgenr(alldf[e],str(e[1]),str(e[1]),silent= True ) for e in eksperiments} # now evaluate each experiment
# now evaluate each experiment
allres = {e: mfrml.xgenr(
alldf[e], e[1], e[1], silent=True) for e in eksperiments}
# dataframes with the effect of each update
diffres = {e: smallalt - allres[e] for e in eksperiments}
diffres_growth = {e: smallalt.pct_change() - allres[e].pct_change() for e in eksperiments}
# we are only interested in the effect on the left hand variable
res = {e: diffres[e].loc[e[1], varnavn] for e in eksperiments}
res_growth = {e: diffres_growth[e].loc[e[1], varnavn] for e in eksperiments}
# the resulting dataframe
multi = pd.MultiIndex.from_tuples([e[0] for e in eksperiments], names=[
'Variable', 'lag']).drop_duplicates()
att_level = pd.DataFrame(index=multi, columns=print_per)
for e in eksperiments:
att_level.at[e[0], e[1]] = res[e]
att_growth = pd.DataFrame(index=multi, columns=print_per)
for e in eksperiments:
att_growth.at[e[0], e[1]] = res_growth[e]*100.
# a dataframe with some summaries
diff_level = pd.DataFrame(index=multi, columns=print_per)
diff_level.loc[('t-1' if time_att else 'Base', '0'), print_per] = smallbase.loc[print_per, varnavn]
diff_level.loc[('t' if time_att else 'Alternative', '0'),
print_per] = smallalt.loc[print_per, varnavn]
diff_level.loc[('Difference', '0'), print_per] = difendo = smallalt.loc[print_per,
varnavn] - smallbase.loc[print_per, varnavn]
diff_level.loc[('Percent ', '0'), print_per] = 100*(smallalt.loc[print_per,
varnavn] / (0.0000001+smallbase.loc[print_per, varnavn])-1)
diff_level = diff_level.dropna()
# a dataframe with summaries of growth
diff_growth_index = diff_level.index[:-1]
diff_growth = pd.DataFrame(index=diff_growth_index, columns=print_per)
diff_growth.loc[diff_growth.index[0],print_per] = smallbase.loc[:,varnavn].pct_change().loc[print_per] * 100.
diff_growth.loc[diff_growth.index[1],print_per] = smallalt.loc[:,varnavn].pct_change().loc[print_per] * 100.
diff_growth.loc[diff_growth.index[2],print_per] = (diff_growth.loc[diff_growth.index[1],print_per]-
diff_growth.loc[diff_growth.index[0],print_per])
growth_residual = att_growth.sum() - diff_growth.loc[diff_growth.index[2],print_per]
att_growth.loc[('Total', 0), print_per] = att_growth.sum()
att_growth.loc[('Residual', 0), print_per] = growth_residual
#
att_pct = (att_level / (difendo[print_per]) *(abs(difendo[print_per])>0.000001) * 100).sort_values(
print_per[-1], ascending=False) # each contribution in pct of total change
# breakpoint()
residual = att_pct.sum() - 100
att_pct.loc[('Total', 0), print_per] = att_pct.sum()
att_pct.loc[('Residual', 0), print_per] = residual
if lprint:
print('\nFormula :', mfrml.allvar[varnavn]['frml'], '\n')
print(diff_level.to_string(
float_format=lambda x: '{0:10.2f}'.format(x)))
print('\n Contributions to difference for ', varnavn)
print(att_level.to_string(
float_format=lambda x: '{0:10.2f}'.format(x)))
print('\n Share of contributions to difference for ', varnavn)
print(att_pct.to_string(
float_format=lambda x: '{0:10.0f}%'.format(x)))
print('\n Difference in growth rate', varnavn)
print(diff_growth.to_string(
float_format=lambda x: '{0:10.1f}%'.format(x)))
print('\n Contribution to growth rate', varnavn)
print(att_growth.to_string(
float_format=lambda x: '{0:10.1f}%'.format(x)))
att_pct = att_pct[att_pct.columns].astype(float)
dekomp_res_name = namedtuple('dekompres', 'diff_level att_level att_pct diff_growth att_growth')
dekomp_res = dekomp_res_name(diff_level, att_level, att_pct, diff_growth, att_growth )
return dekomp_res
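# Usage sketch (assumes a solved model `m` where m.basedf/m.lastdf hold the two runs;
# FY is a hypothetical variable name):
#     res = m.dekomp('FY', start=2021, end=2025, lprint=False)
#     res.diff_level   # level difference between the runs
#     res.att_level    # contribution of each (variable, lag) to that difference
#     res.att_pct      # the same contributions in percent of the total difference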
def impact(self, var, ldekomp=False, leq=False, adverse=None, base=None, maxlevel=3, start='', end=''):
for v in self.treewalk(self.endograph, var.upper(), parent='start', lpre=True, maxlevel=maxlevel):
if v.lev <= maxlevel:
if leq:
print('---'*v.lev+self.allvar[v.child]['frml'])
self.print_eq(v.child.upper(), data=self.lastdf,
start=start, end=end)
else:
print('---'*v.lev+v.child)
if ldekomp:
x = self.dekomp(v.child, lprint=1, start=start, end=end)
def dekomp_plot_per(self, varnavn, sort=False, pct=True, per='', threshold=0.0
,rename=True
,nametrans = lambda varnames,thismodel : varnames
,time_att=False,ysize=7):
'''
Returns a waterfall diagram with attribution for a variable in one period.
Parameters
----------
varnavn : str
variable name.
sort : bool, optional
sort the contributions. The default is False.
pct : bool, optional
display pct contributions. The default is True.
per : optional
period to display. The default is '' (the last period in current_per).
threshold : float, optional
cutoff for small contributions. The default is 0.0.
rename : bool, optional
Use descriptions instead of variable names. The default is True.
nametrans : callable, optional
translates variable names when rename is False.
time_att : bool, optional
Do time attribution. The default is False.
ysize : int, optional
height of the figure. The default is 7.
Returns
-------
a matplotlib figure instance.
'''
thisper = self.current_per[-1] if per == '' else per
xx = self.dekomp(varnavn.upper(), lprint=False,time_att=time_att)
ddf = join_name_lag(xx[2] if pct else xx[1])
if rename:
ddf = ddf.rename(index= self.var_description)
else:
ddf.index = nametrans(ddf.index,self)
# tempdf = pd.DataFrame(0,columns=ddf.columns,index=['Start']).append(ddf)
tempdf = ddf
per_loc = tempdf.columns.get_loc(thisper)
nthreshold = '' if threshold == 0.0 else f', threshold = {threshold}'
ntitle = f'Decomposition in {thisper}, pct{nthreshold}:' if pct else f'Decomposition in {thisper}, {nthreshold}'
plotdf = tempdf.loc[[c for c in tempdf.index.tolist(
) if c.strip() != 'Total'], :].iloc[:, [per_loc]]
plotdf.columns = [self.var_description.get(varnavn.upper(),varnavn.upper())] if rename else [varnavn.upper()]
# waterdf = self.cutout(plotdf,threshold
waterdf = plotdf
res = mv.waterplot(waterdf, autosum=1, allsort=sort, top=0.86,
sort=sort, title=ntitle, bartype='bar', threshold=threshold,ysize=ysize)
return res
def get_att_diff(self,n,type='pct', start='', end='',time_att=False,**kwargs):
tstart = self.current_per[0] if start == '' else start
tend = self.current_per[-1] if end == '' else end
res = self.dekomp(n, lprint=0, start=tstart, end=tend, time_att=time_att)
if type in { 'level' , 'pct'}:
diffdf = res.diff_level.groupby(level=[0]).sum()
else:
diffdf = getattr(res, f'diff_{type}').groupby(level=[0]).sum()
return diffdf.astype('float')
def get_att(self, n, type='pct', filter=False, lag=True, start='', end='',
time_att=False, threshold=0.0):
'''
Calculate the attribution percentage for a variable.
Parameters:
n (str): Name of the variable to calculate attribution for.
type (str): Type of attribution calculation. Options: 'pct' (percentage), 'level', 'growth'. Default: 'pct'.
filter (bool): [Deprecated] Use threshold instead of filter. Default: False.
lag (bool): Flag to indicate whether to include lag information in the output. Default: True.
start (str): Start period for calculation. If not provided, uses the first period in the model instance. Default: ''.
end (str): End period for calculation. If not provided, uses the last period in the model instance. Default: ''.
time_att (bool): Flag to indicate time attribute calculation. Default: False.
threshold (float): Threshold value for excluding rows with values close to zero. Default: 0.0.
Returns:
pandas.DataFrame: DataFrame containing the calculated attribution results.
Raises:
Exception: If an invalid type is provided.
'''
legal_types = {'pct', 'level', 'growth'}
if type not in legal_types:
raise Exception(f'Invalid type provided. Type must be one of {legal_types}, not {type}')
if filter:
raise Exception("The 'filter' parameter is deprecated. Use 'threshold' instead.")
tstart = self.current_per[0] if start == '' else start
tend = self.current_per[-1] if end == '' else end
res = self.dekomp(n, lprint=0, start=tstart, end=tend, time_att=time_att)
if type == 'level':
res_relevant = getattr(res, f'att_{type}')
else:
res_relevant = getattr(res, f'att_{type}').iloc[:-2, :]
if lag:
out = pd.DataFrame(res_relevant.values, columns=res_relevant.columns,
index=[r[0] + (f'({str(r[1])})' if r[1] else '') for r in res_relevant.index])
else:
out = res_relevant.groupby(level=[0]).sum()
out = cutout(out, threshold)
return out.astype('float')
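# Usage sketch:
#     m.get_att('FY', type='growth', lag=False, threshold=0.5)
# returns a float DataFrame with contributions to the growth-rate difference, lags summed
# into one row per variable, and rows below the threshold cut away by cutout.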
def get_att_pct(self, n, filter=False, lag=True, start='', end='',time_att=False,threshold=0.0):
''' Get the attribution pct for a variable.
A little effort is made to change from a multiindex to single node names.'''
tstart = self.current_per[0] if start =='' else start
tend = self.current_per[-1] if end =='' else end
res = self.dekomp(n, lprint=0, start=tstart, end=tend,time_att=time_att)
res_pct = res[2].iloc[:-2, :]
if lag:
out_pct = pd.DataFrame(res_pct.values, columns=res_pct.columns,
index=[r[0]+(f'({str(r[1])})' if r[1] else '') for r in res_pct.index])
else:
out_pct = res_pct.groupby(level=[0]).sum()
out = out_pct.loc[(out_pct != 0.0).any(
axis=1), :] if filter else out_pct
out = cutout(out, threshold)
return out
def get_att_pct_to_from(self,to_var,from_var,lag=False,time_att=False):
'''Get the attribution for a single variable'''
outdf = self.get_att_pct(to_var.upper(),lag=lag,filter=False,time_att=time_att)
# print(f'{to_var=} {from_var=} {outdf.loc[from_var.upper(),:].abs().max()}')
return outdf.loc[from_var.upper(),:]
def get_att_level(self, n, filter=False, lag=True, start='', end='',time_att=False):
''' Get the attribution level for a variable.
A little effort is made to change from a multiindex to single node names.'''
tstart = self.current_per[0] if start =='' else start
tend = self.current_per[-1] if end =='' else end
res = self.dekomp(n, lprint=0, start=tstart, end=tend,time_att=time_att)
res_level = res[1].iloc[:, :]
if lag:
out_pct = pd.DataFrame(res_level.values, columns=res_level.columns,
index=[r[0]+(f'({str(r[1])})' if r[1] else '') for r in res_level.index])
else:
out_pct = res_level.groupby(level=[0]).sum()
out = out_pct.loc[(out_pct != 0.0).any(
axis=1), :] if filter else out_pct
return out
def dekomp_plot(self, varnavn, sort=True, pct=True, per='', top=0.9, threshold=0.0,lag=True
,rename=True
,nametrans = lambda varnames,thismodel : varnames
,time_att=False):
'''
Returns a chart with attribution for a variable over the smpl.
Parameters
----------
varnavn : str
variable name.
sort : bool, optional
sort the contributions. The default is True.
pct : bool, optional
display pct contributions. The default is True.
per : optional
The default is ''.
threshold : float, optional
cutoff for small contributions. The default is 0.0.
rename : bool, optional
Use descriptions instead of variable names. The default is True.
nametrans : callable, optional
translates variable names when rename is False.
time_att : bool, optional
Do time attribution. The default is False.
lag : bool, optional
separate by lags. The default is True.
top : float, optional
where to place the title. The default is 0.9.
Returns
-------
a matplotlib figure instance.
'''
# xx = self.dekomp(varnavn,self.current_per[0],self.current_per[-1],lprint=False)
# # breakpoint()
# ddf0 = join_name_lag(xx[2] if pct else xx[1]).pipe(
# lambda df: df.loc[[i for i in df.index if i != 'Total'], :])
ddf0 = self.get_att_pct(varnavn,lag=lag,time_att=time_att) if pct else self.get_att_level(varnavn,lag=lag)
if rename:
ddf0 = ddf0.rename(index= self.var_description)
else:
ddf0.index = nametrans(ddf0.index,self)
ddf = cutout(ddf0, threshold)
fig, axis = plt.subplots(nrows=1, ncols=1, figsize=(
10, 5), constrained_layout=False)
ax = axis
ddf.T.plot(ax=ax, stacked=True, kind='bar')
ax.set_ylabel(self.var_description[varnavn], fontsize='x-large')
ax.set_xticklabels(ddf.T.index.tolist(),
rotation=45, fontsize='x-large')
ax.xaxis.set_major_locator(plt.MaxNLocator(10))
nthreshold = '' if threshold == 0.0 else f', threshold = {threshold}'
ntitle = f'Decomposition{nthreshold}'
fig.suptitle(ntitle, fontsize=20)
# fig.subplots_adjust(top=top)
return fig
def get_dekom_gui(self, var=''):
'''
Interactive wrapper around :any:`dekomp_plot` and :any:`dekomp_plot_per`
Args:
var (str, optional): start variable. Defaults to ''.
Returns:
show: an interactive ipywidget.
'''
def show_dekom(Variable, Pct, Periode, Threshold=0.0):
print(self.allvar[Variable]['frml'].replace('  ', ' '))
self.dekomp_plot(Variable, pct=Pct, threshold=Threshold)
self.dekomp_plot_per(
Variable, pct=Pct, threshold=Threshold, per=Periode, sort=True)
xvar = var.upper() if var.upper() in self.endogene else sorted(
list(self.endogene))[0]
show = ip.interactive(show_dekom,
Variable=ip.Dropdown(options=sorted(
self.endogene), value=xvar),
Pct=ip.Checkbox(
description='Percent growth', value=False),
Periode=ip.Dropdown(options=self.current_per),
Threshold=(0.0, 10.0, 1.))
return show
def totexplain(self, pat='*', vtype='all', stacked=True, kind='bar', per='', top=0.9, title='', use='level', threshold=0.0):
'''
Makes a total explanation for the variables defined by pat.
Args:
pat (str, optional): pattern selecting the variables to explain. Defaults to '*'.
vtype (str, optional): selection type. Defaults to 'all'.
stacked (bool, optional): stacked chart. Defaults to True.
kind (str, optional): matplotlib plot kind. Defaults to 'bar'.
per (optional): period to display. Defaults to ''.
top (float, optional): where to place the title. Defaults to 0.9.
title (str, optional): chart title. Defaults to ''.
use (str, optional): 'level' or 'growth'. Defaults to 'level'.
threshold (float, optional): cutoff for small contributions. Defaults to 0.0.
Returns:
fig: a matplotlib figure.
'''
if not hasattr(self, 'totdekomp'):
from modeldekom import totdif
self.totdekomp = totdif(self, summaryvar='*', desdic={})
fig = self.totdekomp.totexplain(pat=pat, vtype=vtype, stacked=stacked, kind=kind,
per=per, top=top, title=title, use=use, threshold=threshold)
return fig
def totdif(self,summaryvar='*',desdic=None,experiments = None):
self.totdekomp = totdif(model=self, summaryvar=summaryvar,
desdic=desdic if desdic is not None else self.var_description,
experiments=experiments)
return self.totdekomp
def get_att_gui(self, var='FY', spat='*', desdic={}, use='level',ysize=7):
'''Creates a jupyter ipywidget to display model level
attributions '''
if not hasattr(self, 'totdekomp'):
self.totdekomp = totdif(model=self, summaryvar=spat, desdic=self.var_description)
print('TOTDEKOMP made')
if self.totdekomp.go:
xvar = var if var in self.endogene else sorted(list(self.endogene))[0]
xx = mj.get_att_gui(self.totdekomp, var=xvar,
spat=spat, desdic=desdic, use=use,ysize=ysize)
display(xx)
else:
del self.totdekomp
return 'Nothing to attribute'
class Description_Mixin():
'''This class defines description-related methods and properties
'''
@property
def var_description(self):
'''A dictionary with variable descriptions; if no value matches the key, the variable name is returned '''
return self._var_description
@var_description.setter
def var_description(self,a_dict):
allvarset = set(self.allvar.keys())
self._var_description = self.defsub({k:v for k,v in a_dict.items() if k in allvarset})
@property
def var_description_reverse(self):
return {v.upper() : k for k,v in self.var_description.items()}
def set_var_description(self, a_dict):
''' Sets var_description. **Obsolete**: now just use = to assign a new var_description'''
self.var_description = a_dict
def var_description_add (self, a_dict):
''' adds a dict of var_descriptions to the existing var_description; new values overwrite old ones'''
self.var_description = {**self._var_description, **a_dict}
def set_var_description_old (self, a_dict):
allvarset = set(self.allvar.keys())
self.var_description = self.defsub({k:v for k,v in a_dict.items() if k in allvarset})
@staticmethod
def html_replace(ind):
'''Replace special characters by their html entities '''
out = (ind.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;').replace('\n', '<br>')
.replace('æ', '&aelig;').replace('ø', '&oslash;').replace('å', '&aring;')
.replace('Æ', '&AElig;').replace('Ø', '&Oslash;').replace('Å', '&Aring;')
.replace('$', '&#36;')
)
return out
def var_des(self, var):
'''Returns blank if no description'''
# breakpoint()
des = self.var_description[var.split('(')[0]]
return des if des != var else ''
def get_eq_des(self, var, show_all=False):
'''Returns a string of descriptions for all variables in an equation:'''
if var in self.endogene:
rhs_var = sorted([start for start, this in self.totgraph_nolag.in_edges(
var) if self.var_des(start) or show_all])
all_var = [var] + rhs_var
maxlen = max(len(v) for v in all_var) if len(all_var) else 10
return '\n'.join(f'{rhs:{maxlen}}: {self.var_des(rhs)}' for rhs in all_var)
else:
return ''
def get_des_html(self, var, show=1):
if show:
out = f'{var}:{self.var_des(var)}'
# breakpoint()
return f'{self.html_replace(out)}'
else:
return f'{var}'
@staticmethod
def read_wb_xml_var_des(filename):
'''Read an xml file with variable descriptions, World Bank style '''
import xml.etree.ElementTree
with open( filename,'rt') as f:
xmltext = f.read()
root = xml.etree.ElementTree.fromstring(xmltext)
variables = root.findall('variable')
vardes= { v.find('Mnem').text : {c.tag: c.text for c in v.find('Languages')} for v in variables if type(des:= v.find('Languages')) != type(None) }
description_dic = {k : vardes[k] for k in sorted(vardes.keys()) }
return description_dic
@staticmethod
def languages_wb_xml_var_des(filename):
'''Find the languages in an xml file with variable descriptions, World Bank style'''
description_dic = model.read_wb_xml_var_des(filename)
languages = {l for var,descriptions in description_dic.items() for l in descriptions}
return languages
def set_wb_xml_var_description(self,filename,language='English'):
''' Set variable descriptions from an xml file with variable descriptions, World Bank style'''
description_dic = model.read_wb_xml_var_des(filename)
this_description = {var : descriptions[language] for var,descriptions in description_dic.items() if language in descriptions.keys()}
enriched_var_description = self.enrich_var_description(this_description)
self.set_var_description(enriched_var_description)
def enrich_var_description(self,var_description):
'''Takes a dict of variable descriptions and enriches it with the standard suffixes for generated variables'''
add_d = { newname : 'Add factor:'+ var_description.get(v,v) for v in self.endogene if (newname := v+'_A') in self.exogene }
dummy_d = { newname : 'Fix dummy:'+ var_description.get(v,v) for v in self.endogene if (newname := v+'_D') in self.exogene }
fix_d = { newname : 'Fix value:'+ var_description.get(v,v) for v in self.endogene if (newname := v+'_X') in self.exogene }
fitted_d = { newname : 'Fitted value:'+ var_description.get(v,v) for v in self.endogene if (newname := v+'_FITTED') in self.endogene }
# breakpoint()
var_description_new = {**var_description,**add_d,**dummy_d,**fix_d,**fitted_d}
return var_description_new
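# Behaviour sketch (hypothetical variable FY with generated companions FY_A, FY_D, FY_X):
#     m.enrich_var_description({'FY': 'Real GDP'})
#     # -> {'FY': 'Real GDP', 'FY_A': 'Add factor:Real GDP',
#     #     'FY_D': 'Fix dummy:Real GDP', 'FY_X': 'Fix value:Real GDP'}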
def deslist(self, pat):
'''
Returns a list of variables where the description in the model matches the pattern; the pattern can be a list of patterns.
Args:
pat (string or list of strings): One or more patterns separated by space; wildcards * and ? are allowed. Special pattern: #ENDO
Returns:
out (list): list of variable names whose description matches pat.
'''
# breakpoint()
if isinstance(pat, list):
upat = pat
else:
upat = [pat]
ipat = upat
out = [v for p in ipat for up in p.split('|') for v in sorted(
[self.var_description_reverse[v] for v in
fnmatch.filter(self.var_description_reverse.keys(),up.upper())])]
return out
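# Usage sketch:
#     m.deslist('*unemployment*')   # variables whose description mentions unemployment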
class Modify_Mixin():
'''Class to modify a model with new equations (later also delete equations and re-normalize)'''
def __add__(self,another_model):
if len(endooverlab:=self.endogene & another_model.endogene):
print(f'No overlap in endogenous variables allowed. Fix it.\nOffending variables: \n{endooverlab}')
raise Exception('When adding two models, no overlap in endogenous variables is allowed ')
mnew = self.__class__(self.equations+another_model.equations,
modelname=f'Combined {self.name} and {another_model.name}',
var_description = {**self.var_description , **another_model.var_description},
var_groups = {**self.var_groups , **another_model.var_groups },
reports = self.reports | another_model.reports ,
equations_latex = self.equations_latex+ another_model.equations_latex ,
eviews_dict = {**self.eviews_dict,**another_model.eviews_dict},
model_description = f'''This model was created by combining the two models:{self.name} and {another_model.name}\n
{self.name} description:\n{self.model_description}\n
{another_model.name} description:\n{another_model.model_description}''')
mnew.lastdf = pd.concat([self.lastdf,another_model.lastdf],axis=1).fillna(0.0 )
mnew.current_per = self.current_per
return mnew
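# Usage sketch: two models with disjoint endogenous variables can be combined with +
# (hypothetical model instances):
#     mcombined = mbank + mmacro
#     mcombined.current_per      # inherited from the left operand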
def copy_properties(self,mmodel):
''' Transfer useful properties from self to a new model'''
def eqflip(self,flip=None,calc_add=True,newname='',sep='\n'):
'''
Parameters
----------
flip : str, optional
Lines of "<old endogenous> <new endogenous>" pairs, separated by sep. The default is None.
calc_add : bool, optional
Calculate add factors. The default is True.
newname : str, optional
Name of the new model. The default is ''.
sep : str, optional
Separator between flip lines. The default is '\n'.
Returns
-------
newmodel : model
The new model with the flipped equations.
newdf : DataFrame
a dataframe with calculated add factors. Origin is the original model's lastdf.
'''
newmodelname = newname if newname else self.name+' flipped'
updatefunks=self.funks #
# breakpoint()
lines = flip.strip().split(sep)
newnormalisation = [l.split() for l in lines]
vars_before_normalization = {beforeendo.upper() for beforeendo,afterendo in newnormalisation }
frml_normalize_strip = {(beforeendo.upper(),afterendo.upper()) : self.allvar[beforeendo]['frml'].replace('$','').split(' ',2) for beforeendo,afterendo in newnormalisation }
frml_normalize_gross = {(beforeendo,afterendo) : (fname ,normal(expression,the_endo = afterendo,endo_lhs=False,add_factor=False)) for (beforeendo,afterendo),(frml,fname,expression) in frml_normalize_strip.items() }
frmldict_normalized = {afterendo : f'FRML {fname} {nexpression.normalized}$'
for (beforeendo,afterendo),(fname,nexpression) in frml_normalize_gross.items() }
# breakpoint()
frmldict = {k: v['frml'] for (k,v) in self.allvar.items() if k in self.endogene and not k in vars_before_normalization } # frml's in the existing model
newfrmldict = {**frmldict,**frmldict_normalized} # frml's in the new model
newfrml = '\n'.join([f for f in newfrmldict.values()])
newmodel = self.__class__(newfrml,modelname = f'updated {self.name}',
funks=updatefunks,
var_description=self.var_description,
model_description = self.model_description,
var_groups = self.var_groups ,
reports = self.reports ,
equations_latex='',
)
print(f'\nThe model:"{self.name}" Has endogeneous variable flipped, new model name is:"{newmodelname}"')
for (beforeendo,afterendo),(fname,nexpression) in frml_normalize_gross.items():
print(f'\nEndogenous flip from {beforeendo} to {afterendo}')
print(f'Old frml :FRML {fname} {nexpression.original}$')
print(f'New frml :FRML {fname} {nexpression.normalized}$')
print()
newdf = self.lastdf.copy()
newmodel.current_per = self.current_per.copy()
return newmodel,newdf
...
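# Usage sketch: re-normalize the equation for Y so that it determines C instead
# (hypothetical variable names):
#     newmodel, newdf = m.eqflip('Y C')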
def eqdelete(self,deleteeq=None,newname=''):
'''
Parameters
----------
deleteeq : TYPE, optional
Variables where equations are to be deleted. The default is None.
Returns
-------
newmodel : model
The new model with the equations deleted.
newdf : DataFrame
a dataframe. Origin is the original model's lastdf.
'''
vars_todelete = self.vlist(deleteeq)
newmodelname = newname if newname else self.name+' with deleted eq'
newfrmldict = {k: v['frml'] for (k,v) in self.allvar.items() if k in self.endogene and not k in vars_todelete} # frml's in the existing model
neweviewsdict = {k: v for k,v in self.eviews_dict.items() if not k in vars_todelete}
newfrml = '\n'.join([f for f in newfrmldict.values()])
newmodel = self.__class__(newfrml,modelname = f'updated {self.name}'
,funks=self.funks,
var_description=self.var_description,
model_description = self.model_description + f'\nWith deleted variables: {vars_todelete}',
var_groups = self.var_groups ,
reports = self.reports ,
eviews_dict = neweviewsdict ,
equations_latex='',
)
print(f'\nThe model:"{self.name}" Has equations deleted, new model name is:"{newmodelname}"')
print('The following equations are deleted')
for v in vars_todelete:
if v in self.endogene:
print(f'{v} :deleted ')
else:
print(f'{v} : is exogenous and not deleted ')
newdf = self.lastdf.copy()
newmodel.current_per = self.current_per.copy()
newmodel.name = newmodelname
return newmodel,newdf
...
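# Usage sketch (patterns are expanded by self.vlist):
#     newmodel, newdf = m.eqdelete('FY FX*')   # delete the equations for FY and all FX...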
def equpdate(self,updateeq=None,newfunks=[],add_add_factor=False,calc_add=True,
do_preprocess=True,newname='',silent=True):
'''
Parameters
----------
updateeq : str
new equations separated by newlines.
newfunks : list, optional
Additional user-specified functions. The default is [].
calc_add : bool, optional
Calculate add factors for the new equations. The default is True.
Returns
-------
newmodel : TYPE
The new model with the new and deleted equations .
newdf : TYPE
a dataframe with calculated add factors. Origin is the original models lastdf.
'''
updatefunks=list(set(self.funks+newfunks) ) #
newmodelname = newname if newname else self.name+' Updated'
dfvars = set(self.lastdf.columns)
updatemodel = mp.tofrml(updateeq) # create the frml's with the usual preprocessing
while '  ' in updatemodel:
updatemodel = updatemodel.replace('  ', ' ')
frml2 = updatemodel.split('$')
frml2_strip = [mp.split_frml(f+'$') for f in frml2 if len(f)>2]
frml2_normal = [[frml,fname,
normal(expression[:-1],do_preprocess=do_preprocess,add_add_factor=add_add_factor,
make_fixable = pt.kw_frml_name(fname.upper(),'FIXABLE'))]
for allfrml,frml,fname,expression in frml2_strip]
frmldict_update = {nexpression.endo_var: f'{frml} {fname} {nexpression.normalized}$' for
frml,fname,nexpression in frml2_normal}
if add_add_factor:
frmldict_calc_add = {nexpression.endo_var: f'{frml} {fname} {nexpression.calc_add_factor}$' for
frml,fname,nexpression in frml2_normal if nexpression.endo_var in self.endogene|self.exogene|dfvars }
else:
frmldict_calc_add={}
# breakpoint()
frmldict = {k: v['frml'] for (k,v) in self.allvar.items() if k in self.endogene } # frml's in the existing model
newfrmldict = {**frmldict,**frmldict_update} # frml's in the new model
newfrml = '\n'.join([f for f in newfrmldict.values()])
newmodel = self.__class__(newfrml,modelname = f'updated {self.name}',
funks=updatefunks,
var_description=self.var_description,
model_description = self.model_description,
var_groups = self.var_groups ,
reports = self.reports ,
equations_latex='',
)
if len(frmldict_calc_add) and calc_add:
calc_add_frml = '\n'.join([f for f in frmldict_calc_add.values()])
calc_add_model = self.__class__(calc_add_frml,modelname = f'adjustment calculation for updated {self.name}',funks=updatefunks)
var_add = sorted(calc_add_model.endogene)
print(f'\nThe model:"{self.name}" got new equations, new model name is:"{newmodelname}"')
for varname,frml in frmldict_update.items():
print(f'New equation for {varname}')
print(f'Old frml :{frmldict.get(varname,"new endogeneous variable ")}')
if not varname in self.endogene|self.exogene and varname in dfvars:
print('This new endogenous variable is not an existing exogenous variable, but it is in .lastdf')
print(f'New frml :{frml}')
print(f'Adjust calc:{frmldict_calc_add.get(varname,"No frml for adjustment calc ")}')
print()
# breakpoint()
thisdf = newmodel.insertModelVar(self.lastdf)
# breakpoint()
if len(frmldict_calc_add) and calc_add:
calc_add_model.current_per = self.current_per
newdf = calc_add_model(thisdf,silent=silent)
print('\nNew add factors to get the same results for existing variables:')
print(newdf.loc[self.current_per,var_add])
else:
newdf = thisdf
newmodel.current_per = self.current_per.copy()
return newmodel,newdf
...
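# Usage sketch: replace or add an equation and get add factors which reproduce the old
# results for existing variables (hypothetical equation):
#     newmodel, newdf = m.equpdate('Y = C + I + X - M', add_add_factor=True)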
def equpdate_old(self,updateeq=None,newfunks=[],add_adjust=False,calc_add=True,do_preprocess=True,newname=''):
'''
Parameters
----------
updateeq : TYPE
new equations seperated by newline .
newfunks : TYPE, optional
Additional userspecified functions. The default is [].
calc_add : bool, optional
Additional userspecified functions. The default is [].
Returns
-------
newmodel : TYPE
The new model with the new and deleted equations .
newdf : TYPE
a dataframe with calculated add factors. Origin is the original models lastdf.
'''
updatefunks=list(set(self.funks+newfunks) ) #
newmodelname = newname if newname else self.name+' Updated'
dfvars = set(self.lastdf.columns)
updatemodel = self.__class__.from_eq(updateeq,funks=updatefunks) # create the model with the usual processing of frml's
frmldict2 = {k: v['frml'] for (k,v) in updatemodel.allvar.items() if k in updatemodel.endogene} # A dict with the frml's of the modify
frmldic2_strip = {k : v.replace('$','').split(' ',2) for k,v in frmldict2.items() } # {endovariabbe : [FRML, FRMLNAME, EXPRESSION]...}
frmldic2_normal = {k : [frml,fname,normal(expression,do_preprocess=do_preprocess,
add_adjust=add_adjust)] for k,(frml,fname,expression)
in frmldic2_strip.items()}
frmldict_update = {k: f'{frml} {fname} {nexpression.normalized}$' for k,(frml,fname,nexpression)
in frmldic2_normal.items()}
if add_adjust:
frmldict_calc_add = {k: f'{frml} {fname} {nexpression.calc_adjustment}$' for k,(frml,fname,nexpression)
in frmldic2_normal.items() if k in self.endogene|self.exogene|dfvars }
else:
frmldict_calc_add={}
# breakpoint()
frmldict = {k: v['frml'] for (k,v) in self.allvar.items() if k in self.endogene } # frml's in the existing model
newfrmldict = {**frmldict,**frmldict_update} # frml's in the new model
newfrml = '\n'.join([f for f in newfrmldict.values()])
newmodel = self.__class__(newfrml,modelname = f'updated {self.name}',funks=updatefunks)
if len(frmldict_calc_add) and add_adjust:
calc_add_frml = '\n'.join([f for f in frmldict_calc_add.values()])
calc_add_model = self.__class__(calc_add_frml,modelname = f'adjustment calculation for updated {self.name}',funks=updatefunks)
var_add = calc_add_model.endogene
print(f'\nThe model:"{self.name}" got new equations, new model name is:"{newmodelname}"')
for varname,frml in frmldict_update.items():
print(f'New equation for {varname}')
print(f'Old frml :{frmldict.get(varname,"new endogeneous variable ")}')
if not varname in self.endogene|self.exogene and varname in dfvars:
print('This new endogenous variable is not an existing exogenous variable, but it is in .lastdf')
print(f'New frml :{frml}')
print(f'Adjust calc:{frmldict_calc_add.get(varname,"No frml for adjustment calc ")}')
print()
# breakpoint()
thisdf = newmodel.insertModelVar(self.lastdf)
if len(frmldict_calc_add) and calc_add:
calc_add_model.current_per = self.current_per
newdf = calc_add_model(thisdf)
print('\nNew add factors to get the same results for existing variables:')
print(newdf.loc[self.current_per,var_add])
else:
newdf = thisdf
newmodel.current_per = self.current_per.copy()
newmodel.var_description = self.var_description
newmodel.name = newmodelname
return newmodel,newdf
...
class Graph_Mixin():
'''
This class defines graph related methods and properties
'''
@staticmethod
def create_strong_network(g, name='Network', typeout=False, show=False):
''' Create a solveorder and block ordering of a graph.
Uses networkx to find the core of the model.
'''
strong_condensed = nx.condensation(g)
strong_topo = list(nx.topological_sort(strong_condensed))
solveorder = [
v for s in strong_topo for v in strong_condensed.nodes[s]['members']]
if typeout:
block = [[v for v in strong_condensed.nodes[s]['members']]
for s in strong_topo]
# count the simultaneous blocks so they do not get lumped together
type = [('Simultaneous'+str(i) if len(l) != 1 else 'Recursiv ', l)
for i, l in enumerate(block)]
# we want to lump recursive equations in sequence together
strongblock = [[i for l in list(item) for i in l[1]]
for key, item in groupby(type, lambda x: x[0])]
strongtype = [list(item)[0][0][:-1]
for key, item in groupby(type, lambda x: x[0])]
if show:
print('Blockstructure of:', name)
print(*Counter(strongtype).most_common())
for i, (type, block) in enumerate(zip(strongtype, strongblock)):
print('{} {:<15} '.format(i, type), block)
return solveorder, strongblock, strongtype
else:
return solveorder
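# Usage sketch on the endogenous dependency graph of a model instance `m`:
#     order, blocks, types = m.create_strong_network(m.endograph, typeout=True, show=True)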
@property
def strongorder(self):
if not hasattr(self, '_strongorder'):
self._strongorder = self.create_strong_network(self.endograph)
return self._strongorder
@property
def strongblock(self):
if not hasattr(self, '_strongblock'):
xx, self._strongblock, self._strongtype = self.create_strong_network(
self.endograph, typeout=True)
return self._strongblock
@property
def strongtype(self):
if not hasattr(self, '_strongtype'):
xx, self._strongblock, self._strongtype = self.create_strong_network(
self.endograph, typeout=True)
return self._strongtype
@property
def strongfrml(self):
''' To find simultaneity (circularity) in a model
this function returns the equations in each strong block
'''
simul = [block for block, type in zip(
self.strongblock, self.strongtype) if type.startswith('Simultaneous')]
out = '\n\n'.join(['\n'.join([self.allvar[v]['frml']
for v in block]) for block in simul])
return 'Equations with feedback in this model:\n'+out
def superblock(self):
""" finds prolog, core and epilog variables """
if not hasattr(self, '_prevar'):
self._prevar = []
self._epivar = []
this = self.endograph.copy()
while True:
new = [v for v, indegree in this.in_degree() if indegree == 0]
if len(new) == 0:
break
self._prevar = self._prevar+new
this.remove_nodes_from(new)
while True:
new = [v for v, outdegree in this.out_degree() if outdegree == 0]
if len(new) == 0:
break
self._epivar = new + self._epivar
this.remove_nodes_from(new)
episet = set(self._epivar)
preset = set(self._prevar)
self.common_pre_epi_set = episet.intersection(preset)
noncore = episet | preset
self._coreorder = [v for v in self.nrorder if not v in noncore]
xx, self._corestrongblock, self._corestrongtype = self.create_strong_network(
this, typeout=True)
self._superstrongblock = ([self._prevar] +
(self._corestrongblock if len(
self._corestrongblock) else [[]])
+ [self._epivar])
self._superstrongtype = (['Recursiv'] +
(self._corestrongtype if len(
self._corestrongtype) else [[]])
+ ['Recursiv'])
self.coregraph = this
self.fblist,self.daglist = self.get_minimal_feedback_set(this)
# Find the feedback graph
if 1:
try:
self.FVS_graph = nx.DiGraph()
# Add only the feedback vertices to the new graph
self.FVS_graph.add_nodes_from(self.fblist)
# Optionally, add edges between these vertices if they exist in the original graph
for u in self.fblist:
for v in self.fblist:
if self.coregraph.has_edge(u, v):
self.FVS_graph.add_edge(u, v)
h,a = nx.hits(self.FVS_graph)
self.fbhubs = {k:v for k,v in sorted( h.items(), key=lambda x: x[1], reverse=True)}
self.fbaut = {k:v for k,v in sorted( a.items(), key=lambda x: x[1], reverse=True)}
self.fblist = [v for v in self.fbhubs.keys()]
except:
...
if self.use_fbmin:
self._corevar = self.daglist + self.fblist
else:
self._corevar = list(chain.from_iterable((v for v in self.nrorder if v in block) for block in self._corestrongblock))
@property
def prevar(self):
""" returns a set with names of endogenopus variables which do not depend
on current endogenous variables """
if not hasattr(self, '_prevar'):
self.superblock()
return self._prevar
@property
def epivar(self):
""" returns a set with names of endogenopus variables which do not influence
current endogenous variables """
if not hasattr(self, '_epivar'):
self.superblock()
return self._epivar
@property
def preorder(self):
''' the endogenous variables which can be calculated in advance '''
# return [v for v in self.nrorder if v in self.prevar]
return self.prevar
@property
def epiorder(self):
''' the endogenous variables which can be calculated at the end '''
return self.epivar
@property
def coreorder(self):
''' the solution order of the endogenous variables in the simultaneous core of the model '''
if not hasattr(self, '_coreorder'):
self.superblock()
return self._corevar
@property
def coreset(self):
'''The set of endogenous variables in the simultaneous core of the model '''
if not hasattr(self, '_coreset'):
self._coreset = set(self.coreorder)
return self._coreset
@property
def precoreepiorder(self):
return self.preorder+self.coreorder+self.epiorder
@property
def prune_endograph(self):
if not hasattr(self, '_endograph'):
_ = self.endograph
self._endograph.remove_nodes_from(self.prevar)
self._endograph.remove_nodes_from(self.epivar)
return self._endograph
@property
def use_preorder(self):
return self._use_preorder
@use_preorder.setter
def use_preorder(self, use_preorder):
if use_preorder:
if self.istopo or self.straight:
pass
# print(f"You can't use preorder in this model, it is topological or straight")
# print(f"We pretend you did not try to set the option")
self._use_preorder = False
else:
self._use_preorder = True
self._oldsolveorder = self.solveorder[:]
self.solveorder = self.precoreepiorder
else:
self._use_preorder = False
if hasattr(self, '_oldsolveorder'):
self.solveorder = self._oldsolveorder
@property
def totgraph_nolag(self):
''' The graph of all variables, lagged variables condensed'''
if not hasattr(self, '_totgraph_nolag'):
terms = ((var, inf['terms']) for var, inf in self.allvar.items()
if inf['endo'])
rhss = ((var, term[term.index(self.aequalterm):])
for var, term in terms)
rhsvar = ((var, {v.var for v in rhs if v.var and v.var != var})
for var, rhs in rhss)
# print(list(rhsvar))
edges = (((v, e) for e, rhs in rhsvar for v in rhs))
self._totgraph_nolag = nx.DiGraph(edges)
return self._totgraph_nolag
@property
def totgraph(self):
''' Returns the total graph of the model, including leads and lags '''
if not hasattr(self, '_totgraph'):
self._totgraph = self.totgraph_get()
return self._totgraph
@property
def endograph_nolag(self):
''' Dependency graph for all current-period endogenous variables, shows total dependencies '''
if not hasattr(self, '_endograph_nolag'):
terms = ((var, inf['terms'])
for var, inf in self.allvar.items() if inf['endo'])
rhss = ((var, term[self.allvar[var]['assigpos']:])
for var, term in terms)
rhsvar = (
(var, {v.var for v in rhs if v.var and v.var in self.endogene and v.var != var}) for var, rhs in rhss)
edges = ((v, e) for e, rhs in rhsvar for v in rhs)
# print(edges)
self._endograph_nolag = nx.DiGraph(edges)
self._endograph_nolag.add_nodes_from(self.endogene)
return self._endograph_nolag
@property
def endograph_lag_lead(self):
''' Returns the graph of all endogeneous variables including lags and leads'''
if not hasattr(self, '_endograph_lag_lead'):
self._endograph_lag_lead = self.totgraph_get(onlyendo=True)
return self._endograph_lag_lead
def totgraph_get(self, onlyendo=False):
''' The graph of all variables, including separate lagged and leaded variables
onlyendo : only endogenous variables are part of the graph
'''
def lagvar(xlag):
''' makes a string with lag '''
return '('+str(xlag)+')' if int(xlag) < 0 else ''
def lagleadvar(xlag):
''' makes a string with lag or lead '''
return f'({int(xlag):+})' if int(xlag) != 0 else ''
terms = ((var, inf['terms']) for var, inf in self.allvar.items()
if inf['endo'])
rhss = ((var, term[term.index(self.aequalterm):])
for var, term in terms)
if onlyendo:
rhsvar = ((var, {(v.var+'('+v.lag+')' if v.lag else v.var)
for v in rhs if v.var if v.var in self.endogene}) for var, rhs in rhss)
edgeslag = [(v+lagleadvar(lag+1), v+lagleadvar(lag)) for v, inf in self.allvar.items()
for lag in range(inf['maxlag'], 0) if v in self.endogene]
edgeslead = [(v+lagleadvar(lead-1), v+lagleadvar(lead)) for v, inf in self.allvar.items()
for lead in range(inf['maxlead'], 0, -1) if v in self.endogene]
else:
rhsvar = ((var, {(v.var+'('+v.lag+')' if v.lag else v.var)
for v in rhs if v.var}) for var, rhs in rhss)
edgeslag = [(v+lagleadvar(lag+1), v+lagleadvar(lag))
for v, inf in self.allvar.items() for lag in range(inf['maxlag'], 0)]
edgeslead = [(v+lagleadvar(lead-1), v+lagleadvar(lead))
for v, inf in self.allvar.items() for lead in range(inf['maxlead'], 0, -1)]
# print(list(rhsvar))
edges = (((v, e) for e, rhs in rhsvar for v in rhs))
# edgeslag = [(v,v+lagvar(lag)) for v,inf in m2test.allvar.items() for lag in range(inf['maxlag'],0)]
totgraph = nx.DiGraph(chain(edges, edgeslag, edgeslead))
totgraph.add_nodes_from(self.endogene)
return totgraph
def graph_remove(self, paralist):
''' Removes a list of variables from the totgraph and totgraph_nolag;
mostly used to remove parameters from the graph, makes it less crowded'''
if not hasattr(self, '_totgraph') or not hasattr(self, '_totgraph_nolag'):
_ = self.totgraph
_ = self.totgraph_nolag
self._totgraph.remove_nodes_from(paralist)
self._totgraph_nolag.remove_nodes_from(paralist)
return
def graph_restore(self):
''' If nodes have been removed by graph_remove, calling this function will restore them '''
if hasattr(self, '_totgraph'):
delattr(self, '_totgraph')
if hasattr(self, '_totgraph_nolag'):
delattr(self, '_totgraph_nolag')
return
@staticmethod
def get_minimal_feedback_set(G):
"""
Compute an approximate minimal feedback vertex set for a directed graph and perform
a topological sort on the resulting acyclic graph.
This function attempts to make the input directed graph acyclic by removing a minimal
set of vertices (feedback vertex set). It does so by iteratively finding and removing
vertices from cycles, preferring those with the highest degree (number of edges). After
removing sufficient vertices to break all cycles, it performs a topological sort on the
modified graph.
Parameters:
G (nx.DiGraph): A directed graph represented as a NetworkX DiGraph.
Returns:
tuple: A tuple containing two elements:
- A list of vertices constituting the approximate minimal feedback vertex set.
- A list representing the topological sort order of the modified graph.
If the input graph G is already acyclic, the feedback vertex set will be empty,
and the topological sort order will be of the original graph G.
Note:
This function does not guarantee the absolute minimal feedback vertex set due to the
NP-hard nature of the problem. The topological sort is only valid if the resulting graph
is acyclic.
"""
# Create a copy of the graph to avoid modifying the original graph
H = G.copy()
fvs = set()
# Function to calculate the degree of a vertex in the cycle
def vertex_degree(vertex):
return H.degree(vertex)
# Iterate until the graph becomes acyclic
while True:
try:
# Find a cycle in the graph
cycle = nx.find_cycle(H, orientation='original')
# Find the vertex in the cycle with the highest degree
vertex_to_remove = max(cycle, key=lambda x: vertex_degree(x[0]))[0]
# Add the vertex to the feedback vertex set
fvs.add(vertex_to_remove)
# Remove the vertex from the graph
H.remove_node(vertex_to_remove)
except nx.NetworkXNoCycle:
# If no cycle is found, the graph is now acyclic, and we break the loop
break
# Perform a topological sort on the modified graph
topological_sorted_order = list(nx.topological_sort(H))
return list(fvs) , topological_sorted_order
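# Illustrative sketch (names are hypothetical): break a 3-cycle. Being a
# staticmethod, it can be called on the class or on a model instance `mmodel`.
#
#     import networkx as nx
#     G = nx.DiGraph([('A', 'B'), ('B', 'C'), ('C', 'A'), ('C', 'D')])
#     fvs, order = mmodel.get_minimal_feedback_set(G)
#     # fvs   -> ['C']                (C has the highest degree on the cycle)
#     # order -> e.g. ['A', 'B', 'D'] (topological order of the acyclic rest)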
class Graph_Draw_Mixin():
"""This class defines methods and properties which draws and vizualize using different
graphs of the model
"""
def treewalk(self, g, navn, level=0, parent='Start', maxlevel=20, lpre=True):
''' Traverses the dependency tree from navn and returns a generator \n
to get a list just write: list(treewalk(...))
maxlevel determines the number of generations to walk
lpre=0 we walk the dependent (downstream) nodes
lpre=1 we walk the precedence (upstream) nodes
'''
if level <= maxlevel:
if parent != 'Start':
# return level=0 in order to prune duplicates
yield node(level, parent, navn)
for child in (g.predecessors(navn) if lpre else g[navn]):
yield from self.treewalk(g, child, level + 1, navn, maxlevel, lpre)
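# Hedged usage sketch (`mmodel` is a model instance, FY a hypothetical variable):
# collect the upstream (precedence) nodes of FY two generations back in the
# current-period endogenous graph.
#
#     links = list(mmodel.treewalk(mmodel.endograph, 'FY', maxlevel=2, lpre=True))
#     # each element is a node namedtuple with fields lev, parent and child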
def drawendo(self, **kwargs):
'''draws a graph of the current-period endogenous variables of the model'''
alllinks = (node(0, n[1], n[0]) for n in self.endograph.edges())
return self.todot2(alllinks, **kwargs)
def drawendo_lag_lead(self, **kwargs):
'''draws a graph of the endogenous variables of the model including lags and leads'''
alllinks = (node(0, n[1], n[0])
for n in self.endograph_lag_lead.edges())
return self.todot2(alllinks, **kwargs)
def drawmodel(self, lag=True, **kwargs):
'''draws a graph of the whole model'''
graph = self.totgraph if lag else self.totgraph_nolag
alllinks = (node(0, n[1], n[0]) for n in graph.edges())
return self.todot2(alllinks, **kwargs)
def plotadjacency(self, size=(5, 5), title='Structure', nolag=False):
'''
Draws an adjacency matrix of the model
Args:
size (tuple, optional): figure size. Defaults to (5, 5).
title (str, optional): title of the chart. Defaults to 'Structure'.
nolag (bool, optional): if True, use the current-period (no lag) endogenous graph. Defaults to False.
Returns:
fig (matplotlib figure): An adjacency matrix drawing.
'''
if nolag:
G = self.endograph_nolag
order, blocks, blocktype = self.create_strong_network(
G, typeout=True)
fig = draw_adjacency_matrix(
G, order, blocks, blocktype, size=size, title=title)
else:
fig = draw_adjacency_matrix(self.endograph, self.precoreepiorder,
self._superstrongblock, self._superstrongtype, size=size, title=title)
return fig
def draw(self, navn, down=1, up=1, lag=False, endo=False, filter=0, **kwargs):
'''draws a graph of dependencies of navn up to maxlevel
:lag: show the complete graph including lagged variables, else only current variables.
:endo: Show only the graph for current endogenous variables
:down: levels downstream
:up: levels upstream
'''
if filter and lag:
print('No lagged nodes when using filter')
graph = self.totgraph if (lag and not filter) else self.totgraph_nolag
graph = self.endograph if endo else graph
uplinks = self.upwalk(graph, navn.upper(), maxlevel=up, lpre=True,filter=filter )
downlinks = (node(-level, navn, parent) for level, parent, navn in
self.upwalk(graph, navn.upper(), maxlevel=down, lpre=False,filter=filter))
alllinks = chain(uplinks, downlinks)
return self.todot2(alllinks, navn=navn.upper(), down=down, up=up, filter = filter, **kwargs)
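# Hedged examples (FY is a hypothetical variable name):
#
#     mmodel.draw('FY', up=2, down=1)          # dependency graph, 2 levels up, 1 down
#     mmodel.draw('FY', up=1, lag=True)        # include lagged variables as nodes
#     dotsource = mmodel.draw('FY', dot=True)  # return the Graphviz dot source instead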
def trans(self, ind, root, transdic=None, debug=False):
''' as there are many variables starting with e.g. SHOCK, they can be renamed to consolidate nodes'''
if debug:
print('>', ind)
ud = ind
if ind == root or transdic is None:
pass
else:
for pat, to in transdic.items():
if debug:
print('trans ', pat, ind)
if bool(re.match(pat.upper(), ind)):
if debug:
print(f'trans match {ind} with {pat}')
return to.upper()
if debug:
print('trans', ind, ud)
return ud
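# Hedged example: with the pattern below, every node matching the regex
# (e.g. 'SHOCK_GDP__J') is consolidated into the single node 'SHOCK__J',
# except the root node itself:
#
#     mmodel.trans('SHOCK_GDP__J', 'FY', {'SHOCK[_A-Z]*__J': 'SHOCK__J'})
#     # -> 'SHOCK__J'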
def color(self, v, navn=''):
# breakpoint()
if navn == v:
out = 'red'
return out
if v in self.endogene:
out = 'steelblue1'
elif v in self.exogene:
out = 'yellow'
elif '(' in v:
namepart = v.split('(')[0]
out = 'springgreen' if namepart in self.endogene else 'olivedrab1'
else:
out = 'red'
return out
def upwalk(self, g, navn, level=0, parent='Start', maxlevel=20, filter=0.0, lpre=True):
''' Traverses the dependency tree from navn and returns a generator \n
to get a list just write: list(upwalk(...))
maxlevel determines the number of generations to walk
filter prunes branches where the attribution is below the filter value
'''
if filter:
if level <= maxlevel:
# print(level,parent,navn)
# print(f'upwalk {level=} {parent=} {navn=}')
if parent != 'Start':
# print(f'Look at {parent=} {navn=}' )
try:
if ((self.get_att_pct_to_from(parent,navn) if lpre
else self.get_att_pct_to_from(navn,parent)).abs() >= filter).any():
# print(f'yield {parent=} {navn=}' )
yield node(level, parent, navn)
except:
pass
# yield node(level, parent, navn)
for child in (g.predecessors(navn) if lpre else g[navn]):
# breakpoint()
try:
if ((self.get_att_pct_to_from(navn,child) if lpre
else self.get_att_pct_to_from(child,navn)).abs() >= filter).any():
# print(f'yield from {child=} {navn=}')
yield from self.upwalk(g, child, level + 1, navn, maxlevel, filter,lpre)
except:
yield from self.upwalk(g, child, level + 1, navn, maxlevel, filter,lpre)
pass
else:
if level <= maxlevel:
if parent != 'Start':
# return level=0 in order to prune duplicates
yield node(level, parent, navn)
for child in (g.predecessors(navn) if lpre else g[navn]):
yield from self.upwalk(g, child, level + 1, navn, maxlevel, filter, lpre)
def upwalk_old(self, g, navn, level=0, parent='Start', up=20, select=0.0, lpre=True):
''' Traverses the dependency tree from navn and returns a generator \n
to get a list just write: list(upwalk_old(...))
up determines the number
of generations to walk up
'''
if select:
if level <= up:
# print(level,parent,navn)
# print(f'upwalk {level=} {parent=} {navn=}')
if parent != 'Start':
# print(level,parent,navn,'\n',(g[navn][parent]['att']))
if (g[navn][parent]['att'].abs() >= select).any(axis=1).any():
# return level=0 in order to prune dublicates
# print(f'Yield {level=} {parent=} {navn=}')
yield node(level, parent, navn)
for child in (g.predecessors(navn) if lpre else g[navn]):
try:
# print('vvv',level,parent,navn)
if parent == 'Start': # (g[parent][navn]['att'].abs() >= select).any(axis=1).any():
# print(f'Yield upwalk {(level+1)=} {child=} {navn=}')
yield from self.upwalk_old(g, child, level + 1, navn, up, select, lpre)
elif (g[navn][parent]['att'].abs() >= select).any(axis=1).any():
# print(f'yield {level=} {parent=} {navn=} ')
yield from self.upwalk_old(g, child, level + 1, navn, up, select, lpre)
else:
# print(f'UPS {level=} {parent=} {navn=} ')
pass
except Exception as e:
# breakpoint()
# print(f'Exception {e}')
# print('Problem ',level,parent,navn,'\n',g[navn][parent]['att'])
pass
else:
if level <= up:
if parent != 'Start':
# return level=0 in order to prune duplicates
yield node(level, parent, navn)
for child in (g.predecessors(navn) if lpre else g[navn]):
yield from self.upwalk_old(g, child, level + 1, navn, up, select, lpre)
def explain(self, var, up=1, start='', end='', filter = 0, showatt=True, lag=True,
debug=0, noshow=False, dot=False, **kwargs):
''' Walks a tree to explain the difference between basedf and lastdf
Parameters:
:var: the variable we are looking at
:up: how far up the tree will we climb
:filter: Only show the nodes which contribute at least this pct
:showatt: Show the explanation in pct
:lag: If true, show all lags, else aggregate lags for each variable.
:HR: if true make horizontal graph
:title: Title
:saveas: Filename
:pdf: open the pdf file
:svg: display the svg file
:browser: if true open the svg file in browser
:noshow: Only return the resulting graph
:dot: Return the dot file only
'''
if up > 0:
with self.timer('Get totgraph', debug) as t:
startgraph = self.totgraph if lag else self.totgraph_nolag
edgelist = list({v for v in self.upwalk(
startgraph, var.upper(), maxlevel=up)})
nodelist = list({v.child for v in edgelist}) + \
[var] # remember the starting node
nodestodekomp = list(
{n.split('(')[0] for n in nodelist if n.split('(')[0] in self.endogene})
# print(nodelist)
# print(nodestodekomp)
with self.timer('Dekomp', debug) as t:
pctdic2 = {n: self.get_att_pct(
n, lag=lag, start=start, end=end) for n in nodestodekomp}
edges = {(r, n): {'att': df.loc[[r], :]}
for n, df in pctdic2.items() for r in df.index}
self.localgraph = nx.DiGraph()
self.localgraph.add_edges_from(
[(v.child, v.parent) for v in edgelist])
nx.set_edge_attributes(self.localgraph, edges)
self.newgraph = nx.DiGraph()
for v in self.upwalk(self.localgraph, var.upper(), maxlevel=up, filter=filter):
# print(f'{"-"*v.lev} {v.child} {v.parent} \n',self.localgraph[v.child][v.parent].get('att','**'))
# print(f'{"-"*v.lev} {v.child} {v.parent} \n')
self.newgraph.add_edge(
v.child, v.parent, att=self.localgraph[v.child][v.parent].get('att', None))
nodeatt = {n: {'att': i} for n, i in pctdic2.items()}
nx.set_node_attributes(self.newgraph, nodeatt)
nodevalues = {n: {'values': self.get_values(
n)} for n in self.newgraph.nodes}
nx.set_node_attributes(self.newgraph, nodevalues)
else:
self.newgraph = nx.DiGraph([(var, var)])
nx.set_node_attributes(
self.newgraph, {var: {'values': self.get_values(var)}})
nx.set_node_attributes(self.newgraph, {
var: {'att': self.get_att_pct(var, lag=lag, start=start, end=end)}})
if noshow:
return self.newgraph
if dot:
return self.todot(self.newgraph, navn=var, showatt=showatt, dot=True, **kwargs)
else:
self.gdraw(self.newgraph, navn=var, showatt=showatt, **kwargs)
return self.newgraph
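# Hedged usage sketch: after a baseline and an alternative run, explain the
# difference between basedf and lastdf for a variable (FY is hypothetical):
#
#     g = mmodel.explain('FY', up=2, filter=5, showatt=True)
#     # draws the attribution tree and returns the underlying nx.DiGraph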
def dftodottable(self, df, dec=0):
xx = '\n'.join([f"<TR {self.maketip(row[0],True)}><TD ALIGN='LEFT' {self.maketip(row[0],True)}>{row[0]}</TD>" +
''.join(["<TD ALIGN='RIGHT'>"+(f'{b:{25},.{dec}f}'.strip()+'</TD>').strip()
for b in row[1:]])+'</TR>' for row in df.itertuples()])
return xx
def todot(self, g, navn='', browser=False, **kwargs):
''' makes a drawing of the subtree originating from navn
attributes can be shown
:sink: variable to use as sink
:svg: Display the svg image
'''
size = kwargs.get('size', (6, 6))
alledges = (node(0, n[1], n[0]) for n in g.edges())
if 'transdic' in kwargs:
alllinks = (node(x.lev, self.trans(x.parent, navn, kwargs['transdic']), self.trans(
x.child, navn, kwargs['transdic'])) for x in alledges)
elif hasattr(self, 'transdic'):
alllinks = (node(x.lev, self.trans(x.parent, navn, self.transdic), self.trans(
x.child, navn, self.transdic)) for x in alledges)
else:
alllinks = alledges
ibh = {node(0, x.parent, x.child)
for x in alllinks} # To weed out multiple links
if kwargs.get('showatt', False):
att_dic = nx.get_node_attributes(g, 'att')
values_dic = nx.get_node_attributes(g, 'values')
showatt = True
else:
showatt = False
#
dec = kwargs.get('dec', 0)
nodelist = {n for nodes in ibh for n in (nodes.parent, nodes.child)}
def dftotable(df, dec=0):
xx = '\n'.join([f"<TR {self.maketip(row[0],True)}><TD ALIGN='LEFT' {self.maketip(row[0],True)}>{row[0]}</TD>" +
''.join(["<TD ALIGN='RIGHT'>"+(f'{b:{25},.{dec}f}'.strip()+'</TD>').strip()
for b in row[1:]])+'</TR>' for row in df.itertuples()])
return xx
def makenode(v):
# tip= f'{pt.split_frml(self.allvar[v]["frml"])[3][:-1]}' if v in self.endogene else f'{v}'
if showatt:
dfval = values_dic[v]
dflen = len(dfval.columns)
lper = "<TR><TD ALIGN='LEFT'>Per</TD>" + \
''.join(['<TD>'+(f'{p}'.strip()+'</TD>').strip()
for p in dfval.columns])+'</TR>'
hval = f"<TR><TD COLSPAN = '{dflen+1}' {self.maketip(v,True)}>{v}</TD></TR>"
lval = dftotable(dfval, dec)
try:
latt = f"<TR><TD COLSPAN = '{dflen+1}'> % Explained by</TD></TR>{dftotable(self.att_dic[v],dec)}" if len(
self.att_dic[v]) else ''
except:
latt = ''
# breakpoint()
linesout = hval+lper+lval+latt
out = f'"{v}" [shape=box fillcolor= {self.color(v,navn)} margin=0.025 fontcolor=blue style=filled ' + (
f" label=<<TABLE BORDER='1' CELLBORDER = '1' > {linesout} </TABLE>> ]")
pass
else:
out = f'"{v}" [shape=box fillcolor= {self.color(v,navn)} margin=0.025 fontcolor=blue style=filled ' + (
f" label=<<TABLE BORDER='0' CELLBORDER = '0' > <TR><TD>{v}</TD></TR> </TABLE>> ]")
return out
pre = 'digraph TD { rankdir ="HR" \n' if kwargs.get(
'HR', False) else 'digraph TD { rankdir ="LR" \n'
nodes = '{node [margin=0.025 fontcolor=blue style=filled ] \n ' + \
'\n'.join([makenode(v) for v in nodelist])+' \n} \n'
def getpw(v):
'''Define penwidth based on explanation in the last period '''
try:
return max(0.5, min(5., abs(g[v.child][v.parent]['att'].iloc[0, -1])/20.))
except:
return 0.5
pw = [getpw(v) for v in ibh]
links = '\n'.join(
[f'"{v.child}" -> "{v.parent}" [penwidth={p}]' for v, p in zip(ibh, pw)])
psink = '\n{ rank = sink; "' + \
kwargs['sink'].upper()+'" ; }' if kwargs.get('sink', False) else ''
psource = '\n{ rank = source; "' + \
kwargs['source'].upper()+'" ; }' if kwargs.get('source',
False) else ''
fname = kwargs.get(
'saveas', f'{navn} explained' if navn else "A_model_graph")
ptitle = '\n label = "'+kwargs.get('title', fname)+'";'
post = '\n}'
out = pre+nodes+links+psink+psource+ptitle+post
if kwargs.get('dot', False):
return out
self.display_graph(out, fname, **kwargs)
def gdraw(self, g, **kwargs):
'''draws the graph g'''
out = self.todot(g, **kwargs)
return out
def maketip(self, v, html=False):
'''
Return a tooltip for variable v.
For use when generating .dot files for Graphviz
If html==True it can be incorporated into an html string'''
var_name = v.split("(")[0]
des0 = f"{self.var_description[var_name]}\n{self.allvar[var_name]['frml'] if var_name in self.endogene else 'Exogen'}"
des = self.html_replace(des0)
if html:
return f'TOOLTIP="{des}" href="bogus"'
else:
return f'tooltip="{des}"'
def makedotnew(self, alledges, navn='', **kwargs):
''' makes a drawing of all edges in the list alledges;
this can handle both attribution and plain graphs
:all: show values for .basedf and .lastdf
:last: show the values for .lastdf
:growthshow: Show growth rates
:attshow: Show attributions
:filter: Prune tree branches where all(abs(attribution)<filter value)
:sink: variable to use as sink
:source: variable to use as source
:svg: Display the svg image in browser
:pdf: display the pdf result in acrobat reader
:saveas: Save the drawing as name
:size: figure size default (6,6)
:warnings: warnings displayed in command console, default =False
:invisible: set of invisible nodes
:labels: dict of labels for edges
:transdic: dict of translations for consolidation of nodes {'SHOCK[_A-Z]*__J':'SHOCK__J','DEV__[_A-Z]*':'DEV'}
:dec: decimal places in numbers
:HR: horizontal orientation default = False
:des: inject variable descriptions
:fokus2: Variables for which values are shown
:fokus2all: Show values for all variables
'''
invisible = kwargs.get('invisible', set())
des = kwargs.get('des', True)
# breakpoint()
attshow = kwargs.get('attshow',kwargs.get('ats',False))
growthshow = kwargs.get('growthshow',kwargs.get('gs',False))
showdata = kwargs.get('showdata',kwargs.get('sd',None))
fokus2= []
fokus2all = False
# breakpoint()
if type(showdata) in {int,bool}:
fokus2all = showdata
elif type(showdata) == str:
fokus2 = set(self.vlist(showdata))
else:
# for legacy
fokus200 = kwargs.get('fokus2', '')
fokus2 = set(self.vlist(fokus200)) if type(fokus200) == str else fokus200
fokus2all = kwargs.get('fokus2all', False)
dec = kwargs.get('dec', 3)
att = kwargs.get('att', True)
filter = kwargs.get('filter', 0)
def color( v, navn=''):
# breakpoint()
if navn == v:
out = 'red'
return out
if v in fokus2:
out = 'chocolate1'
return out
if v in self.endogene:
out = 'steelblue1'
elif v in self.exogene:
out = 'yellow'
elif '(' in v:
namepart = v.split('(')[0]
out = 'springgreen' if namepart in self.endogene else 'olivedrab1'
else:
out = 'red'
return out
def dftotable(df, dec=0):
xx = '\n'.join([f"<TR {self.maketip(row[0],True)}><TD ALIGN='LEFT' {self.maketip(row[0],True)}>{row[0]}</TD>" +
''.join(["<TD ALIGN='RIGHT'>"+(f'{b:{25},.{dec}f}'.strip()+'</TD>').strip()
for b in row[1:]])+'</TR>' for row in df.itertuples()])
return xx
def getpw(v):
'''Define penwidth based on max explanation over the period '''
try:
# breakpoint()
return f'{max(1., min(8., self.att_dic[v.parent].loc[v.child].abs().max()/10.)):2}'
# return f'{max(1., min(8., self.get_att_pct_to_from(v.parent,v.child).abs().max()/10.)):.2}'
except:
return '0.5'
def getpct(v):
'''Return a tooltip text with min and max attribution from child to parent '''
try:
# breakpoint()
return f'" {v.child} -> {v.parent} Min. att. {self.att_dic[v.parent].loc[v.child].min():.0f}% max: {self.att_dic[v.parent].loc[v.child].max():.0f}%"'
except:
return 'NA'
def stylefunk(n1=None, n2=None, invisible=set()):
if n1 in invisible or n2 in invisible:
if n2:
return 'style = invisible arrowhead=none '
else:
return 'style = invisible '
else:
# return ''
return 'style = filled'
def stylefunkhtml(n1=None, invisible=set()):
# return ''
if n1 in invisible:
return 'style = "invisible" '
else:
return 'style = "filled"'
if 'transdic' in kwargs:
alllinks = [node(x.lev, self.trans(x.parent, navn, kwargs['transdic']), self.trans(
x.child, navn, kwargs['transdic'])) for x in alledges]
elif hasattr(self, 'transdic'):
alllinks = [node(x.lev, self.trans(x.parent, navn, self.transdic), self.trans(
x.child, navn, self.transdic)) for x in alledges]
else:
alllinks = list(alledges)
labelsdic = kwargs.get('labels', {})
labels = self.defsub(labelsdic)
if not len(alllinks):
print(f'No graph {navn}')
return
maxlevel = max([l for l, p, c in alllinks])
if att:
try:
to_att = {p for l, p, c in alllinks }|{ c.split('(')[0] for l, p, c in alllinks if c.split('(')[0] in self.endogene}
self.att_dic = {v: self.get_att_pct(
v.split('(')[0], lag=False, start='', end='') for v in to_att}
self.att_dic_level = {v: self.get_att_level(
v.split('(')[0], lag=False, start='', end='') for v in to_att}
except:
self.att_dic_level = {}
self.att_dic={}
ibh = {node(0, x.parent, x.child)
for x in alllinks} # To weed out multiple links
#
nodelist = {n for nodes in ibh for n in (nodes.parent, nodes.child)}
try:
self.value_dic = {v: self.get_values(v) for v in nodelist }
except:
self.value_dic = dict()
# breakpoint()
# print(nodelist)
def makenode(v, navn):
# print(v,fokus2all)
show = (v in fokus2) or fokus2all
# print(f'{v=}, {show=} {fokus2=}')
if (kwargs.get('last', True) or kwargs.get('all', False) or
attshow or growthshow) and show :
# breakpoint()
try:
t = pt.udtryk_parse(v, funks=[])
var = t[0].var
lag = int(t[0].lag) if t[0].lag else 0
try:
bvalues = [float(get_a_value(self.basedf, per, var, lag))for per in self.current_per]
base = "<TR><TD ALIGN='LEFT' TOOLTIP='Baseline values' href='bogus'>Base</TD>"+''.join(["<TD ALIGN='RIGHT' TOOLTIP='Baseline values' href='bogus' >"+(
f'{b:{25},.{dec}f}'.strip()+'</TD>').strip() for b in bvalues])+'</TR>'
except:
base= ''
try:
lvalues = [float(get_a_value(self.lastdf, per, var, lag)) for per in self.current_per]
last = "<TR><TD ALIGN='LEFT' TOOLTIP='Latest run values' href='bogus'>Last</TD>"+''.join(["<TD ALIGN='RIGHT' >"+(
f'{b:{25},.{dec}f}'.strip()+'</TD>').strip() for b in lvalues])+'</TR>'
except:
last=''
try:
dvalues = [float(get_a_value(self.lastdf, per, var, lag)-get_a_value(self.basedf, per, var, lag))
for per in self.current_per]
dif = "<TR><TD ALIGN='LEFT' TOOLTIP='Difference between baseline and latest run' href='bogus'>Diff</TD>" + \
''.join(["<TD ALIGN='RIGHT'>"+(f'{b:{25},.{dec}f}'.strip()+'</TD>').strip(
) for b in dvalues])+'</TR>'
except:
dif = ''
if attshow:
try:
# breakpoint()
attvalues = self.att_dic[v]
# attvalues = self.get_att_pct(v.split('(')[0], lag=False, start='', end='')
if filter:
attvalues = cutout(attvalues,filter)
# attvalues2 = self.att_dic_level[v]
dflen = len(attvalues.columns)
latt = f"<TR><TD COLSPAN = '{dflen+1}'> % Explained by</TD></TR>{dftotable(attvalues,dec)}" if len(
self.att_dic[v]) else ''
except:
latt = ''
else:
latt = ''
if growthshow:
try:
growthdf = self.get_var_growth(v,diff=True)
dflen = len(growthdf.columns)
lgrowth = f"<TR><TD COLSPAN = '{dflen+1}'> Growth rate in %</TD></TR>{dftotable(growthdf,dec)}" if len(
growthdf) else ''
except:
lgrowth = ''
else:
lgrowth = ''
per = "<TR><TD ALIGN='LEFT'>Per</TD>" + \
''.join(['<TD>'+(f'{p}'.strip()+'</TD>').strip()
for p in self.current_per])+'</TR>'
# tip= f' tooltip="{self.allvar[var]["frml"]}"' if self.allvar[var]['endo'] else f' tooltip = "{v}" '
# out = f'"{v}" [shape=box fillcolor= {color(v,navn)} margin=0.025 fontcolor=blue {stylefunk(var,invisible=invisible)} ' + (
out = f'"{v}" [shape=box style=filled fillcolor=None margin=0.025 fontcolor=blue ' + (
f" label=<<TABLE BORDER='1' CELLBORDER = '1' {stylefunkhtml(var,invisible=invisible)} > <TR><TD COLSPAN ='{len(lvalues)+1}' bgcolor='{color(v,navn)}' {self.maketip(v,True)} >{self.get_des_html(v,des)}</TD></TR>{per} {base}{last}{dif}{lgrowth}{latt} </TABLE>> ]")
pass
except Exception as inst:
# breakpoint()
out = f'"{v}" [shape=box fillcolor= {color(v,navn)} {self.maketip(v,True)} margin=0.025 fontcolor=blue {stylefunk(var,invisible=invisible)} ' + (
f" label=<<TABLE BORDER='0' CELLBORDER = '0' {stylefunkhtml(var,invisible=invisible)} > <TR><TD>{self.get_des_html(v)}</TD></TR> <TR><TD> Condensed</TD></TR></TABLE>> ]")
else:
out = f'"{v}" [ shape=box fillcolor= {color(v,navn)} {self.maketip(v,False)} margin=0.025 fontcolor=blue {stylefunk(v,invisible=invisible)} ' + (
f" label=<<TABLE BORDER='0' CELLBORDER = '0' {stylefunkhtml(v,invisible=invisible)} > <TR><TD {self.maketip(v)}>{self.get_des_html(v,des)}</TD></TR> </TABLE>> ]")
return out
pre = 'digraph TD {rankdir ="HR" \n' if kwargs.get(
'HR', False) else 'digraph TD { rankdir ="LR" \n'
nodes = '{node [margin=0.025 fontcolor=blue style=filled ] \n ' + \
'\n'.join([makenode(v, navn) for v in nodelist])+' \n} \n'
links = '\n'.join([f'"{v.child}" -> "{v.parent}" [ {stylefunk(v.child,v.parent,invisible=invisible)} tooltip={getpct(v)} href="bogus" penwidth = {getpw(v)} ]'
for v in ibh])
# breakpoint()
# links = '\n'.join(
# [f'"{v.child}" -> "{v.parent}" [penwidth={p}]' for v, p in zip(ibh, pw)])
psink = '\n{ rank = sink; "' + \
kwargs['sink'] + '" ; }' if kwargs.get('sink', False) else ''
psource = '\n{ rank = source; "' + \
kwargs['source']+'" ; }' if kwargs.get('source', False) else ''
clusterout = ''
# expect a dict with clustername as key and a list of nodes as content
if kwargs.get('cluster', False):
clusterdic = kwargs.get('cluster', False)
for i, (c, cl) in enumerate(clusterdic.items()):
varincluster = ' '.join([f'"{v.upper()}"' for v in cl])
clusterout = clusterout + \
f'\n subgraph cluster{i} {{ {varincluster} ; label = "{c}" ; color=lightblue ; style = filled ;fontcolor = yellow}}'
fname = kwargs.get('saveas', navn if navn else "A_model_graph")
ptitle = '\n label = "'+kwargs.get('title', fname)+'";'
post = '\n}'
out = pre+nodes+links+psink+psource+clusterout+ptitle+post
# self.display_graph(out,fname,**kwargs)
# run('%windir%\system32\mspaint.exe '+ pngname,shell=True) # display the drawing
return out
def todot2(self, alledges, navn='', **kwargs):
''' makes a drawing of all edges in the list alledges
:all: show values for .basedf and .lastdf
:last: show the values for .lastdf
:sink: variable to use as sink
:source: variable to use as source
:svg: Display the svg image in browser
:pdf: display the pdf result in acrobat reader
:saveas: Save the drawing as name
:size: figure size default (6,6)
:warnings: warnings displayed in command console, default =False
:invisible: set of invisible nodes
:labels: dict of labels for edges
:transdic: dict of translations for consolidation of nodes {'SHOCK[_A-Z]*__J':'SHOCK__J','DEV__[_A-Z]*':'DEV'}
:dec: decimal places in numbers
:HR: horizontal orientation default = False
:des: inject variable descriptions
'''
dot = self.makedotnew(alledges, navn=navn, **kwargs)
if type(dot) == type(None):
print('The graph is empty')
if kwargs.get('filter',False):
print('Perhaps the filter prunes too much')
return
fname = kwargs.get('saveas', navn if navn else "A_model_graph")
if kwargs.get('dot', False):
return dot
self.display_graph(dot, fname, **kwargs)
return
def display_graph_old(self, dot, fname, browser, kwargs):
size = kwargs.get('size', (6, 6))
tpath = os.path.join(os.getcwd(), 'graph')
if not os.path.isdir(tpath):
try:
os.mkdir(tpath)
except:
print("ModelFlow: Can't create folder for graphs")
return
# filename = os.path.join(r'graph',navn+'.gv')
filename = os.path.join(tpath, fname+'.gv')
pngname = '"'+os.path.join(tpath, fname+'.png')+'"'
svgname = '"'+os.path.join(tpath, fname+'.svg')+'"'
pdfname = '"'+os.path.join(tpath, fname+'.pdf')+'"'
epsname = '"'+os.path.join(tpath, fname+'.eps')+'"'
with open(filename, 'w') as f:
f.write(dot)
warnings = "" if kwargs.get("warnings", False) else "-q"
# run('dot -Tsvg -Gsize=9,9 -o'+svgname+' "'+filename+'"',shell=True) # creates the drawing
# creates the drawing
run(f'dot -Tsvg -Gsize={size[0]},{size[1]} -o{svgname} "{filename}" {warnings} ', shell=True)
# creates the drawing
run(f'dot -Tpng -Gsize={size[0]},{size[1]} -Gdpi=300 -o{pngname} "{filename}" {warnings} ', shell=True)
# creates the drawing
run(f'dot -Tpdf -Gsize={size[0]},{size[1]} -o{pdfname} "{filename}" {warnings} ', shell=True)
# run('dot -Tpdf -Gsize=9,9 -o'+pdfname+' "'+filename+'"',shell=True) # creates the drawing
# run('dot -Teps -Gsize=9,9 -o'+epsname+' "'+filename+'"',shell=True) # creates the drawing
if kwargs.get('png', False):
display(Image(filename=pngname[1:-1]))
else:
try:
display(SVG(filename=svgname[1:-1]))
except:
display(Image(filename=pngname[1:-1]))
# breakpoint()
if browser:
wb.open(svgname, new=2)
# breakpoint()
if kwargs.get('pdf', False):
print('close PDF file before continue')
os.system(pdfname)
@staticmethod
def display_graph(out, fname, **kwargs):
'''Generates a graphviz file from the commands in out.
The file is placed in cwd/graph.
A png and an svg file are generated, and the svg file is displayed if possible.
The options pdf and eps determine whether a pdf and an eps file are generated.
The option fpdf will cause the graph to be displayed in a separate pdf window.
The option browser determines whether a separate browser window is opened'''
from IPython.core.magic import register_line_magic, register_cell_magic
from pathlib import Path
import webbrowser as wb
from subprocess import run
# breakpoint()
tsize = kwargs.get('size', (6, 6))
size = tsize if type(tsize) == tuple else tuple(int(i)
for i in tsize[1:-1].split(',')) # if size is a string
lpdf = kwargs.get('pdf', False)
fpdf = kwargs.get('fpdf', False)
lpng = kwargs.get('png', False)
leps = kwargs.get('eps', False)
browser = kwargs.get('browser', False)
warnings = "" if kwargs.get("warnings", False) else "-q"
# path = Path.cwd() / 'graph'
path = Path('graph')
path.mkdir(parents=True, exist_ok=True)
filename = path / (fname + '.gv')
with open(filename, 'w') as f:
f.write(out)
pngname = filename.with_suffix('.png')
svgname = filename.with_suffix('.svg')
pdfname = filename.with_suffix('.pdf')
epsname = filename.with_suffix('.eps')
xx0 = run(f'dot -Tsvg -Gsize={size[0]},{size[1]} -o"{svgname}" "{filename}" -q {warnings} ',
shell=True, capture_output=True, text=True).stderr # creates the drawing
xx1 = run(f'dot -Tpng -Gsize={size[0]},{size[1]} -Gdpi=300 -o"{pngname}" "{filename}" {warnings} ',
shell=True, capture_output=True, text=True).stderr # creates the drawing
xx2 = '' if not (lpdf or fpdf) else run(
f'dot -Tpdf -Gsize={size[0]},{size[1]} -o"{pdfname}" "{filename}" {warnings} ', shell=True, capture_output=True, text=True).stderr # creates the drawing
xx3 = '' if not leps else run(f'dot -Teps -Gsize={size[0]},{size[1]} -o"{epsname}" "{filename}" {warnings} ',
shell=True, capture_output=True, text=True).stderr # creates the drawing
for x in [xx0, xx1, xx2, xx3]:
if x:
print(
f'Error in generating graph picture:\n{x}\nPerhaps the file is already open - then close the application')
# return
if lpng:
display(Image(filename=pngname))
elif lpdf:
display(IFrame(pdfname, width=1000, height=500))
else:
try:
display(SVG(filename=svgname))
except Exception:
display(Image(filename=pngname))
if browser:
wb.open(f'file://{svgname.resolve()}', new=2)
if fpdf:
wb.open(f'file://{pdfname.resolve()}', new=2)
# display(IFrame(pdfname,width=500,height=500))
# os.system(pdfname)
return
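# Note: display_graph shells out to the Graphviz `dot` executable, which must
# be on the PATH. A hedged sketch of a direct call:
#
#     mmodel.display_graph(dotsource, 'my_graph')
#     # writes graph/my_graph.gv and renders graph/my_graph.svg and .png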
class Display_Mixin():
def vis(self, *args, **kwargs):
''' Visualize the data of this model instance
if the user has another vis class she can place it in _vis, then that will be used'''
if not hasattr(self, '_vis'):
self._vis = mv.vis
return self._vis(self, *args, **kwargs)
def varvis(self, *args, **kwargs):
return mv.varvis(self, *args, **kwargs)
def compvis(self, *args, **kwargs):
return mv.compvis(self, *args, **kwargs)
def ibsstyle_old(self,df,description_dict = None, dec=2,transpose=None):
'''
Args:
df (DataFrame): Dataframe to style.
description_dict (dict, optional): Defaults to None, in which case the model's var_description is used.
dec (int, optional): decimals. Defaults to 2.
transpose (bool, optional): if True rows are time periods. Defaults to None (auto-detect).
Returns:
A styled dataframe with variable descriptions as tooltips.
'''
# breakpoint()
if self.in_notebook():
if type(transpose) == type(None):
xtranspose = (df.index[0] in self.lastdf.index)
else:
xtranspose = transpose
des = self.var_description if type(description_dict)==type(None) else description_dict
if not xtranspose:
tt = pd.DataFrame([[des.get(v,v) for t in df.columns] for v in df.index ],index=df.index,columns=df.columns)
else:
tt = pd.DataFrame([[des.get(v,v) for v in df.columns ]for t in df.index] ,index=df.index,columns=df.columns)
xdec = f'{dec}'
result = df.style.format('{:.'+xdec+'f}').\
set_tooltips(tt, props='visibility: hidden; position: absolute; z-index: 1; border: 1px solid #000066;'
'background-color: white; color: #000066; font-size: 0.8em;width:100%'
'transform: translate(0px, -24px); padding: 0.6em; border-radius: 0.5em;')
return result
else:
return df
def ibsstyle(self,df,description_dict = None, dec=2,
transpose=None,use_tooltip=True,percent=False):
'''
Args:
df (DataFrame): Dataframe to style.
description_dict (dict, optional): Defaults to None, in which case the model's var_description is used.
dec (int, optional): decimals. Defaults to 2.
transpose (bool, optional): if True rows are time periods. Defaults to None (auto-detect).
use_tooltip (bool, optional): attach variable descriptions as tooltips. Defaults to True.
percent (bool, optional): append a percent sign to the numbers. Defaults to False.
Returns:
A styled dataframe.
'''
# breakpoint()
# breakpoint()
if True: # self.in_notebook():
des = self.var_description if type(description_dict)==type(None) else description_dict
keys= set(des.keys())
def get_des(v):
if type(v) == str and '(' in v:
return des.get(v.split('(')[0],v)
else:
return des.get(v,v)
if type(transpose) == type(None):
xtranspose = (df.index[0] in self.lastdf.index)
else:
xtranspose = transpose
if any([i in keys for i in df.index]) :
tt = pd.DataFrame([[get_des(v) for t in df.columns] for v in df.index ],index=df.index,columns=df.columns)
else:
tt = pd.DataFrame([[get_des(v) for v in df.columns ]for t in df.index] ,index=df.index,columns=df.columns)
xdec = f'{dec}'
xpct = '%' if percent else ''
styles = [
{'selector': '.row_heading, .corner',
'props': [('position', 'sticky'), ('left', '0'), ('background', 'white')]},
{'selector': '.col_heading',
'props': [('position', 'sticky'), ('top', '0'), ('background', 'white')]}
]
result = df.style.set_table_styles(styles)
if not any(df.dtypes == 'object'):
result = result.format('{:,.'+xdec+'f}'+xpct+' ')
if use_tooltip:
try:
result=result.set_tooltips(tt, props='visibility: hidden; position: absolute; z-index: 1; border: 1px solid #000066;'
'background-color: white; color: #000066; font-size: 0.8em;width:100%'
'transform: translate(0px, -24px); padding: 0.6em; border-radius: 0.5em;')
except:
...
return result
else:
return df
@staticmethod
def compstyle(df):
'''
returns a styled dataframe of complex numbers
Parameters
----------
df : dataframe with complex values.
Returns
-------
A pandas Styler: near-zero values are blanked, real values shown plain, complex values shown as |z| ~ z.
'''
out = df.style.format(lambda v: ' ' if abs(v) <= 0.00000000001 else
( f'{v.real:.2f}' if v.imag == 0 else f'{abs(v):.2f} ~ {v:.2f}' ))
return out
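# Hedged example: style a small dataframe of complex numbers (eigenvalues, say).
#
#     cdf = pd.DataFrame([[1 + 0j, 0.5 + 0.5j], [0j, 0.3 + 0j]])
#     mmodel.compstyle(cdf)   # blanks near-zero values, shows |z| ~ z otherwise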
##xx = mpak.ibsstyle(mpak.get_att_pct('PAKNYGDPMKTPKN',lag=True,threshold=0),dec=0,percent=1)
##display(HTML("<div style='min-width: 80%; max-width: 600%; width: 600%; overflow: auto;'>" +
# xx.to_html() +
# "</div>"))
def write_eq(self, name='My_model.fru', lf=True):
''' writes the formulas to a file; the file can be used as input to a new model
lf=True -> new lines are put after each frml '''
with open(name, 'w') as out:
outfrml = self.equations.replace(
'$', '$\n') if lf else self.equations
out.write(outfrml)
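# Hedged example: dump the formulas so they can be fed to a new model instance.
#
#     mmodel.write_eq('My_model.fru')
#     # frml = open('My_model.fru').read()   # usable as an equation specification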
def print_eq(self, varnavn, data='', start='', end=''):
#from pandabank import get_var
'''Print all variables that determine the input variable (varnavn)
optional -- enter period and databank to get var values for chosen period'''
print_per = self.smpl(start, end, data)
minliste = [(term.var, term.lag if term.lag else '0')
for term in self.allvar[varnavn]['terms'] if term.var]
print(self.allvar[varnavn]['frml'])
print('{0:50}{1:>5}'.format('Variable', 'Lag'), end='')
print(''.join(['{:>20}'.format(str(per)) for per in print_per]))
for var, lag in sorted(set(minliste), key=lambda x: minliste.index(x)):
endoexo = 'E' if self.allvar[var]['endo'] else 'X'
print(endoexo+': {0:50}{1:>5}'.format(var, lag), end='')
print(
''.join(['{:>20.4f}'.format(data.loc[per+int(lag), var]) for per in print_per]))
print('\n')
return
def print_eq_values(self, varname, databank=None, all=False, dec=1, lprint=1, per=None):
''' for an endogenous variable, this function prints out the frml and input variables
for each period in the current_per.
The function takes special account of dataframes and series '''
res = self.get_eq_values(
varname, showvar=True, databank=databank, per=per)
out = ''
if type(res) != type(None):
varlist = res.index.tolist()
maxlen = max(len(v) for v in varlist)
out += f'\nCalculations of {varname} \n{self.allvar[varname]["frml"]}'
for per in res.columns:
out += f'\n\nLooking at period:{per}'
for v in varlist:
this = res.loc[v, per]
if type(this) == pd.DataFrame:
vv = this if all else this.loc[(this != 0.0).any(
axis=1), (this != 0.0).any(axis=0)]
out += f'\n: {v:{maxlen}} = \n{vv.to_string()}\n'
elif type(this) == pd.Series:
ff = this.astype('float')
vv = ff if all else ff[ff != 0]
out += f'\n{v:{maxlen}} = \n{vv.to_string()}\n'
else:
out += f'\n{v:{maxlen}} = {this:>20}'
if lprint:
print(out)
else:
return out
def print_all_eq_values(self, databank=None, dec=1):
for v in self.solveorder:
self.print_eq_values(v, databank, dec=dec)
def print_eq_mul(self, varnavn, grund='', mul='', start='', end='', impact=False):
#from pandabank import get_var
'''Print all variables that determine the input variable (varnavn)
optional -- enter period and databank to get var values for chosen period'''
grund.smpl(start, end)
minliste = [[term.var, term.lag if term.lag else '0']
for term in self.allvar[varnavn]['terms'] if term.var]
print(self.allvar[varnavn]['frml'])
print('{0:50}{1:>5}'.format('Variable', 'Lag'), end='')
for per in grund.current_per:
per = str(per)
print('{:>20}'.format(per), end='')
print('')
diff = mul.data-grund.data
endo = minliste[0]
for item in minliste:
target_column = diff.columns.get_loc(item[0])
print('{0:50}{1:>5}'.format(item[0], item[1]), end='')
for per in grund.current_per:
target_index = diff.index.get_loc(per) + int(item[1])
tal = diff.iloc[target_index, target_column]
tal2 = tal * (self.diffvalue_d3d if impact else 1)
print('{:>20.4f}'.format(tal2), end='')
print(' ')
def print_all_equations(self, inputdata, start, end):
'''Print values and formulas for all equations in the model, based on an input database and period \n
Example: stress.print_all_equations(bankdata,'2013Q3')'''
for var in self.solveorder:
# if var.find('DANSKE')<>-2:
self.print_eq(var, inputdata, start, end)
print('\n' * 3)
def print_lister(self):
''' prints the lists used in defining the model '''
for i in self.lister:
print(i)
for j in self.lister[i]:
print(' ' * 5, j, '\n', ' ' * 10,
[xx for xx in self.lister[i][j]])
def keep_print(self, pat='*', start='', end='', start_ofset=0, end_ofset=0, diff=True):
""" prints variables from experiments look at keep_get_dict for options
"""
try:
res = self.keep_get_dict(
pat, start, end, start_ofset, end_ofset, diff)
for v, outdf in res.items():
if diff:
print(f'\nVariable {v} Difference to first column')
else:
print(f'\nVariable {v}')
print(outdf)
except:
print('No kept solutions')
def keep_get_df(self, pat='*'):
if len(self.keep_solutions) == 0:
print('No kept solutions')
return
allvars = list({c for k, df in self.keep_solutions.items()
for c in df.columns})
vars = self.list_names(allvars, pat)
res = []
for v in vars:
outv = pd.concat([solution.loc[self.current_per, v]
for solver, solution in self.keep_solutions.items()], axis=1)
outv.columns = pd.MultiIndex.from_tuples(
[(v, k) for k in self.keep_solutions.keys()])
res.append(outv)
out = pd.concat(res, axis=1)
out.columns.names = ('Variable', 'Experiment')
return out
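# Hedged example: after several kept scenarios, collect them in one wide
# dataframe with a (Variable, Experiment) column MultiIndex:
#
#     wide = mmodel.keep_get_df('FY*')   # FY* is a hypothetical pattern
#     # wide['FY'] then holds one column per kept experiment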
# def var_get_df(self, pat='*', start='', end='', start_ofset=0, end_ofset=0):
# varlist = self.vlist(pat)
# res = {}
# with self.set_smpl(start, end) as a, self.set_smpl_relative(start_ofset, end_ofset):res = []
# res['basedf'] = self.basedf.loc[self.current_per,varlist]
# res['lastdf'] = self.lastdf.loc[self.current_per,varlist]
# return out
def keep_var_dict(self, pat='*', start='', end='', start_ofset=0, end_ofset=0, diff=False,trans=True):
"""
Returns a dict of the kept experiments. Keys are the scenario names, values are dataframes with values for each variable
Args:
pat (TYPE, optional): variable selection. Defaults to '*'.
start (TYPE, optional): start period. Defaults to ''.
end (TYPE, optional): end period. Defaults to ''.
start_ofset (TYPE, optional): start offset period. Defaults to 0.
end_ofset (TYPE, optional): end offset period. Defaults to 0.
Returns:
res (dictionary): a dict with a dataframe for each experiment
"""
if len(self.keep_solutions) == 0:
print('No kept solutions')
return
allvars = list({c for k, df in self.keep_solutions.items()
for c in df.columns})
vars = self.vlist_names(allvars, pat)
res = {}
with self.set_smpl(start, end) as a, self.set_smpl_relative(start_ofset, end_ofset):
# print(f'{self.current_per[-1]=}')
for solver, solution in self.keep_solutions.items():
outv = pd.concat([solution.loc[self.current_per, v] for v in vars ], axis=1)
if trans:
outv.columns = [self.var_description[v] for v in vars]
outv.columns.names = ['Variable']
res[solver] = outv
return res
def keep_get_dict(self, pat='*', start='', end='', start_ofset=0, end_ofset=0, diff=False,trans=False):
"""
Returns a dict of the kept experiments. Keys are the variable names, values are dataframes with variable values for each experiment
Args:
pat (TYPE, optional): variable selection. Defaults to '*'.
start (TYPE, optional): start period. Defaults to ''.
end (TYPE, optional): end period. Defaults to ''.
start_ofset (TYPE, optional): start offset period. Defaults to 0.
end_ofset (TYPE, optional): end offset period. Defaults to 0.
Returns:
res (dictionary): a dict with a dataframe for each variable
"""
if len(self.keep_solutions) == 0:
print('No kept solutions')
return
allvars = list({c for k, df in self.keep_solutions.items()
for c in df.columns})
vars = self.vlist_names(allvars, pat)
res = {}
with self.set_smpl(start, end) as a, self.set_smpl_relative(start_ofset, end_ofset):
for v in vars:
# print(f'{v=}')
outv = pd.concat([solution.loc[self.current_per, v]
for solver, solution in self.keep_solutions.items()], axis=1)
outv.columns = [k for k in self.keep_solutions.keys()]
outv.columns.names = ['Experiment']
res[v] = outv.subtract(
outv.iloc[:, 0], axis=0) if diff else outv
return res
def keep_get_plotdict(self, pat='*', start='', end='', start_ofset=0, end_ofset=0,
showtype='level',
diff=False, diffpct = False, by_var=1):
"""
returns
- a dict of {variable in pat : df with scenarios as columns} if by_var = 1
- a dict of {scenario : df with variables in pat as columns} if by_var = 0
Args:
pat (string, optional): Variable selection. Defaults to '*'.
start (TYPE, optional): start period. Defaults to ''.
end (TYPE, optional): end period. Defaults to ''.
start_ofset (int, optional): start offset period. Defaults to 0.
end_ofset (int, optional): end offset period. Defaults to 0.
showtype (str, optional): 'level', 'growth' or 'change' transformation of data. Defaults to 'level'.
diff (bool, optional): if True shows the difference to the
first experiment or the first scenario. Defaults to False.
diffpct (bool, optional): if True shows the difference in percent instead of level
Returns:
dfsres: a dict with a dataframe for each key
"""
# breakpoint()
if diff and diffpct:
raise Exception("keep_plot can't be called with both diff and diffpct")
if by_var:
dfs = self.keep_get_dict(pat, start, end, start_ofset, end_ofset)
else:
dfs = self.keep_var_dict(pat, start, end, start_ofset, end_ofset)
if showtype == 'growth':
dfs = {v: vdf.pct_change()*100. for v, vdf in dfs.items()}
elif showtype == 'change':
dfs = {v: vdf.diff() for v, vdf in dfs.items()}
else:
...
if by_var:
if diff:
dfsres = {v: (vdf.subtract(
vdf.iloc[:, 0], axis=0)).iloc[:, 1:] for v, vdf in dfs.items()}
elif diffpct:
dfsres = {v: (vdf.subtract(
vdf.iloc[:, 0], axis=0).divide(
vdf.iloc[:, 0], axis=0)*100).iloc[:, 1:] for v, vdf in dfs.items()}
else:
dfsres = dfs
else:
first_scenario = dfs[list(dfs.keys())[0]]
if diff:
dfsres = {v: vdf - first_scenario
for i,(v, vdf) in enumerate(dfs.items()) if i >= 1}
elif diffpct:
dfsres = {v: (vdf/first_scenario-1.)*100
for i,(v, vdf) in enumerate(dfs.items()) if i >= 1}
else:
dfsres = dfs
assert not(diff and diffpct) ,"keep_plot can't be called with both diff and diffpct"
return dfsres
def keep_get_plotdict_new(self, pat='*', start='', end='',
showtype='level',diftype='nodif', by_var = True):
"""
returns
- a dict of {variable in pat :dfs scenarios as columnns } if by_var = 1
- a dict of {scenarios :dfs with variables in pat as columnns } if by_var = 0
Args:
pat (string, optional): Variable selection. Defaults to '*'.
start (TYPE, optional): start periode. Defaults to ''.
end (TYPE, optional): end periode. Defaults to ''.
start_ofset (integer,optional ): relativ start ofset
end_ofset (integer,optional ): relativ end ofset
showtype (str, optional): 'level','growth', 'gdppct' or change' transformation of data. Defaults to 'level'.
diftype (str, optional): 'nodif','dif' or 'difpct' transformation of data.
"""
# breakpoint()
if showtype == 'growth':
start_ofset = -1
else:
start_ofset = 0
if by_var:
dfs = self.keep_get_dict(pat, start, end, start_ofset )
else:
dfs = self.keep_var_dict(pat, start, end, start_ofset, trans=False)
if showtype == 'growth':
dfs = {v: vdf.pct_change()*100. for v, vdf in dfs.items()}
elif showtype == 'qoq_ar':
dfs = {v: ((1.+vdf.pct_change())**4.-1.)*100. for v, vdf in dfs.items()}
elif showtype == 'change':
dfs = {v: vdf.diff() for v, vdf in dfs.items()}
elif showtype in ['level'] :
dfs = dfs
elif showtype == 'gdppct':
if by_var:
linevars = list(dfs.keys())
gdpvars = [self.findgdpvar(v) for v in linevars]
dfgdp = self.keep_get_dict(gdpvars, start, end, start_ofset,trans=False)
denominvalues = [dfgdp[v] for v in gdpvars ]
else:
firstdf = next(iter(dfs.values()))
linevars = list(firstdf.columns)
gdpvars = [self.findgdpvar(v) for v in linevars]
dfgdp = self.keep_var_dict(gdpvars, start, end, start_ofset,trans=False)
denominvalues = [df for df in dfgdp.values() ]
nummeratorvalues = [df for df in dfs.values() ]
in_percent = [n.values/ d.values * 100 for
n,d in zip(nummeratorvalues,denominvalues)]
indexnames = [df.index for df in dfs.values() ]
colnames = [df.columns for df in dfs.values() ]
dfs = { v: pd.DataFrame(ip,index=i, columns=c)
for ip,i,c,v in zip(in_percent,indexnames,colnames,dfs.keys() )}
else:
raise Exception('Wrong showtype in keep_get')
if by_var:
if diftype == 'dif':
dfsres = {v: (vdf.subtract(
vdf.iloc[:, 0], axis=0)).iloc[:, 1:] for v, vdf in dfs.items()}
elif diftype == 'difpct':
dfsres = {v: (vdf.subtract(
vdf.iloc[:, 0], axis=0).divide(
vdf.iloc[:, 0], axis=0)*100).iloc[:, 1:] for v, vdf in dfs.items()}
elif diftype in ['nodif', 'basedf', 'lastdf']: # for use in tabledf
dfsres = dfs
else:
raise Exception('Wrong diftype in keep_get-')
else:
first_scenario = dfs[list(dfs.keys())[0]]
if diftype == 'dif':
dfsres = {v: vdf - first_scenario
for i,(v, vdf) in enumerate(dfs.items()) if i >= 1}
elif diftype == 'difpct':
dfsres = {v: (vdf/first_scenario-1.)*100
for i,(v, vdf) in enumerate(dfs.items()) if i >= 1}
elif diftype in ['nodif', 'basedf', 'lastdf']: # for use in tabledf
dfsres = dfs
else:
raise Exception('Wrong diftype in keep_get-')
return dfsres
def findgdpvar(self,varname):
'''Find matching GDP variable - for worldbank models
'''
gdpname = varname[:3]+'NYGDPMKTP'+varname[-2:]
if gdpname in self.endogene :
return gdpname
else:
raise Exception(f"{varname} don't have a matching GDP variable ")
def plot_basis(self,var, df, title='', suptitle='', legend=True, scale='linear', trans={}, dec='',
ylabel='', yunit='', xlabel='',kind='line',multi=False):
ibs = {k: v for k, v in locals().items() if k not in {'self', 'title', 'multi'}}
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(10, 6))
# print(ibs)
ax = self.plot_basis_ax(ax, ax_title=title, **ibs)
# fig.suptitle(suptitle, fontsize=16)
return fig
@staticmethod
def plot_basis_ax(ax,var, df, ax_title='', suptitle='', legend=True, scale='linear', trans={}, dec='',
ylabel='', yunit='', xlabel='',kind='line',samefig=False):
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.ticker as ticker
import numpy as np
import matplotlib.dates as mdates
import matplotlib.cbook as cbook
import seaborn as sns
from modelhelp import finddec
# print(f'{legend=}')
years = mdates.YearLocator() # every year
months = mdates.MonthLocator() # every month
years_fmt = mdates.DateFormatter('%Y')
if df.index.dtype == 'int64':
fmtr = ticker.StrMethodFormatter('{x:.0f}')
ax.xaxis.set_major_formatter(fmtr)
this_title = ax_title if ax_title else f'{trans.get(var,var)}'
ax.set_title(this_title, fontsize=14)
ax.spines['right'].set_visible(False)
index_len = len(df.index)
xlabel__ = xlabel if xlabel or df.index.name == None else df.index.name
if 0:
#df.set_index(pd.date_range('2020-1-1',periods = index_len,freq = 'q'))
df.index = pd.date_range(
f'{df.index[0]}-1-1', periods=index_len, freq='Y')
ax.xaxis.set_minor_locator(years)
# df.index = [20 + i for i in range(index_len)]
ax.spines['top'].set_visible(False)
try:
xval = df.index.to_timestamp()
except:
xval = df.index
if kind == 'line':
for i, col in enumerate(df.loc[:, df.columns]):
yval = df[col]
ax.plot(xval, yval, label=col, linewidth=3.0)
if not legend:
x_pos = xval[-1]
ax.text(x_pos, yval.iloc[-1], f' {col}', fontsize=14)
elif kind == 'bar_stacked':
df.plot(ax=ax, stacked=True, kind='bar')
elif kind == 'bar':
df.plot(ax=ax, stacked=False, kind='bar')
else:
raise Exception(f'Error illegal kind:{kind}')
if legend and not samefig:
ax.legend()
# ax.xaxis.set_minor_locator(ticker.MultipleLocator(years))
# breakpoint()
if len(df.index) <= 5:
ax.set_xticks(df.index)
ax.set_yscale(scale)
ax.set_xlabel(f'{xlabel__}', fontsize=15)
ax.set_ylabel(f'{ylabel}', fontsize=15,
rotation='horizontal', ha='left', va='baseline')
xdec = str(dec) if dec else finddec(df)
ax.yaxis.set_label_coords(-0.1, 1.02)
ax.yaxis.set_major_formatter(ticker.FuncFormatter(
lambda value, number: f'{value:,.{xdec}f} {yunit}'))
return ax
@staticmethod
def savefigs(figs, location=r'./graph', experimentname=r'experiment1', addname=r'', extensions=['svg'], xopen=False):
"""
Saves a collection of matplotlib figures to a specified directory.
Parameters:
- figs (dict): A dictionary of matplotlib figures where the key is the figure name.
- location (str): The base folder in which to save the charts. Defaults to './graph'.
- experimentname (str): A subfolder under 'location' where charts are saved. Defaults to 'experiment1'.
- addname (str): An additional name added to each figure filename. Defaults to an empty string.
- extensions (list): A list of string file extensions for saving the figures. Defaults to ['svg'].
- xopen (bool): If True, open the saved figure locations in a web browser.
Returns:
str: The absolute path to the folder where figures are saved.
Raises:
Exception: If the folder cannot be created or a figure cannot be saved/opened.
"""
folder = Path(location) / experimentname
try:
folder.mkdir(parents=True, exist_ok=True)
except Exception as e:
raise Exception(f"!!!Can't create folder: {folder}. Error: {e}")
for variable, fig in figs.items():
for extension in extensions:
filename = f'{variable}{addname}.{extension}'
file_path = folder / filename
try:
fig.savefig(file_path)
if xopen:
wb.open(f'file://{file_path.resolve()}', new=2)
except Exception as e:
raise Exception(f"!!!Can't save/open file: {file_path}. Error: {e}")
if xopen:
wb.open(f'file://{folder.absolute()}', new=1)
return f'Saved at: {folder}'.replace('\\','/')
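# Hedged usage sketch, combining keep_plot and savefigs:
#
#     figs = mmodel.keep_plot('FY*', diff=True)
#     mmodel.savefigs(figs, location='./graph', experimentname='shock1',
#                     extensions=['svg', 'png'])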
def keep_plot(self, pat='*', start='', end='', start_ofset=0, end_ofset=0, showtype='level',
diff=False, diffpct=False, mul=1.0, title='Scenarios', legend=False, scale='linear',
yunit='', ylabel='', dec='', trans=None, showfig=True, kind='line', size=(10, 6),
vline=None, savefig='', by_var=True, dataonly=False, samefig=False, ncol=2):
"""
Generate and display plots for specified scenarios and variables.
Args:
pat (str, optional): Pattern to select variables for plotting. Defaults to '*'.
start (str, optional): Start period for the plot. Defaults to '' .
end (str, optional): End period for the plot. Defaults to '' .
start_ofset (int, optional): Offset to shift the start period, relative to current. Defaults to 0.
end_ofset (int, optional): Offset to shift the end period, relative to current. Defaults to 0.
showtype (str, optional): Type of data transformation for plotting ('level', 'growth', 'change'). Defaults to 'level'.
diff (bool, optional): If True, shows the difference relative to the first experiment. Defaults to False.
diffpct (bool, optional): If True, shows the percentage difference relative to the first experiment. Defaults to False.
mul (float, optional): Multiplier to scale the data. Defaults to 1.0.
title (str, optional): Title of the plot. Defaults to 'Scenarios'.
legend (bool, optional): If True, displays a legend. Defaults to False.
scale (str, optional): Y-axis scale ('linear' or 'log'). Defaults to 'linear'.
yunit (str, optional): Units for the Y-axis. Defaults to ''.
ylabel (str, optional): Label for the Y-axis. Defaults to ''.
dec (str, optional): String format for decimal places. If '' then automatically determined. Defaults to ''.
trans (dict, optional): Alternative dictionary for translating variable names to descriptions. Defaults to None.
showfig (bool, optional): If True, displays the figure. Defaults to True.
kind (str, optional): Type of plot ('line', 'bar', etc.). Defaults to 'line'.
size (tuple, optional): Figure size as (width, height). Defaults to (10, 6).
vline (list, optional): List of tuples (time, text) for vertical lines in the plot. vline is persistent; to reset pass vline=None. Defaults to None.
by_var (bool, optional): If True, each line represents a scenario, else each line represents a variable. Defaults to True.
dataonly (bool, optional): If True, only the dataframes are returned, no plot is generated. Defaults to False.
samefig (bool, optional): If True, all plots are displayed in the same figure. Defaults to False.
ncol (int, optional): Number of columns for subplots when using samefig. Defaults to 2.
Returns:
dict: A dictionary of Matplotlib figures, with keys being the variable names and values being the figure objects.
Raises:
Exception: If no kept solution is available for plotting.
"""
plt.close('all')
plt.ioff()
# print(f'{self.current_per[-1]=}')
if not len(self.keep_solutions):
raise Exception('No kept solution')
dfsres = self.keep_get_plotdict(pat=pat, start=start, end=end,
start_ofset = start_ofset, end_ofset = end_ofset,
showtype=showtype,
diff=diff, diffpct = diffpct, by_var=by_var)
if dataonly:
return dfsres
aspct = ' as pct ' if diffpct else ' '
dftype = showtype.capitalize()
xtrans = self.var_description if type(trans) == type(None) else trans
number = len(dfsres)
if samefig:
...
xcol = ncol
xrow=-((-number )//ncol)
figsize = (xcol*size[0],xrow*size[1])
# print(f'{size=} {figsize=}')
fig = plt.figure(figsize=figsize)
if legend:
extra_row = 1
row_heights = [1] * xrow + [0.5] # equal height for all plot rows, half height for the legend row
else:
extra_row = 0
row_heights = [1] * xrow # equal height for all plot rows
gs = gridspec.GridSpec(xrow + extra_row , xcol, figure=fig, height_ratios=row_heights)
fig.set_constrained_layout(True)
# Create axes for the plots
axes = [fig.add_subplot(gs[i, j]) for i in range(xrow) for j in range(xcol)]
if legend:
legend_ax = fig.add_subplot(gs[-1, :]) # Span the legend axis across the bottom
figs = {'onefig' : fig}
else:
...
figs_and_ax = {v : plt.subplots(figsize=size) for v in dfsres.keys()}
figs = {v : fig for v,(fig,ax) in figs_and_ax.items() }
axes = [ ax for fig,ax in figs_and_ax.values() ]
for i,(v, df) in enumerate(dfsres.items()):
ax_title= (f'Difference{aspct}to "{df.columns[0] if not by_var else list(self.keep_solutions.keys())[0] }" for {dftype}:' if (diff or diffpct) else f'{dftype}:')+f'{xtrans.get(v,v)}'
self.plot_basis_ax(axes[i], v , df*mul, legend=legend,
scale=scale, trans=xtrans,
ax_title=ax_title,
yunit=yunit,
ylabel='Percent' if (showtype == 'growth' or diffpct == True )else ylabel,
xlabel='',kind = kind,samefig=samefig,
dec=2 if (showtype == 'growth' or diffpct) and not dec else dec)
for ax in axes[number:]:
ax.set_visible(False)
if samefig:
fig.suptitle(title ,fontsize=20)
if legend:
handles, labels = axes[0].get_legend_handles_labels() # Assuming the first ax has the handles and labels
legend_ax.legend(handles, labels, loc='center', ncol=3, fontsize='large')
legend_ax.axis('off') # Hide the axis
if vline is None: # to delete vline
if hasattr(self, 'vline'):
del self.vline
else:
if vline or hasattr(self, 'vline'):
if vline:
self.vline = vline
for xtime, text in self.vline:
self.keep_add_vline(figs, xtime, text)
if showfig:
for f in figs.values():
display(f)
plt.close(f)
plt.ion()
return figs
@staticmethod
def keep_add_vline(figs, time, text=' Calibration time'):
''' Adds a vertical line with text to figs (a dict of matplotlib figures as returned by keep_plot)'''
# breakpoint()
for keep, fig in figs.items():
for ax in fig.axes:
ymax =ax.get_ylim()[1]
try:
ax.axvline(time, linewidth=3, color='r', ls='dashed')
ax.annotate(text, xy=(time, ymax),
fontsize=13, va='top')
except:
ax.axvline(pd.to_datetime(
time), linewidth=3, color='r', ls='dashed')
ax.annotate(
text, xy=(pd.to_datetime(time), ymax), fontsize=13, va='top')
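# Usage sketch (illustrative only, names are hypothetical): keep_plot returns a
# dict of figures which keep_add_vline can annotate. Assuming `mmodel` has kept
# solutions named 'baseline' and 'shock':
#
#     figs = mmodel.keep_plot('GDP*', diff=True, legend=True, samefig=True)
#     mmodel.keep_add_vline(figs, 2025, ' Shock starts')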
def keep_viz(self, pat='*', smpl=('', ''), selectfrom={}, legend=1, dec='', use_descriptions=True,
select_width='', select_height='200px', vline=[]):
"""
Plots the kept dataframes
Args:
pat (str, optional): a pattern of variables selected by default. Defaults to '*'.
smpl (tuple with 2 elements, optional): the selected smpl; must match the dataframe index used. Defaults to ('', '').
selectfrom (list, optional): the variables to select from. Defaults to {} -> all endogenous variables.
legend (bool, optional): if 1 show legends, else place labels to the right of the curves. Defaults to 1.
dec (string, optional): decimals on the y-axis. Defaults to ''.
use_descriptions : Use the variable descriptions from the model
Returns:
None.
self.keep_wiz_figs is set to a dictionary containing the figures. Can be used to produce publication
quality files.
"""
from ipywidgets import interact, Dropdown, Checkbox, IntRangeSlider, SelectMultiple, Layout
from ipywidgets import interactive, ToggleButtons, SelectionRangeSlider, RadioButtons
from ipywidgets import interactive_output, HBox, VBox, link, Dropdown,Output
minper = self.lastdf.index[0]
maxper = self.lastdf.index[-1]
options = [(ind, nr) for nr, ind in enumerate(self.lastdf.index)]
with self.set_smpl(*smpl):
show_per = self.current_per[:]
init_start = self.lastdf.index.get_loc(show_per[0])
init_end = self.lastdf.index.get_loc(show_per[-1])
defaultvar = self.vlist(pat)
_selectfrom = [s.upper() for s in selectfrom] if selectfrom else sorted(
list(list(self.keep_solutions.values())[0].columns))
var_maxlen = max(len(v) for v in _selectfrom)
if use_descriptions and self.var_description:
select_display = [
f'{v} :{self.var_description[v]}' for v in _selectfrom]
defaultvar = [
f'{v} :{self.var_description[v]}' for v in self.vlist(pat)]
width = select_width if select_width else '90%'
else:
select_display = [fr'{v}' for v in _selectfrom]
defaultvar = [fr'{v}' for v in self.vlist(pat)]
width = select_width if select_width else '50%'
def explain(i_smpl, selected_vars, diff, showtype, scale, legend):
vars = ' '.join(v.split(' ', 1)[0] for v in selected_vars)
smpl = (self.lastdf.index[i_smpl[0]], self.lastdf.index[i_smpl[1]])
if isinstance(diff, str):
diffpct = True
ldiff = False
else:
ldiff = diff
diffpct = False
# print(ldiff,diffpct)
with self.set_smpl(*smpl):
self.keep_wiz_figs = self.keep_plot(vars, diff=ldiff, diffpct = diffpct, scale=scale, showtype=showtype,
legend=legend, dec=dec, vline=vline)
# plt.show()
description_width = 'initial'
description_width_long = 'initial'
keep_keys = list(self.keep_solutions.keys())
keep_first = keep_keys[0]
# breakpoint()
i_smpl = SelectionRangeSlider(value=[init_start, init_end], continuous_update=False, options=options, min=minper,
max=maxper, layout=Layout(width='75%'), description='Show interval')
selected_vars = SelectMultiple(value=defaultvar, options=select_display, layout=Layout(width=width, height=select_height, font="monospace"),
description='Select one or more', style={'description_width': description_width})
selected_vars2 = SelectMultiple(value=defaultvar, options=select_display, layout=Layout(width=width, height=select_height),
description='Select one or more', style={'description_width': description_width})
diff = RadioButtons(options=[('No', False), ('Yes', True), ('In percent', 'pct')], description=fr'Difference to: "{keep_first}"',
value=False, style={'description_width': 'auto'}, layout=Layout(width='auto'))
# diff_select = Dropdown(options=keep_keys,value=keep_first, description = fr'to:')
showtype = RadioButtons(options=[('Level', 'level'), ('Growth', 'growth')],
description='Data type', value='level', style={'description_width': description_width})
scale = RadioButtons(options=[('Linear', 'linear'), ('Log', 'log')], description='Y-scale',
value='linear', style={'description_width': description_width})
# legend = ToggleButtons(options=[('Yes',1),('No',0)], description = 'Legends',value=1,style={'description_width': description_width})
legend = RadioButtons(options=[('Yes', 1), ('No', 0)], description='Legends', value=legend, style={
'description_width': description_width})
# breakpoint()
l = link((selected_vars, 'value'),
(selected_vars2, 'value')) # not used
select = HBox([selected_vars])
options1 = diff
options2 = HBox([scale, legend, showtype])
ui = VBox([select, options1, options2, i_smpl])
show = interactive_output(explain, {'i_smpl': i_smpl, 'selected_vars': selected_vars, 'diff': diff, 'showtype': showtype,
'scale': scale, 'legend': legend})
# display(ui, show)
display(ui)
display(show)
return
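# Usage sketch (illustrative only): in a Jupyter notebook, assuming `mmodel` has
# kept solutions, this displays the interactive widget; afterwards the figures
# are available for saving. The pattern and file name are hypothetical:
#
#     mmodel.keep_viz('GDP*')
#     # after interacting with the widget:
#     # mmodel.keep_wiz_figs['GDP'].savefig('gdp.svg')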
def keep_viz_prefix(self, pat='*', smpl=('', ''), selectfrom={}, legend=1, dec='', use_descriptions=True,
select_width='', select_height='200px', vline=[],prefix_dict={},add_var_name=False,short=False):
"""
Plots the kept dataframes
Args:
pat (str, optional): a pattern of variables selected by default. Defaults to '*'.
smpl (tuple with 2 elements, optional): the selected smpl; must match the dataframe index used. Defaults to ('', '').
selectfrom (list, optional): the variables to select from. Defaults to {} -> all kept variables.
legend (bool, optional): if 1 show legends, else place labels to the right of the curves. Defaults to 1.
dec (string, optional): decimals on the y-axis. Defaults to ''.
use_descriptions : Use the variable descriptions from the model
Returns:
None.
self.keep_wiz_figs is set to a dictionary containing the figures. Can be used to produce publication
quality files.
"""
from ipywidgets import interact, Dropdown, Checkbox, IntRangeSlider, SelectMultiple, Layout
from ipywidgets import Select
from ipywidgets import interactive, ToggleButtons, SelectionRangeSlider, RadioButtons
from ipywidgets import interactive_output, HBox, VBox, link, Dropdown,Output
minper = self.lastdf.index[0]
maxper = self.lastdf.index[-1]
options = [(ind, nr) for nr, ind in enumerate(self.lastdf.index)]
with self.set_smpl(*smpl):
show_per = self.current_per[:]
init_start = self.lastdf.index.get_loc(show_per[0])
init_end = self.lastdf.index.get_loc(show_per[-1])
keepvar = sorted (list(self.keep_solutions.values())[0].columns)
defaultvar = [v for v in self.vlist(pat) if v in keepvar]
_selectfrom = [s.upper() for s in selectfrom] if selectfrom else keepvar
gross_selectfrom = [(f'{(v+" ") if add_var_name else ""}{self.var_description[v] if use_descriptions else v}',v) for v in _selectfrom]
width = select_width if select_width else '50%'
def explain(i_smpl, selected_vars, diff, showtype, scale, legend):
vars = ' '.join(v for v in selected_vars)
smpl = (self.lastdf.index[i_smpl[0]], self.lastdf.index[i_smpl[1]])
if isinstance(diff, str):
diffpct = True
ldiff = False
else:
ldiff = diff
diffpct = False
with self.set_smpl(*smpl):
self.keep_wiz_figs = self.keep_plot(vars, diff=ldiff, diffpct = diffpct, scale=scale, showtype=showtype,
legend=legend, dec=dec, vline=vline)
plt.show()
description_width = 'initial'
description_width_long = 'initial'
keep_keys = list(self.keep_solutions.keys())
keep_first = keep_keys[0]
select_prefix = [(c,iso) for iso,c in prefix_dict.items()]
# breakpoint()
i_smpl = SelectionRangeSlider(value=[init_start, init_end], continuous_update=False, options=options, min=minper,
max=maxper, layout=Layout(width='75%'), description='Show interval')
selected_vars = SelectMultiple(value=defaultvar, options=gross_selectfrom, layout=Layout(width=width, height=select_height, font="monospace"),
description='Select one or more', style={'description_width': description_width})
diff = RadioButtons(options=[('No', False), ('Yes', True), ('In percent', 'pct')], description=fr'Difference to: "{keep_first}"',
value=False, style={'description_width': 'auto'}, layout=Layout(width='auto'))
showtype = RadioButtons(options=[('Level', 'level'), ('Growth', 'growth')],
description='Data type', value='level', style={'description_width': description_width})
scale = RadioButtons(options=[('Linear', 'linear'), ('Log', 'log')], description='Y-scale',
value='linear', style={'description_width': description_width})
legend = RadioButtons(options=[('Yes', 1), ('No', 0)], description='Legends', value=legend, style={
'description_width': description_width})
# breakpoint()
def get_prefix(g):
try:
current_suffix = {v[len(g['old'][0]):] for v in selected_vars.value}
except:
current_suffix = ''
new_prefix = g['new']
selected_prefix_var = [(des,variable) for des,variable in gross_selectfrom
if any([variable.startswith(n) for n in new_prefix])]
selected_vars.options = selected_prefix_var
if current_suffix:
new_selection = [f'{n}{c}' for c in current_suffix for n in new_prefix
if f'{n}{c}' in {s for p,s in selected_prefix_var}]
selected_vars.value = new_selection
# print(f"{new_selection=}{current_suffix=}{g['old']=}")
else:
selected_vars.value = [selected_prefix_var[0][1]]
if len(prefix_dict):
selected_prefix = SelectMultiple(value=[select_prefix[0][1]], options=select_prefix,
layout=Layout(width='25%', height=select_height, font="monospace"),
description='')
selected_prefix.observe(get_prefix,names='value',type='change')
select = HBox([selected_vars,selected_prefix])
get_prefix({'new':select_prefix[0]})
else:
select = VBox([selected_vars])
options1 = HBox([diff]) if short >=2 else HBox([diff,legend])
options2 = HBox([scale, showtype])
if short:
vui = [select, options1, i_smpl]
else:
vui = [select, options1, options2, i_smpl]
vui = vui[:-1] if short >= 2 else vui
ui = VBox(vui)
show = interactive_output(explain, {'i_smpl': i_smpl, 'selected_vars': selected_vars, 'diff': diff, 'showtype': showtype,
'scale': scale, 'legend': legend})
# display(ui, show)
display(ui)
display(show)
return
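# Usage sketch (illustrative only): prefix_dict maps a variable prefix to a
# display name, so the same selection can be flipped between groups of
# variables. The prefixes and names below are hypothetical:
#
#     mmodel.keep_viz_prefix('PAK*', prefix_dict={'PAK': 'Pakistan', 'IND': 'India'})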
def keep_show(self,*args,**kwargs):
import modelwidget_input
widget = modelwidget_input.keep_plot_widget(self,*args,**kwargs)
# print(f'widget called {kwargs=}')
return widget
def df_show(self,*args,**kwargs):
import modelwidget_input
widget = modelwidget_input.keep_plot_widget(self,*args,switch=True,**kwargs)
return widget
def df_plot(self,*args,**kwargs):
with self.keepswitch(switch=True):
figs = self.keep_plot(*args,**kwargs)
return figs
@staticmethod
def display_toc(text='**Jupyter notebooks**',folder='.',all=False,nocwd=False):
'''In a Jupyter notebook this function displays a clickable table of contents of all
Jupyter notebooks in this folder and its subfolders'''
from IPython.display import display, Markdown, HTML
from pathlib import Path
display(Markdown(text))
for dir in sorted(Path(folder).glob('**')):
# print(f'{dir=} {nocwd=}')
if len(dir.parts) and str(dir.parts[-1]).startswith('.'):
continue
if dir == Path('.') and nocwd :
continue
filelist = (list(dir.glob('*readme.ipynb'))
+ [f for f in sorted(dir.glob('*.ipynb'))
if not f.stem.endswith('readme')])
for i, notebook in enumerate(filelist):
# print(notebook)
if (not all) and (notebook.name.startswith('test') or notebook.name.startswith('Overview')):
continue
if i == 0:
blanks = ''.join(
[' ']*len(dir.parts))
if len(dir.parts):
display(HTML(f'{blanks}<b>{str(dir)}</b>'))
else:
display(
HTML(f'{blanks}<b>{str(Path.cwd().parts[-1])} (.)</b>'))
name = notebook.name.split('.')[0]
# print(notebook)
display(HTML(
f' {blanks} <a href="{notebook}" target="_blank">{name}</a>'))
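# Usage sketch (illustrative only): at the root of a project,
#
#     model.display_toc('**Available notebooks**', nocwd=True)
#
# lists every notebook below the current folder as clickable links.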
@staticmethod
def display_toc_this(pat='*',text='**Jupyter notebooks**',path='.',ext='ipynb',showext=False):
'''In a Jupyter notebook this function displays a clickable table of contents of the files matching pat (with extension ext) in the folder path'''
from IPython.display import display, Markdown, HTML
from pathlib import Path
display(Markdown(text))
dir = Path(path)
# print(dir,':')
for fname in sorted(dir.glob(pat+'.'+ext)):
name = fname.name if showext else fname.name.split('.')[0]
display(HTML(
f' <a href="{fname}" target="_blank">{name}</a>'))
@staticmethod
def widescreen():
'''Makes a Jupyter notebook use all the available screen real estate
'''
from IPython.display import HTML, display
display(HTML(data="""
<style>
div#notebook-container { width: 95%; }
div#menubar-container { width: 65%; }
div#maintoolbar-container { width: 99%; }
</style>
"""))
@staticmethod
def modelflow_auto(run=True):
'''In a Jupyter notebook this function activates autorun of the notebook.
It also makes Jupyter use a larger portion of the browser width.
The function should be run before the notebook is saved, and the output should not be cleared.
'''
if not run:
return
try:
from IPython.display import HTML, display
display(HTML(data="""
<style>
div#notebook-container { width: 95%; }
div#menubar-container { width: 65%; }
div#maintoolbar-container { width: 99%; }
</style>
"""))
display(HTML("""\
<script>
// AUTORUN ALL CELLS ON NOTEBOOK-LOAD!
require(
['base/js/namespace', 'jquery'],
function(jupyter, $) {
$(jupyter.events).on('kernel_ready.Kernel', function () {
console.log('Auto-running all cells-below...');
jupyter.actions.call('jupyter-notebook:run-all-cells-below');
jupyter.actions.call('jupyter-notebook:save-notebook');
});
}
);
</script>"""))
except:
print('modelflow_auto not run')
import modelwidget_input
Display_Mixin.keep_plot_widget.__doc__ = modelwidget_input.keep_plot_widget.__doc__
Display_Mixin.keep_show.__doc__ = modelwidget_input.keep_plot_widget.__doc__
Display_Mixin.df_plot.__doc__ = Display_Mixin.keep_plot.__doc__
class Json_Mixin():
'''This mixin class can dump a model and solution
as a json serialization to a file.
This allows the precooking of a model and solution, so
a user can use a model without specifying it in
a session.
'''
def modeldump_base(self, outfile='',keep=False):
'''Dumps a model and its lastdf to a json file.
If keep=True the model.keep_solutions will also be dumped'''
# print('dump ready')
def ibs_to_json(series):
def period_to_dict(period):
# This function converts a pandas Period object to a dictionary similar to the JSON structure
# provided in pandas versions before 2.0 when using to_json()
return {
"day": period.day,
"day_of_week": period.day_of_week,
"day_of_year": period.day_of_year,
"dayofweek": period.day_of_week,
"dayofyear": period.day_of_year,
"days_in_month": period.days_in_month,
"daysinmonth": period.days_in_month,
"end_time": int(period.end_time.timestamp() * 1000), # Convert to milliseconds
"freqstr": period.freqstr,
"hour": period.hour,
"is_leap_year": period.is_leap_year,
"minute": period.minute,
"month": period.month,
"ordinal": period.ordinal,
"quarter": period.quarter,
"qyear": period.qyear,
"second": period.second,
"start_time": int(period.start_time.timestamp() * 1000), # Convert to milliseconds
"week": period.week,
"weekday": period.weekday,
"weekofyear": period.week,
"year": period.year
}
# Creating a dictionary from the series where each period is converted to a dictionary
result_dict = {str(idx): period_to_dict(period) for idx, period in enumerate(series)}
# Converting the dictionary to a JSON string
return json.dumps(result_dict)
# to manage error in pandas 2.
try:
current_per_json = pd.Series(self.current_per).to_json()
except:
print( 'Pandas 2. error handled ')
current_per_json = ibs_to_json(pd.Series(self.current_per))
dumpjson = {
'version': '1.00',
'frml': self.equations,
'lastdf': self.lastdf.to_json(double_precision=15),
'current_per': current_per_json ,
'modelname': self.name,
'oldkwargs': self.oldkwargs,
'var_description': self.var_description,
'equations_latex': self.equations_latex ,
'keep_solutions': {k:v.to_json(double_precision=15) for k,v in self.keep_solutions.items()} if keep else {},
'wb_MFMSAOPTIONS': self.wb_MFMSAOPTIONS if hasattr(self, 'wb_MFMSAOPTIONS') else '',
'var_groups' : self.var_groups,
'reports' : self.reports ,
'model_description' : self.model_description,
'eviews_dict' : self.eviews_dict,
}
# print('dump ready to write')
if outfile != '':
pathname = Path(outfile)
pathname.parent.mkdir(parents=True, exist_ok=True)
with open(outfile, 'wt') as f:
json.dump(dumpjson, f)
else:
return json.dumps(dumpjson)
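# Round-trip sketch (illustrative only, the file name is hypothetical):
# modeldump_base writes the JSON that modelload reads back:
#
#     mmodel.modeldump_base('dumps/mymodel.pcim', keep=True)
#     mmodel2, df2 = type(mmodel).modelload('dumps/mymodel.pcim')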
@classmethod
def modelload(cls, infile, funks=[], run=False, keep_json=False,
default_url=r'https://raw.githubusercontent.com/IbHansen/modelflow-manual/main/model_repo/',**kwargs):
'''
Loads a model and a solution.
Parameters
----------
infile : A file name or a URL of a .pcim file
funks : Functions to use in the resulting model
run : simulate the model with the saved time and options
keep_json : save a dict (self.json_keep) in the model instance
default_url : str, optional
Where to look if the file is not available. The default is r'https://raw.githubusercontent.com/IbHansen/modelflow-manual/main/model_repo/'.
**kwargs : These options are used by the simulation if run=True
Returns
-------
(<a model instance>, <a dataframe>)
'''
import io
import urllib.request
def make_current_from_quarters(base, json_current_per):
''' Handle json for quarterly data to recover the right date index
'''
import datetime
start, end = json_current_per[[0, -1]]
start_per = datetime.datetime(
start['qyear'], start['month'], start['day'])
end_per = datetime.datetime(
end['qyear'], end['month'], end['day'])
current_dates = pd.period_range(
start_per, end_per, freq=start['freqstr'])
base_dates = pd.period_range(
base.index[0], base.index[-1], freq=start['freqstr'])
base.index = base_dates
return base, current_dates
pinfile = Path(nname:=infile.replace('\\','/'))
if not pinfile.suffix:
pinfile = pinfile.with_suffix('.pcim')
def read_file(pinfile):
try:
# First, try reading the file as a regular text file
with open(pinfile, 'rt', encoding='utf-8') as f:
out= f.read()
print(f'file read: {pinfile}')
return out
except UnicodeDecodeError:
# print(f'An UnicodeDecodeError occurs, it might be a gzip file')
pass
try:
# Try reading the file as a gzip file
with gzip.open(pinfile, 'rt', encoding='utf-8') as f:
out = f.read()
print(f'Zipped file read: {pinfile}')
return out
except (OSError, IOError):
# Handle the case where the file is neither plain text nor gzip
raise Exception( f"Error reading the file {nname}")
def read_from_url(url):
with urllib.request.urlopen(url) as response:
content = response.read() # Read the content once
try:
# Try to read as a gzip file first
with gzip.open(io.BytesIO(content), 'rt', encoding='utf-8') as f:
return f.read()
except (OSError, IOError):
# If not gzip, then read as regular file
return content.decode('utf-8')
if pinfile.exists():
json_string = read_file(pinfile)
else:
if infile.startswith(r'https:'):
urlfile = infile
else:
urlfile = (Path(default_url) / pinfile.name).as_posix().replace('https:/','https://')
print(f'Open file from URL: {urlfile}')
json_string = read_from_url(urlfile)
input = json.loads(json_string)
version = input['version']
frml = input['frml']
lastdf = pd.read_json(StringIO(input['lastdf']))
current_per = pd.read_json(StringIO(input['current_per']), typ='series').values
modelname = input['modelname']
# breakpoint()
mmodel = cls(frml, modelname=modelname, funks=funks,**kwargs)
mmodel.oldkwargs = input['oldkwargs']
mmodel.json_current_per = current_per
mmodel.set_var_description(input.get('var_description', {}))
mmodel.equations_latex = input.get('equations_latex', '')
if input.get('wb_MFMSAOPTIONS', None) : mmodel.wb_MFMSAOPTIONS = input.get('wb_MFMSAOPTIONS', None)
mmodel.keep_solutions = {k : pd.read_json(StringIO(jdf)) for k,jdf in input.get('keep_solutions',{}).items()}
mmodel.var_groups = input.get('var_groups', {})
mmodel.reports = input.get('reports',{} )
mmodel.model_description = input.get('model_description', '')
mmodel.eviews_dict = input.get('eviews_dict', {})
# mmodel.json_string = json_string
if keep_json:
mmodel.json_keep = input
try:
lastdf, current_per = make_current_from_quarters(
lastdf, current_per)
except Exception as e:
# print('makecurrent',e)
pass
if mmodel.model_description:
print(f'Model:{mmodel.model_description}')
if run:
if (start:= kwargs.get('start',False)) and (end:=kwargs.get('end',False)):
current_per = mmodel.smpl(start,end,lastdf)
# to avoid dublicate start and end
newkwargs = {k:v for k,v in kwargs.items() if not k in {'start','end'}}
else:
newkwargs = kwargs
res = mmodel(lastdf, current_per[0], current_per[-1], **newkwargs)
return mmodel, res
else:
mmodel.current_per = current_per
mmodel.basedf = lastdf.copy()
return mmodel, lastdf
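# Usage sketch (illustrative only, the model name is hypothetical): load a
# precooked model and simulate it with the saved time and options:
#
#     mpak, baseline = model.modelload('pak', run=True, silent=True)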
class Zip_Mixin():
'''This experimental mixin class zips a dumped file '''
def modeldump(self, file_path='',keep=False,zip_file=True):
pathname = Path(file_path)
if not pathname.suffix:
pathname = pathname.with_suffix('.pcim')
pathname.parent.mkdir(parents=True, exist_ok=True)
json_string = self.modeldump_base(keep=keep)
if zip_file:
with gzip.open(pathname , 'wt') as zipped_file:
zipped_file.write(json_string)
else:
with open(pathname, 'w') as file:
file.write(json_string)
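# Sketch (illustrative only, the path is hypothetical): whether a dumped .pcim
# file was zipped can be checked from the two-byte gzip magic number:
#
#     with open('mymodel.pcim', 'rb') as f:
#         is_gzip = f.read(2) == b'\x1f\x8b'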
class Excel_Mixin():
'''This mixin handles dumping and loading models to and from Excel '''
def modeldump_excel(self, file, fromfile='control.xlsm', keep_open=False):
'''
Dump model and dataframe to an Excel workbook
Parameters
----------
file : str
filename.
fromfile : str, optional
Template workbook used when file has an .xlsm suffix. The default is 'control.xlsm'.
keep_open : bool, optional
Keep the workbook open in Excel after returning. The default is False.
Returns
-------
wb :
xlwings instance of the workbook.
'''
thispath = Path(file)
# breakpoint()
if thispath.suffix.upper() == '.XLSM':
wb = xw.Book(thispath.parent / Path(fromfile))
else:
wb = xw.Book()
wb.sheets.add()
wb.app.screen_updating = 1
me.obj_to_sheet('frml', {v: self.allvar[v]['frml']
for v in sorted(self.allvar.keys()) if self.allvar[v]['endo']}, wb)
me.obj_to_sheet('var_description', dict(self.var_description) if len(
self.var_description) else {'empty': 'empty'}, wb)
me.obj_to_sheet('oldkwargs', self.oldkwargs, wb)
me.obj_to_sheet('modelname', {'name': self.name}, wb)
if hasattr(self, 'current_per'):
me.obj_to_sheet('current_per', me.indextrans(
self.current_per), wb, after='frml')
if hasattr(self, 'lastdf'):
me.df_to_sheet('lastdf', self.lastdf.loc[:, sorted(
self.allvar)], wb, after='frml')
wb.app.screen_updating = 1
try:
wb.save(Path(file).absolute())
except Exception as e:
wb.close()
print(f'{Path(file).absolute()} not saved\n', str(e))
return
if not keep_open:
wb.close()
return
return wb
@classmethod
def modelload_excel(cls, infile='pak', funks=[], run=False, keep_open=False, **kwargs):
'''
Loads a model from an Excel workbook dumped by :any:`modeldump_excel`
Args:
infile (str or xlwings Book, optional): workbook file name or an open workbook. Defaults to 'pak'.
funks (list, optional): functions used in the model. Defaults to [].
run (bool, optional): if True, simulate the model after loading. Defaults to False.
keep_open (bool, optional): keep the workbook open in Excel after returning. Defaults to False.
**kwargs: options passed to the simulation if run=True.
Returns:
mmodel: the loaded model instance.
res: the simulation result, or the loaded dataframe if run=False.
wb or None: the open workbook if keep_open=True, else None.
'''
if isinstance(infile, xw.main.Book):
wb = infile
else:
wb = xw.Book(Path(infile).absolute())
wb.app.screen_updating = 0
frml = '\n'.join(f for f in me.sheet_to_dict(wb, 'frml').values())
modelname = me.sheet_to_dict(wb, 'modelname')['name']
var_description = me.sheet_to_dict(wb, 'var_description')
mmodel = cls(frml, modelname=modelname, funks=funks,
var_description=var_description)
mmodel.oldkwargs = me.sheet_to_dict(wb, 'oldkwargs')
try:
mmodel.current_per = me.sheet_to_df(wb, 'current_per').index
except:
pass
try:
lastdf = me.sheet_to_df(wb, 'lastdf')
except:
lastdf = pd.DataFrame()
if run:
res = mmodel(lastdf, **kwargs)
else:
res = lastdf
wb.app.screen_updating = 1
if not keep_open:
wb.close()
return mmodel, res, None
else:
return mmodel, res, wb
class Solver_Mixin():
'''This mixin handles the solving of models. '''
DEFAULT_relconv = 0.0000001
def __call__(self, *args, **kwargs):
''' Runs a model.
By default a straight model is calculated by *xgenr* and a simultaneous model is solved by *sim*.
:sim: If False forces a model to be calculated (not solved), if True forces simulation
:setbase: If True, place the result in model.basedf
:setlast: If False, don't place the results in model.lastdf
If the model property previousbase is True, the previous run is used as basedf.
Parameters
----------
sim : bool, optional
If False, forces a model to be calculated (not solved); if True, forces simulation.
The default behavior is determined by the model's properties.
setbase : bool, optional
If True, places the result in model.basedf. Default is determined by the model's state.
setlast : bool, optional
If False, doesn't place the results in model.lastdf. Default is True.
antal : not applicable, optional
This option is not valid. If provided, an assertion error is raised.
do_calc_add_factor : bool, optional
Determines whether to calculate the adjustment factor if the calc adjust model is present.
Default is True.
reset_options : bool, optional
If True, the previous options will be reset. Default is False.
save : bool, optional
If True, saves the current state. The default behavior is determined by the model's properties.
solver : str, optional
Specifies the solver to be used. The default solver is chosen based on the model's properties.
silent : bool, optional
If True, the solver runs silently without printing output to the console. Default is True.
cache_clear : bool, optional
If True, clears the cache after solving. Default is True.
keep : str, optional
If provided, keeps the solutions. The exact behavior depends on the 'keep_variables' option.
keep_variables : str or list of str, optional
Specifies which variables to keep if the 'keep' option is provided. Default is to keep all variables.
*args
Variable length argument list.
**kwargs
Arbitrary keyword arguments.
Returns
-------
outdf : pandas.DataFrame
The DataFrame containing the results of the model run.
Raises
------
AssertionError
If the 'antal' option is provided, as it is not a valid simulation option.
'''
if kwargs.get('antal', False):
assert False, 'antal is not a valid simulation option, use max_iterations'
self.dumpdf = None
do_calc_add_factor = kwargs.get('do_calc_add_factor',True)
if kwargs.get('reset_options', False):
self.oldkwargs = {}
if hasattr(self, 'oldkwargs'):
newkwargs = {**self.oldkwargs, **kwargs}
else:
newkwargs = kwargs
self.oldkwargs = newkwargs.copy()
self.save = newkwargs.get('save', self.save)
if self.save:
if self.previousbase and hasattr(self, 'lastdf'):
self.basedf = self.lastdf.copy(deep=True)
if self.maxlead >= 1:
if self.normalized:
solverguess = 'newtonstack'
else:
solverguess = 'newtonstack_un_normalized'
else:
if self.normalized:
if self.istopo:
solverguess = 'xgenr'
else:
solverguess = 'sim'
else:
solverguess = 'newton_un_normalized'
solver = newkwargs.get('solver', solverguess)
silent = newkwargs.get('silent', True)
self.model_solver = getattr(self, solver)
# print(f'solver:{solver},solverkwargs:{newkwargs}')
# breakpoint()
outdf = self.model_solver(*args, **newkwargs)
# now calculate adjustment factors if the calc adjust model is there
if self.split_calc_add_factor and do_calc_add_factor:
# breakpoint()
outdf = self.calc_add_factor(outdf,silent)
# but only calculate if dummies are set
# if exogenizing factors we can calculate an adjust factor model
if newkwargs.get('cache_clear', True):
self.dekomp.cache_clear()
if newkwargs.get('keep', ''):
if newkwargs.get('keep_variables', ''):
keepvar = self.vlist(newkwargs.get('keep_variables', ''))
self.keep_solutions[newkwargs.get(
'keep', '')] = outdf.loc[:, keepvar].copy()
else:
self.keep_solutions[newkwargs.get('keep', '')] = outdf.copy()
if self.save:
if (not hasattr(self, 'basedf')) or newkwargs.get('setbase', False):
self.basedf = outdf.copy(deep=True)
if newkwargs.get('setlast', True):
self.lastdf = outdf.copy(deep=True)
return outdf
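# Usage sketch (illustrative only, names and periods are hypothetical): a model
# instance is called like a function on a databank, optionally keeping the
# solution under a label:
#
#     res = mmodel(df, 2021, 2030, keep='baseline')
#     res2 = mmodel(df_shock, 2021, 2030, keep='shock', keep_variables='GDP*')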
def calc_add_factor(self,outdf,silent=True):
if (select:= outdf.loc[self.current_per,self.fix_dummy] != 0.0).any().any():
# breakpoint()
if not silent:
print('Running calc_adjust_model ')
print(f'Dummies set {self.find_fix_dummy_fixed(outdf)}')
self.calc_add_factor_model.current_per=self.current_per
out_add_factor = self.calc_add_factor_model(outdf,silent=silent) # calculate the adjustment factors
calc_add_factordf = out_add_factor.loc[self.current_per,self.fix_add_factor] # new adjust values
add_factordf = outdf.loc [self.current_per,self.fix_add_factor].copy() # old adjust values
new_add_factordf = add_factordf.mask(select.values,calc_add_factordf) # update the old adjust values with the new ones, only when dummy is != 0
outdf.loc[self.current_per,new_add_factordf.columns] = new_add_factordf # plug all adjust values into the result
return outdf
@property
def showstartnr(self):
self.findpos()
variabler = [x for x in sorted(self.allvar.keys())]
return {v: self.allvar[v]['startnr'] for v in variabler}
def makelos(self, databank, ljit=0, stringjit=False,
solvename='sim', chunk=30, transpile_reset=False, newdata=False,
silent=True, **kwargs):
jitname = f'{self.name}_{solvename}_jit'
nojitname = f'{self.name}_{solvename}_nojit'
if solvename == 'sim':
solveout = partial(self.outsolve2dcunk, databank,
chunk=chunk, ljit=ljit, debug=kwargs.get('debug', 1))
elif solvename == 'sim1d':
solveout = partial(self.outsolve1dcunk, chunk=chunk, ljit=ljit, debug=kwargs.get(
'debug', 1), cache=kwargs.get('cache', 'False'))
elif solvename == 'newton':
solveout = partial(self.outsolve2dcunk, databank, chunk=chunk,
ljit=ljit, debug=kwargs.get('debug', 1), type='res')
elif solvename == 'res':
solveout = partial(self.outsolve2dcunk, databank, chunk=chunk,
ljit=ljit, debug=kwargs.get('debug', 1), type='res')
if not silent:
if newdata or transpile_reset or (ljit and not hasattr(self, f'pro_{jitname}')):
print('New data or transpile_reset')
print(f'Create compiled solving function for {self.name}')
print(f'{ljit=} {stringjit=} {transpile_reset=} {hasattr(self, f"pro_{jitname}")=}')
else:
print('Reusing the solver as no new data ')
if ljit:
if newdata or transpile_reset or not hasattr(self, f'pro_{jitname}'):
if stringjit:
if not silent:
print(f'now makelos makes a {solvename} jit function')
self.make_los_text_jit = solveout()
# creates the los function
exec(self.make_los_text_jit, globals())
pro_jit, core_jit, epi_jit = make_los(
self.funks, self.errfunk)
else:
# breakpoint()
# if we import from a cache, we assume that the dataframe is in the same order
if transpile_reset or not hasattr(self, f'pro_{jitname}'):
jitfilename= f'modelsource/{jitname}_jitsolver.py'.replace(' ','_')
jitfile = Path(jitfilename)
jitfile.parent.mkdir(parents=True, exist_ok=True)
if not silent:
print(f'{transpile_reset=} {hasattr(self, f"pro_{jitname}")=} {jitfile.is_file()=}')
initfile = jitfile.parent /'__init__.py'
if not initfile.exists():
with open(initfile,'wt') as i:
i.write('#')
if transpile_reset or not jitfile.is_file():
solvetext0 = solveout()
solvetext = '\n'.join(
[l[4:] for l in solvetext0.split('\n')[1:-2]])
solvetext = solvetext.replace(
'cache=False', 'cache=True')
with open(jitfile, 'wt') as f:
f.write(solvetext)
importlib.invalidate_caches()
if not silent:
print(f'Writes the evaluation function to {jitfile.is_file()=}')
if not silent:
print(f'Importing {jitfile}')
m1 = importlib.import_module('.'+jitfile.stem,jitfile.parent.name)
pro_jit, core_jit, epi_jit = m1.prolog, m1.core, m1.epilog
setattr(self, f'pro_{jitname}', pro_jit)
setattr(self, f'core_{jitname}', core_jit)
setattr(self, f'epi_{jitname}', epi_jit)
return getattr(self, f'pro_{jitname}'), getattr(self, f'core_{jitname}'), getattr(self, f'epi_{jitname}')
else:
if newdata or transpile_reset or not hasattr(self, f'pro_{nojitname}'):
if not silent:
print(f'now makelos makes a {solvename} solvefunction')
make_los_text = solveout()
self.make_los_text = make_los_text
exec(make_los_text, globals()) # creates the los function
pro, core, epi = make_los(self.funks, self.errfunk)
setattr(self, f'pro_{nojitname}', pro)
setattr(self, f'core_{nojitname}', core)
setattr(self, f'epi_{nojitname}', epi)
return getattr(self, f'pro_{nojitname}'), getattr(self, f'core_{nojitname}'), getattr(self, f'epi_{nojitname}')
def is_newdata(self, databank):
'''Determines if this is the same databank as in the previous solution '''
if not self.eqcolumns(self.genrcolumns, databank.columns):
# fill all Missing value with 0.0
databank = insertModelVar(databank, self)
for i in [j for j in self.allvar.keys() if self.allvar[j]['matrix']]:
# Make sure columns with matrixes are of this type
databank.loc[:, i] = databank.loc[:, i].astype('O')
newdata = True
else:
newdata = False
return newdata, databank
def sim(self, databank, start='', end='', silent=1, samedata=0, alfa=1.0, stats=False, first_test=5,
max_iterations=200, conv='*', absconv=0.01, relconv=DEFAULT_relconv,
stringjit=False, transpile_reset=False,
dumpvar='*', init=False, ldumpvar=False, dumpwith=15, dumpdecimal=5, chunk=30, ljit=False, timeon=False,
fairopt={'fair_max_iterations ': 1}, progressbar=False,**kwargs):
'''
Evaluates this model on a databank from start to end.
First it finds the values in the dataframe, then creates the evaluator function through the *outeval* function.
Then it evaluates the function and returns the values to the dataframe in the databank.
The text for the evaluator function is placed in the model property **make_los_text**,
where it can be inspected
in case of problems.
Solves using Gauss-Seidel iteration.
:param databank: Input dataframe
:type databank: dataframe
:param start: start of simulation, defaults to ''
:type start: optional
:param end: end of simulation, defaults to ''
:type end: optional
:param silent: keep simulation silent , defaults to 1
:type silent: bool, optional
:param samedata: the input data has exactly the same structure as the last simulation, defaults to 0
:type samedata: bool, optional
:param alfa: Damping factor, defaults to 1.0
:type alfa: float, optional
:param stats: Show statistic after finish, defaults to False
:type stats: bool, optional
:param first_test: Start testing for convergence after this number of iterations, defaults to 5
:type first_test: int, optional
:param max_iterations: Max iterations, defaults to 200
:type max_iterations: int, optional
:param conv: variables to test for convergence, defaults to '*'
:type conv: str, optional
:param absconv: Test convergence for values above this, defaults to 0.01
:type absconv: float, optional
:param relconv: Convergence when the relative movement is below this value, defaults to DEFAULT_relconv
:type relconv: float, optional
:param stringjit: If using just-in-time compilation, do it on a string instead of a file to import, defaults to False
:type stringjit: bool, optional
:param transpile_reset: Ignore a previously transpiled model, defaults to False
:type transpile_reset: bool, optional
:param dumpvar: Variables for which to dump the iterations, defaults to '*'
:type dumpvar: str, optional
:param init: If True, take the previous period's values as starting values, defaults to False
:type init: bool, optional
:param ldumpvar: Dump iterations, defaults to False
:type ldumpvar: bool, optional
:param dumpwith: Width of the dumped values, defaults to 15
:type dumpwith: int, optional
:param dumpdecimal: Number of decimals in the dumped values, defaults to 5
:type dumpdecimal: int, optional
:param chunk: Chunk size of transpiled model, defaults to 30
:type chunk: int, optional
:param ljit: Use just in time compilation, defaults to False
:type ljit: bool, optional
:param timeon: Time the elements, defaults to False
:type timeon: bool, optional
:param fairopt: Fair taylor options, defaults to {'fair_max_iterations': 1}
:type fairopt: TYPE, optional
:param progressbar: Show progress bar , defaults to False
:type progressbar: TYPE, optional
:return: A dataframe with the results
'''
starttimesetup = time.time()
fair_max_iterations = {**fairopt, **
kwargs}.get('fair_max_iterations ', 1)
sol_periode = self.smpl(start, end, databank)
# breakpoint()
self.check_sim_smpl(databank)
if not silent:
print('Will start solving: ' + self.name)
newdata, databank = self.is_newdata(databank)
self.pro2d, self.solve2d, self.epi2d = self.makelos(
databank, solvename='sim', ljit=ljit, stringjit=stringjit, transpile_reset=transpile_reset,
chunk=chunk, newdata=newdata,silent=silent)
values = databank.values.copy() #
self.genrcolumns = databank.columns.copy()
self.genrindex = databank.index.copy()
# convvar = [conv.upper()] if isinstance(conv,str) else [c.upper() for c in conv] if conv != [] else list(self.endogene)
convvar = self.list_names(self.coreorder, conv)
# this is how convergence is measured
convplace = [databank.columns.get_loc(c) for c in convvar]
convergence = True
endoplace = [databank.columns.get_loc(c) for c in list(self.endogene)]
# breakpoint()
if ldumpvar:
self.dumplist = []
self.dump = self.list_names(self.coreorder, dumpvar)
dumpplac = [databank.columns.get_loc(v) for v in self.dump]
ittotal = 0
endtimesetup = time.time()
starttime = time.time()
bars = '{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt}'
for fairiteration in range(fair_max_iterations):
if fair_max_iterations >= 2:
print(f'Fair-Taylor iteration: {fairiteration}')
with tqdm(total=len(sol_periode),disable = not progressbar,desc=f'Solving {self.name}',bar_format=bars) as pbar:
for self.periode in sol_periode:
row = databank.index.get_loc(self.periode)
if ldumpvar:
self.dumplist.append([fairiteration, self.periode, int(0)]+[values[row, p]
for p in dumpplac])
if init:
for c in endoplace:
values[row, c] = values[row-1, c]
itbefore = values[row, convplace]
self.pro2d(values, values, row, 1.0)
for iteration in range(max_iterations):
with self.timer(f'Evaluate {self.periode}/{iteration} ', timeon) as t:
self.solve2d(values, values, row, alfa)
ittotal += 1
if ldumpvar:
self.dumplist.append([fairiteration, self.periode, int(iteration+1)]+[values[row, p]
for p in dumpplac])
if iteration > first_test:
itafter = values[row, convplace]
select = absconv <= np.abs(itbefore)
convergence = (
np.abs((itafter-itbefore)[select])/np.abs(itbefore[select]) <= relconv).all()
if convergence:
if not silent:
print(
f'{self.periode} Solved in {iteration} iterations')
break
itbefore = itafter
else:
print(f'{self.periode} not converged in {iteration} iterations')
self.epi2d(values, values, row, 1.0)
pbar.update()
endtime = time.time()
if ldumpvar:
self.dumpdf = pd.DataFrame(self.dumplist)
del self.dumplist
self.dumpdf.columns = ['fair', 'per', 'iteration']+self.dump
if fair_max_iterations <= 2:
self.dumpdf.drop('fair', axis=1, inplace=True)
outdf = pd.DataFrame(values, index=databank.index,
columns=databank.columns)
if stats:
self.simtime = endtime-starttime
self.setuptime = endtimesetup - starttimesetup
numberfloats = (self.flop_get['core'][-1][1]*ittotal+
len(sol_periode)*(self.flop_get['prolog'][-1][1]+self.flop_get['epilog'][-1][1]))
print(
f'Setup time (seconds) :{self.setuptime:>15,.2f}')
print(
f'Floating point operations core :{self.flop_get["core"][-1][1]:>15,}')
print(
f'Floating point operations prolog :{self.flop_get["prolog"][-1][1]:>15,}')
print(
f'Floating point operations epilog :{self.flop_get["epilog"][-1][1]:>15,}')
print(f'Simulation period :{len(sol_periode):>15,}')
print(f'Total iterations :{ittotal:>15,}')
print(f'Total floating point operations :{numberfloats:>15,}')
print(
f'Simulation time (seconds) :{self.simtime:>15,.2f}')
if self.simtime > 0.0:
print(
f'Floating point operations per second :{numberfloats/self.simtime:>15,.1f}')
if not silent:
print(self.name + ' solved ')
return outdf
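# Convergence sketch (illustrative only, mirrors the test in the loop above): a
# period has converged when every tested variable with |value| >= absconv has
# moved relatively less than relconv between two Gauss-Seidel sweeps:
#
#     import numpy as np
#     itbefore = np.array([100.0, 0.001, 50.0])
#     itafter = np.array([100.0000001, 0.5, 50.0000002])
#     select = 0.01 <= np.abs(itbefore)      # absconv: ignore near-zero values
#     converged = (np.abs((itafter - itbefore)[select])
#                  / np.abs(itbefore[select]) <= 1e-7).all()   # relconv test -> True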
@staticmethod
def grouper(iterable, n, fillvalue=''):
"Collect data into fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
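# Example: list(Solver_Mixin.grouper('ABCDEFG', 3, 'x')) yields
# [('A', 'B', 'C'), ('D', 'E', 'F'), ('G', 'x', 'x')]; the code generators below
# use it to split the equation order into fixed-size chunks.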
def outsolve2dcunk(self, databank,
debug=1, chunk=None,
ljit=False, type='gauss', cache=False):
''' Takes a list of terms and translates it to an evaluator function called los.
The model accesses the data through Dataframe.values[rowindex+lag, columnindex], which is very efficient.
'''
short, long, longer = 4*' ', 8*' ', 12 * ' '
columnsnr = self.get_columnsnr(databank)
if ljit:
thisdebug = False
else:
thisdebug = debug
#print(f'Generating source for {self.name} using ljit = {ljit} ')
def make_gaussline2(vx, nodamp=False):
''' Takes a list of terms and translates it to a line in a Gauss-Seidel solver for
simultaneous models.
New version to handle several lhs variables. Damping is not allowed for
these, but could easily be implemented by making a function to multiply tuples.
nodamp is for prolog and epilog solutions, which should not be dampened.
'''
termer = self.allvar[vx]['terms']
assigpos = self.allvar[vx]['assigpos']
if nodamp:
ldamp = False
else:
# convention for damping equations
if pt.kw_frml_name(self.allvar[vx]['frmlname'], 'DAMP') or 'Z' in self.allvar[vx]['frmlname']:
assert assigpos == 1, 'You can not dampen equations with several left hand sides:'+vx
endovar = [t.op if t.op else (
'values[row,'+str(columnsnr[t.var])+']') for j, t in enumerate(termer) if j <= assigpos-1]
# to implement damping of the solution
damp = '(1-alfa)*('+''.join(endovar)+')+alfa*('
ldamp = True
else:
ldamp = False
out = []
for i, t in enumerate(termer[:-1]): # drop the trailing $
if t.op:
out.append(t.op.lower())
if i == assigpos and ldamp:
out.append(damp)
if t.number:
out.append(t.number)
elif t.var:
out.append(
'values[row'+t.lag+','+str(columnsnr[t.var])+']')
if ldamp:
out.append(')') # the last ) in the dampening
res = ''.join(out)
return res+'\n'
def make_resline2(vx, nodamp):
''' Takes a list of terms and translates it to a line calculating the value of the equation (residual form)
'''
termer = self.allvar[vx]['terms']
assigpos = self.allvar[vx]['assigpos']
out = []
for i, t in enumerate(termer[:-1]): # drop the trailing $
if t.op:
out.append(t.op.lower())
if t.number:
out.append(t.number)
elif t.var:
if i < assigpos:
out.append(
'outvalues[row'+t.lag+','+str(columnsnr[t.var])+']')
else:
out.append(
'values[row'+t.lag+','+str(columnsnr[t.var])+']')
res = ''.join(out)
return res+'\n'
def makeafunk(name, order, linemake, chunknumber, debug=False, overhead=0, oldeqs=0, nodamp=False, ljit=False, totalchunk=1):
''' Creates the source of an evaluation function.
Keeps track of how many equations and lines are in the functions above.
This allows the error function to retrieve the variable for which a math error is thrown.
'''
fib1 = []
fib2 = []
if ljit:
# fib1.append((short+'print("'+f"Compiling chunk {chunknumber+1}/{totalchunk} "+'",time.strftime("%H:%M:%S")) \n') if ljit else '')
fib1.append(
short+'@jit("(f8[:,:],f8[:,:],i8,f8)",fastmath=True,cache=False,nopython=True)\n')
fib1.append(short + 'def '+name +
'(values,outvalues,row,alfa=1.0):\n')
# fib1.append(long + 'outvalues = values \n')
if debug:
fib1.append(long+'try :\n')
fib1.append(longer+'pass\n')
newoverhead = len(fib1) + overhead
content = [longer + ('pass # '+v + '\n' if self.allvar[v]['dropfrml']
else linemake(v, nodamp))
for v in order if len(v)]
if debug:
fib2.append(long + 'except :\n')
fib2.append(
longer + f'errorfunk(values,sys.exc_info()[2].tb_lineno,overhead={newoverhead},overeq={oldeqs})'+'\n')
fib2.append(longer + 'raise\n')
fib2.append((long if debug else longer) + 'return \n')
neweq = oldeqs + len(content)
return list(chain(fib1, content, fib2)), newoverhead+len(content)+len(fib2), neweq
def makechunkedfunk(name, order, linemake, debug=False, overhead=0, oldeqs=0, nodamp=False, chunk=None, ljit=False):
''' Makes the complete function to evaluate the model.
Keeps track of previous overhead lines and equations, to help the error function '''
newoverhead = overhead
neweqs = oldeqs
if chunk is None:
orderlist = [order]
else:
orderlist = list(self.grouper(order, chunk))
fib = []
fib2 = []
if ljit:
fib.append(short+f"pbar = tqdm.tqdm(total={len(orderlist)},"
+ f"desc='Compile {name:6}'"+",unit='code chunk',bar_format ='{l_bar}{bar}| {n_fmt}/{total_fmt} {rate_fmt}{postfix}')\n")
for i, o in enumerate(orderlist):
lines, head, eques = makeafunk(name+str(i), o, linemake, i, debug=debug, overhead=newoverhead, nodamp=nodamp,
ljit=ljit, oldeqs=neweqs, totalchunk=len(orderlist))
fib.extend(lines)
newoverhead = head
neweqs = eques
if ljit:
fib.append(short + f"pbar.update(1)\n")
if ljit:
# fib2.append((short+'print("'+f"Compiling a mastersolver "+'",time.strftime("%H:%M:%S")) \n') if ljit else '')
fib2.append(
short+'@jit("(f8[:,:],f8[:,:],i8,f8)",fastmath=True,cache=False,nopython=True)\n')
fib.append(short+f"pbar.close()\n")
fib2.append(short + 'def '+name +
'(values,outvalues,row,alfa=1.0):\n')
# fib2.append(long + 'outvalues = values \n')
tt = [
long+name+str(i)+'(values,outvalues,row,alfa=alfa)\n' for (i, ch) in enumerate(orderlist)]
fib2.extend(tt)
fib2.append(long+'return \n')
return fib+fib2, newoverhead+len(fib2), neweqs
linemake = make_resline2 if type == 'res' else make_gaussline2
fib2 = []
fib1 = ['def make_los(funks=[],errorfunk=None):\n']
fib1.append(short + 'import time' + '\n')
fib1.append(short + 'import tqdm' + '\n')
fib1.append(short + 'from numba import jit' + '\n')
fib1.append(short + 'from modeluserfunk import ' +
(', '.join(pt.userfunk)).lower()+'\n')
fib1.append(short + 'from modelBLfunk import ' +
(', '.join(pt.BLfunk)).lower()+'\n')
funktext = [short+f.__name__ + ' = funks[' +
str(i)+']\n' for i, f in enumerate(self.funks)]
fib1.extend(funktext)
with self.timer('make model text', False):
if self.use_preorder:
procontent, prooverhead, proeqs = makechunkedfunk('prolog', self.preorder, linemake, overhead=len(
fib1), oldeqs=0, debug=thisdebug, nodamp=True, ljit=ljit, chunk=chunk)
content, conoverhead, coneqs = makechunkedfunk(
'core', self.coreorder, linemake, overhead=prooverhead, oldeqs=proeqs, debug=thisdebug, ljit=ljit, chunk=chunk)
epilog, epioverhead, epieqs = makechunkedfunk(
'epilog', self.epiorder, linemake, overhead=conoverhead, oldeqs=coneqs, debug=thisdebug, nodamp=True, ljit=ljit, chunk=chunk)
else:
procontent, prooverhead, proeqs = makechunkedfunk('prolog', [], linemake, overhead=len(
fib1), oldeqs=0, ljit=ljit, debug=thisdebug, chunk=chunk)
content, conoverhead, coneqs = makechunkedfunk(
'core', self.solveorder, linemake, overhead=prooverhead, oldeqs=proeqs, ljit=ljit, debug=thisdebug, chunk=chunk)
epilog, epioverhead, epieqs = makechunkedfunk(
'epilog', [], linemake, ljit=ljit, debug=thisdebug, chunk=chunk, overhead=conoverhead, oldeqs=coneqs)
fib2.append(short + 'return prolog,core,epilog\n')
return ''.join(chain(fib1, procontent, content, epilog, fib2))
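# Shape sketch (illustrative only, not the exact generated text): the source
# returned above defines make_los, which produces three functions sweeping the
# value matrix in place. A core chunk roughly looks like:
#
#     def core0(values, outvalues, row, alfa=1.0):
#         # one damped Gauss-Seidel assignment per equation; indices are hypothetical
#         values[row, 2] = (1-alfa)*(values[row, 2]) + alfa*(values[row-1, 2] + values[row, 5])
#         return
#
# and a master core(values, outvalues, row, alfa) simply calls core0, core1, ...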
def sim1d(self, databank, start='', end='', silent=1, samedata=0, alfa=1.0, stats=False, first_test=1,
max_iterations=100, conv='*', absconv=1.0, relconv=DEFAULT_relconv, init=False,
dumpvar='*', ldumpvar=False, dumpwith=15, dumpdecimal=5, chunk=30, ljit=False, stringjit=True, transpile_reset=False,
fairopt={'fair_max_iterations ': 1}, timeon=0, **kwargs):
'''Evaluates this model on a databank from start to end.
First it finds the values in the dataframe, then creates the evaluator function through the *outeval* function
(:func:`modelclass.model.fouteval`),
then it evaluates the function and returns the values to the dataframe in the databank.
The text for the evaluator function is placed in the model property **make_los_text**,
where it can be inspected
in case of problems.
'''
starttimesetup = time.time()
fair_max_iterations = {**fairopt, **
kwargs}.get('fair_max_iterations ', 1)
sol_periode = self.smpl(start, end, databank)
self.check_sim_smpl(databank)
self.findpos()
# fill all Missing value with 0.0
databank = insertModelVar(databank, self)
with self.timer('create stuffer and gauss lines ', timeon) as t:
if (not hasattr(self, 'stuff3')) or (not self.eqcolumns(self.simcolumns, databank.columns)):
self.stuff3, self.saveeval3 = self.createstuff3(databank)
self.simcolumns = databank.columns.copy()
with self.timer('Create solver function', timeon) as t:
this_pro1d, this_solve1d, this_epi1d = self.makelos(
None, solvename='sim1d', ljit=ljit, stringjit=stringjit, transpile_reset=transpile_reset, chunk=chunk)
values = databank.values.copy()
self.values_ = values # for use in errdump
self.genrcolumns = databank.columns.copy()
self.genrindex = databank.index.copy()
convvar = self.list_names(self.coreorder, conv)
# convplace=[databank.columns.get_loc(c) for c in convvar] # this is how convergence is measured
convplace = [self.allvar[c]['startnr'] -
self.allvar[c]['maxlead'] for c in convvar]
endoplace = [databank.columns.get_loc(c) for c in list(self.endogene)]
convergence = True
if ldumpvar:
self.dumplist = []
self.dump = self.list_names(self.coreorder, dumpvar)
dumpplac = [self.allvar[v]['startnr'] -
self.allvar[v]['maxlead'] for v in self.dump]
ittotal = 0
endtimesetup = time.time()
starttime = time.time()
for fairiteration in range(fair_max_iterations):
if fair_max_iterations >= 2:
if not silent:
print(f'Fair-Taylor iteration: {fairiteration}')
for self.periode in sol_periode:
row = databank.index.get_loc(self.periode)
self.row_ = row
if init:
for c in endoplace:
values[row, c] = values[row-1, c]
with self.timer(f'stuff {self.periode} ', timeon) as t:
a = self.stuff3(values, row, ljit)
#
if ldumpvar:
self.dumplist.append([fairiteration, self.periode, int(0)]+[a[p]
for p in dumpplac])
itbeforeold = [a[c] for c in convplace]
itbefore = a[convplace]
this_pro1d(a, 1.0)
for iteration in range(max_iterations):
with self.timer(f'Evaluate {self.periode}/{iteration} ', timeon) as t:
this_solve1d(a, alfa)
ittotal += 1
if ldumpvar:
self.dumplist.append([fairiteration, self.periode, int(iteration+1)]+[a[p]
for p in dumpplac])
if iteration > first_test:
itafter = a[convplace]
# breakpoint()
select = absconv <= np.abs(itbefore)
convergence = (
np.abs((itafter-itbefore)[select])/np.abs(itbefore[select]) <= relconv).all()
if convergence:
if not silent:
print(
f'{self.periode} Solved in {iteration} iterations')
break
itbefore = itafter
else:
print(f'{self.periode} not converged in {iteration} iterations')
this_epi1d(a, 1.0)
self.saveeval3(values, row, a)
if ldumpvar:
self.dumpdf = pd.DataFrame(self.dumplist)
del self.dumplist
self.dumpdf.columns = ['fair', 'per', 'iteration']+self.dump
self.dumpdf = self.dumpdf.sort_values(['per', 'fair', 'iteration'])
if fair_max_iterations <= 2:
self.dumpdf.drop('fair', axis=1, inplace=True)
outdf = pd.DataFrame(values, index=databank.index,
columns=databank.columns)
del self.values_ # not needed any more
if stats:
numberfloats = self.calculate_freq[-1][1]*ittotal
endtime = time.time()
self.simtime = endtime-starttime
self.setuptime = endtimesetup - starttimesetup
print(
f'Setup time (seconds) :{self.setuptime:>15,.2f}')
print(
f'Floating point operations :{self.calculate_freq[-1][1]:>15,}')
print(f'Total iterations :{ittotal:>15,}')
print(f'Total floating point operations :{numberfloats:>15,}')
print(
f'Simulation time (seconds) :{self.simtime:>15,.2f}')
if self.simtime > 0.0:
print(
f'Floating point operations per second : {numberfloats/self.simtime:>15,.1f}')
if not silent:
print(self.name + ' finished ')
return outdf
def outsolve1dcunk(self, debug=0, chunk=None, ljit=False, cache='False'):
''' Takes a list of terms and translates it to an evaluator function called los.
The model accesses the data through Dataframe.values[rowindex+lag, columnindex], which is very efficient.
'''
short, long, longer = 4*' ', 8*' ', 12 * ' '
self.findpos()
if ljit:
thisdebug = False
else:
thisdebug = debug
def makeafunk(name, order, linemake, chunknumber, debug=False, overhead=0, oldeqs=0, nodamp=False, ljit=False, totalchunk=1):
''' Creates the source of an evaluation function.
Keeps track of how many equations and lines are in the functions above.
This allows the error function to retrieve the variable for which a math error is thrown.
'''
fib1 = []
fib2 = []
if ljit:
# fib1.append((short+'print("'+f"Compiling chunk {chunknumber+1}/{totalchunk} "+'",time.strftime("%H:%M:%S")) \n') if ljit else '')
fib1.append(
short+f'@jit("(f8[:],f8)",fastmath=True,cache={cache},nopython=True)\n')
fib1.append(short + 'def '+name+'(a,alfa=1.0):\n')
# fib1.append(long + 'outvalues = values \n')
if debug:
fib1.append(long+'try :\n')
fib1.append(longer+'pass\n')
newoverhead = len(fib1) + overhead
content = [longer + ('pass # '+v + '\n' if self.allvar[v]['dropfrml']
else linemake(v, nodamp)+'\n')
for v in order if len(v)]
if debug:
fib2.append(long + 'except :\n')
fib2.append(
longer + f'errorfunk(a,sys.exc_info()[2].tb_lineno,overhead={newoverhead},overeq={oldeqs})'+'\n')
fib2.append(longer + 'raise\n')
fib2.append((long if debug else longer) + 'return \n')
neweq = oldeqs + len(content)
return list(chain(fib1, content, fib2)), newoverhead+len(content)+len(fib2), neweq
def makechunkedfunk(name, order, linemake, debug=False, overhead=0, oldeqs=0, nodamp=False, chunk=None, ljit=False):
''' Makes the complete function to evaluate the model.
Keeps track of previous overhead lines and equations, to help the error function '''
newoverhead = overhead
neweqs = oldeqs
if chunk is None:
orderlist = [order]
else:
orderlist = list(self.grouper(order, chunk))
fib = []
fib2 = []
if ljit:
fib.append(short+f"pbar = tqdm.tqdm(total={len(orderlist)},"
+ f"desc='Compile {name:6}'"+",unit='code chunk',bar_format ='{l_bar}{bar}| {n_fmt}/{total_fmt} {rate_fmt}{postfix}')\n")
for i, o in enumerate(orderlist):
lines, head, eques = makeafunk(name+str(i), o, linemake, i, debug=debug, overhead=newoverhead, nodamp=nodamp,
ljit=ljit, oldeqs=neweqs, totalchunk=len(orderlist))
fib.extend(lines)
newoverhead = head
neweqs = eques
if ljit:
fib.append(short + f"pbar.update(1)\n")
if ljit:
# fib2.append((short+'print("'+f"Compiling a mastersolver "+'",time.strftime("%H:%M:%S")) \n') if ljit else '')
fib2.append(
short+f'@jit("(f8[:],f8)",fastmath=True,cache={cache},nopython=True)\n')
fib.append(short+f"pbar.close()\n")
fib2.append(short + 'def '+name+'(a,alfa=1.0):\n')
# fib2.append(long + 'outvalues = values \n')
tt = [long+name+str(i)+'(a,alfa=alfa)\n' for (i,
ch) in enumerate(orderlist)]
fib2.extend(tt)
fib2.append(long+'return \n')
return fib+fib2, newoverhead+len(fib2), neweqs
linemake = self.make_gaussline
fib2 = []
fib1 = ['def make_los(funks=[],errorfunk=None):\n']
fib1.append(short + 'import time' + '\n')
fib1.append(short + 'import tqdm' + '\n')
fib1.append(short + 'from numba import jit' + '\n')
fib1.append(short + 'from modeluserfunk import ' +
(', '.join(pt.userfunk)).lower()+'\n')
fib1.append(short + 'from modelBLfunk import ' +
(', '.join(pt.BLfunk)).lower()+'\n')
funktext = [short+f.__name__ + ' = funks[' +
str(i)+']\n' for i, f in enumerate(self.funks)]
fib1.extend(funktext)
if self.use_preorder:
procontent, prooverhead, proeqs = makechunkedfunk('prolog', self.preorder, linemake, overhead=len(
fib1), oldeqs=0, ljit=ljit, debug=thisdebug, nodamp=True, chunk=chunk)
content, conoverhead, coneqs = makechunkedfunk(
'core', self.coreorder, linemake, overhead=prooverhead, oldeqs=proeqs, ljit=ljit, debug=thisdebug, chunk=chunk)
epilog, epioverhead, epieqs = makechunkedfunk(
'epilog', self.epiorder, linemake, overhead=conoverhead, oldeqs=coneqs, ljit=ljit, debug=thisdebug, nodamp=True, chunk=chunk)
else:
procontent, prooverhead, proeqs = makechunkedfunk('prolog', [], linemake, overhead=len(
fib1), oldeqs=0, ljit=ljit, debug=thisdebug, chunk=chunk)
content, conoverhead, coneqs = makechunkedfunk(
'core', self.solveorder, linemake, overhead=prooverhead, oldeqs=proeqs, ljit=ljit, debug=thisdebug, chunk=chunk)
epilog, epioverhead, epieqs = makechunkedfunk(
'epilog', [], linemake, overhead=conoverhead, oldeqs=coneqs, ljit=ljit, debug=thisdebug, chunk=chunk)
fib2.append(short + 'return prolog,core,epilog\n')
return ''.join(chain(fib1, procontent, content, epilog, fib2))
def newton(self, databank, start='', end='', silent=1, samedata=0, alfa=1.0, stats=False, first_test=1, newton_absconv=0.001,
max_iterations=20, conv='*', absconv=1.0, relconv=DEFAULT_relconv, nonlin=False, timeit=False, newton_reset=1,
dumpvar='*', ldumpvar=False, dumpwith=15, dumpdecimal=5, chunk=30, ljit=False, stringjit=False, transpile_reset=False, lnjit=False, init=False,
newtonalfa=1.0, newtonnodamp=0, forcenum=True,
fairopt={'fair_max_iterations ': 1}, **kwargs):
'''Evaluates this model on a databank from start to end, using a per-period Newton algorithm.
First it finds the values in the dataframe, then creates the evaluator function through the *outeval* function
(:func:`modelclass.model.fouteval`),
then it evaluates the function and returns the values to the dataframe in the databank.
The text for the evaluator function is placed in the model property **make_los_text**,
where it can be inspected
in case of problems.
'''
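# Schematically, each inner iteration below performs a damped Newton step on
# the per-period fixed-point problem x = G(x):
#     distance = G(x) - x                      (one pass through the equations)
#     update   = newton_1per_solver(distance)  (factorised Jacobian from newton_diff)
#     x        = x - damp * update
# where damp = newtonalfa while iteration <= newtonnodamp and 1.0 afterwards.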
# print('new nwwton')
starttimesetup = time.time()
fair_max_iterations = {**fairopt, **
kwargs}.get('fair_max_iterations ', 1)
sol_periode = self.smpl(start, end, databank)
self.check_sim_smpl(databank)
# if not self.eqcolumns(self.genrcolumns,databank.columns):
# databank=insertModelVar(databank,self) # fill all Missing value with 0.0
# for i in [j for j in self.allvar.keys() if self.allvar[j]['matrix']]:
# databank.loc[:,i]=databank.loc[:,i].astype('O') # Make sure columns with matrixes are of this type
# newdata = True
# else:
# newdata = False
newdata, databank = self.is_newdata(databank)
self.pronew2d, self.solvenew2d, self.epinew2d = self.makelos(databank, solvename='newton',
ljit=ljit, stringjit=stringjit, transpile_reset=transpile_reset, chunk=chunk, newdata=newdata)
values = databank.values.copy()
outvalues = np.empty_like(values)
if not hasattr(self, 'newton_1per_diff'):
endovar = self.coreorder if self.use_preorder else self.solveorder
self.newton_1per_diff = newton_diff(self, forcenum=forcenum, df=databank,
endovar=endovar, ljit=lnjit, nchunk=chunk, onlyendocur=True, silent=silent)
if not hasattr(self, 'newton_1per_solver') or newton_reset:
# breakpoint()
self.newton_1per_solver = self.newton_1per_diff.get_solve1per(
df=databank, periode=[self.current_per[0]])[self.current_per[0]]
newton_col = [databank.columns.get_loc(
c) for c in self.newton_1per_diff.endovar]
self.genrcolumns = databank.columns.copy()
self.genrindex = databank.index.copy()
convvar = self.list_names(self.coreorder, conv)
# this is how convergence is measured
convplace = [databank.columns.get_loc(c) for c in convvar]
endoplace = [databank.columns.get_loc(c) for c in list(self.endogene)]
convergence = False
if ldumpvar:
self.dumplist = []
self.dump = self.list_names(self.coreorder, dumpvar)
dumpplac = [databank.columns.get_loc(v) for v in self.dump]
ittotal = 0
endtimesetup = time.time()
starttime = time.time()
for fairiteration in range(fair_max_iterations):
if fair_max_iterations >= 2:
print(f'Fair-Taylor iteration: {fairiteration}')
for self.periode in sol_periode:
row = databank.index.get_loc(self.periode)
if init:
for c in endoplace:
values[row, c] = values[row-1, c]
if ldumpvar:
self.dumplist.append([fairiteration, self.periode, int(0)]+[values[row, p]
for p in dumpplac])
itbefore = [values[row, c] for c in convplace]
self.pronew2d(values, values, row, alfa)
convergence = False
for iteration in range(max_iterations):
with self.timer(f'sim per:{self.periode} it:{iteration}', timeit) as xxtt:
before = values[row, newton_col]
self.solvenew2d(values, outvalues, row, alfa)
now = outvalues[row, newton_col]
distance = now-before
newton_conv = np.abs(distance).sum()
if not silent:
print(
f'Iteration {iteration} Sum of distances {newton_conv:>{15},.{6}f}')
if newton_conv <= newton_absconv:
convergence = True
break
# breakpoint()
if iteration != 0 and nonlin and not (iteration % nonlin):
with self.timer('Updating solver', timeit) as t3:
if not silent:
print(
f'Updating solver, iteration {iteration}')
df_now = pd.DataFrame(
values, index=databank.index, columns=databank.columns)
self.newton_1per_solver = self.newton_1per_diff.get_solve1per(
df=df_now, periode=[self.periode])[self.periode]
with self.timer('Update solution', timeit):
# update = self.solveinv(distance)
update = self.newton_1per_solver(distance)
damp = newtonalfa if iteration <= newtonnodamp else 1.0
values[row, newton_col] = before - update * damp
ittotal += 1
if ldumpvar:
self.dumplist.append([fairiteration, self.periode, int(iteration+1)]+[values[row, p]
for p in dumpplac])
# if iteration > first_test:
# itafter=[values[row,c] for c in convplace]
# convergence = True
# for after,before in zip(itafter,itbefore):
# print(before,after)
# if before > absconv and abs(after-before)/abs(before) > relconv:
# convergence = False
# break
# if convergence:
# break
# else:
# itbefore=itafter
self.epinew2d(values, values, row, alfa)
if not silent:
if not convergence:
print(
f'{self.periode} not converged in {iteration} iterations')
else:
print(f'{self.periode} Solved in {iteration} iterations')
if ldumpvar:
self.dumpdf = pd.DataFrame(self.dumplist)
del self.dumplist
self.dumpdf.columns = ['fair', 'per', 'iteration']+self.dump
if fair_max_iterations <= 2:
self.dumpdf.drop('fair', axis=1, inplace=True)
outdf = pd.DataFrame(values, index=databank.index,
columns=databank.columns)
if stats:
numberfloats = self.calculate_freq[-1][1]*ittotal
endtime = time.time()
self.simtime = endtime-starttime
self.setuptime = endtimesetup - starttimesetup
print(
f'Setup time (seconds) :{self.setuptime:>15,.2f}')
print(
f'Floating point operations :{self.calculate_freq[-1][1]:>15,}')
print(f'Total iterations :{ittotal:>15,}')
print(f'Total floating point operations :{numberfloats:>15,}')
print(
f'Simulation time (seconds) :{self.simtime:>15,.2f}')
if self.simtime > 0.0:
print(
f'Floating point operations per second : {numberfloats/self.simtime:>15,.1f}')
if not silent:
print(self.name + ' solved ')
return outdf
def newtonstack(self, databank, start='', end='', silent=1, samedata=0, alfa=1.0, stats=False, first_test=1, newton_absconv=0.001,
max_iterations=20, conv='*', absconv=1., relconv=DEFAULT_relconv,
dumpvar='*', ldumpvar=False, dumpwith=15, dumpdecimal=5, chunk=30, nchunk=30, ljit=False, stringjit=False, transpile_reset=False, nljit=0,
fairopt={'fair_max_iterations ': 1}, debug=False, timeit=False, nonlin=False,
newtonalfa=1.0, newtonnodamp=0, forcenum=True, newton_reset=False, **kwargs):
'''Evaluates this model on a databank from start to end, using a stacked Newton algorithm where all periods are solved simultaneously.
First it finds the values in the dataframe, then creates the evaluator function through the *outeval* function
(:func:`modelclass.model.fouteval`),
then it evaluates the function and returns the values to the dataframe in the databank.
The text for the evaluator function is placed in the model property **make_los_text**,
where it can be inspected
in case of problems.
'''
# print('new nwwton')
ittotal = 0
diffcount = 0
starttimesetup = time.time()
fair_max_iterations = {**fairopt, **
kwargs}.get('fair_max_iterations ', 1)
sol_periode = self.smpl(start, end, databank)
self.check_sim_smpl(databank)
if not silent:
print('Will start calculating: ' + self.name)
newdata, databank = self.is_newdata(databank)
self.pronew2d, self.solvenew2d, self.epinew2d = self.makelos(databank, solvename='newton',
ljit=ljit, stringjit=stringjit, transpile_reset=transpile_reset, chunk=chunk, newdata=newdata)
values = databank.values.copy()
outvalues = np.empty_like(values)
if not hasattr(self, 'newton_diff_stack'):
self.newton_diff_stack = newton_diff(
self, forcenum=forcenum, df=databank, ljit=nljit, nchunk=nchunk, silent=silent)
if not hasattr(self, 'stacksolver'):
self.getsolver = self.newton_diff_stack.get_solvestacked
diffcount += 1
self.stacksolver = self.getsolver(databank)
if not silent:
print('Creating new derivatives and a new solver')
self.old_stack_periode = sol_periode.copy()
elif newton_reset or not all(self.old_stack_periode[[0, -1]] == sol_periode[[0, -1]]):
# breakpoint()
print('Creating new solver')
diffcount += 1
self.stacksolver = self.getsolver(databank)
self.old_stack_periode = sol_periode.copy()
newton_col = [databank.columns.get_loc(
c) for c in self.newton_diff_stack.endovar]
self.newton_diff_stack.timeit = timeit
self.genrcolumns = databank.columns.copy()
self.genrindex = databank.index.copy()
convvar = self.list_names(self.coreorder, conv)
# this is how convergence is measured
convplace = [databank.columns.get_loc(c) for c in convvar]
convergence = False
if ldumpvar:
self.dumplist = []
self.dump = self.list_names(self.coreorder, dumpvar)
dumpplac = [databank.columns.get_loc(v) for v in self.dump]
ittotal = 0
endtimesetup = time.time()
starttime = time.time()
self.stackrows = [databank.index.get_loc(p) for p in sol_periode]
self.stackrowindex = np.array(
[[r]*len(newton_col) for r in self.stackrows]).flatten()
self.stackcolindex = np.array(
[newton_col for r in self.stackrows]).flatten()
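# Note on the stacked layout: stackrowindex/stackcolindex flatten every
# (period, variable) pair, so `before`, `distance` and `update` below are
# vectors of length len(stackrows)*len(newton_col), and a single Newton step
# updates all periods simultaneously.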
# breakpoint()
# if ldumpvar:
# self.dumplist.append([fairiteration,self.periode,int(0)]+[values[row,p]
# for p in dumpplac])
# itbefore = values[self.stackrows,convplace]
# self.pro2d(values, values, row , alfa )
for iteration in range(max_iterations):
with self.timer(f'\nNewton it:{iteration}', timeit) as xxtt:
before = values[self.stackrowindex, self.stackcolindex]
with self.timer('calculate new solution', timeit) as t2:
for self.periode, row in zip(sol_periode, self.stackrows):
self.pronew2d(values, outvalues, row, alfa)
self.solvenew2d(values, outvalues, row, alfa)
self.epinew2d(values, outvalues, row, alfa)
ittotal += 1
if ldumpvar:
self.dumplist.append([1.0, databank.index[row], int(iteration+1)]+[values[row, p]
for p in dumpplac])
with self.timer('extract new solution', timeit) as t2:
now = outvalues[self.stackrowindex, self.stackcolindex]
distance = now-before
newton_conv = np.abs(distance).sum()
if not silent:
print(
f'Iteration {iteration} Sum of distances {newton_conv:>{15},.{6}f}')
if newton_conv <= newton_absconv:
convergence = True
break
if iteration != 0 and nonlin and not (iteration % nonlin):
with self.timer('Updating solver', timeit) as t3:
if not silent:
print(f'Updating solver, iteration {iteration}')
df_now = pd.DataFrame(
values, index=databank.index, columns=databank.columns)
self.stacksolver = self.getsolver(df=df_now)
diffcount += 1
with self.timer('Update solution', timeit):
# update = self.solveinv(distance)
update = self.stacksolver(distance)
damp = newtonalfa if iteration <= newtonnodamp else 1.0
values[self.stackrowindex,
self.stackcolindex] = before - damp * update
# if iteration > first_test:
# itafter=[values[row,c] for c in convplace]
# convergence = True
# for after,before in zip(itafter,itbefore):
# print(before,after)
# if before > absconv and abs(after-before)/abs(before) > relconv:
# convergence = False
# break
# if convergence:
# break
# else:
# itbefore=itafter
# self.epistack2d(values, values, row , alfa )
if not silent:
if not convergence:
print(f'Not converged in {iteration} iterations')
else:
print(f'Solved in {iteration} iterations')
if ldumpvar:
self.dumpdf = pd.DataFrame(self.dumplist)
del self.dumplist
self.dumpdf.columns = ['fair', 'per', 'iteration']+self.dump
self.dumpdf.sort_values(['per', 'iteration'], inplace=True)
if fair_max_iterations <= 2:
self.dumpdf.drop('fair', axis=1, inplace=True)
outdf = pd.DataFrame(values, index=databank.index,
columns=databank.columns)
if stats:
numberfloats = self.calculate_freq[-1][1]*ittotal
endtime = time.time()
self.simtime = endtime-starttime
self.setuptime = endtimesetup - starttimesetup
print(
f'Setup time (seconds) :{self.setuptime:>15,.4f}')
print(f'Total model evaluations :{ittotal:>15,}')
print(f'Number of solver updates :{diffcount:>15,}')
print(
f'Simulation time (seconds) :{self.simtime:>15,.4f}')
if self.simtime > 0.0:
print(
f'Floating point operations per second : {numberfloats/self.simtime:>15,.1f}')
if not silent:
print(self.name + ' solved ')
return outdf
def newton_un_normalized(self, databank, start='', end='', silent=1, samedata=0, alfa=1.0, stats=False, first_test=1, newton_absconv=0.001,
max_iterations=20, conv='*', absconv=1.0, relconv=DEFAULT_relconv, nonlin=False, timeit=False, newton_reset=1,
dumpvar='*', ldumpvar=False, dumpwith=15, dumpdecimal=5, chunk=30, ljit=False, stringjit=False, transpile_reset=False, lnjit=False,
fairopt={'fair_max_iterations ': 1},
newtonalfa=1.0, newtonnodamp=0, forcenum=True, **kwargs):
'''Evaluates this model on a databank from start to end, using a per-period Newton algorithm for un-normalized models.
First it finds the values in the dataframe, then creates the evaluator function through the *outeval* function
(:func:`modelclass.model.fouteval`),
then it evaluates the function and returns the values to the dataframe in the databank.
The text for the evaluator function is placed in the model property **make_los_text**,
where it can be inspected
in case of problems.
'''
# print('new nwwton')
starttimesetup = time.time()
fair_max_iterations = {**fairopt, **
kwargs}.get('fair_max_iterations ', 1)
sol_periode = self.smpl(start, end, databank)
# breakpoint()
self.check_sim_smpl(databank)
if not silent:
print('Will start calculating: ' + self.name)
newdata, databank = self.is_newdata(databank)
self.pronew2d, self.solvenew2d, self.epinew2d = self.makelos(databank, solvename='newton',
ljit=ljit, stringjit=stringjit, transpile_reset=transpile_reset, chunk=chunk, newdata=newdata)
values = databank.values.copy()
outvalues = np.empty_like(values)
# endovar = self.solveorder
# breakpoint()
if not hasattr(self, 'newton_diff'):
self.newton_diff = newton_diff(self, forcenum=forcenum, df=databank,
endovar=None, ljit=lnjit, nchunk=chunk, onlyendocur=True, silent=silent)
if not hasattr(self, 'newun1persolver') or newton_reset:
# breakpoint()
self.newun1persolver = self.newton_diff.get_solve1per(
df=databank, periode=[self.current_per[0]])[self.current_per[0]]
newton_col = [databank.columns.get_loc(
c) for c in self.newton_diff.endovar]
newton_col_endo = [databank.columns.get_loc(
c) for c in self.newton_diff.declared_endo_list]
self.genrcolumns = databank.columns.copy()
self.genrindex = databank.index.copy()
convvar = self.list_names(self.newton_diff.declared_endo_list, conv)
# this is how convergence is measured
convplace = [databank.columns.get_loc(c) for c in convvar]
convergence = False
if ldumpvar:
self.dumplist = []
self.dump = self.list_names(
self.newton_diff.declared_endo_list, dumpvar)
dumpplac = [databank.columns.get_loc(v) for v in self.dump]
# breakpoint()
ittotal = 0
endtimesetup = time.time()
starttime = time.time()
for fairiteration in range(fair_max_iterations):
if fair_max_iterations >= 2:
print(f'Fair-Taylor iteration: {fairiteration}')
for self.periode in sol_periode:
row = databank.index.get_loc(self.periode)
if ldumpvar:
self.dumplist.append([fairiteration, self.periode, int(0)]+[values[row, p]
for p in dumpplac])
itbefore = [values[row, c] for c in convplace]
self.pronew2d(values, values, row, alfa)
convergence = False
for iteration in range(max_iterations):
with self.timer(f'sim per:{self.periode} it:{iteration}', 0) as xxtt:
before = values[row, newton_col_endo]
self.pronew2d(values, outvalues, row, alfa)
self.solvenew2d(values, outvalues, row, alfa)
self.epinew2d(values, outvalues, row, alfa)
now = outvalues[row, newton_col]
distance = now
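# In the un-normalised case the equations are residuals, 0 = F(x), so the
# evaluated values are themselves the Newton distances; unlike the normalised
# solvers no subtraction of `before` is needed here.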
newton_conv = np.abs(distance).sum()
if not silent:
print(
f'Iteration {iteration} Sum of distances {newton_conv:>{15},.{6}f}')
if newton_conv <= newton_absconv:
convergence = True
break
if iteration != 0 and nonlin and not (iteration % nonlin):
with self.timer('Updating solver', timeit) as t3:
if not silent:
print(
f'Updating solver, iteration {iteration}')
df_now = pd.DataFrame(
values, index=databank.index, columns=databank.columns)
self.newun1persolver = self.newton_diff.get_solve1per(
df=df_now, periode=[self.periode])[self.periode]
# breakpoint()
with self.timer('Update solution', 0):
# update = self.solveinv(distance)
update = self.newun1persolver(distance)
# breakpoint()
damp = newtonalfa if iteration <= newtonnodamp else 1.0
values[row, newton_col_endo] = before - damp*update
ittotal += 1
if ldumpvar:
self.dumplist.append([fairiteration, self.periode, int(iteration+1)]+[values[row, p]
for p in dumpplac])
# if iteration > first_test:
# itafter=[values[row,c] for c in convplace]
# convergence = True
# for after,before in zip(itafter,itbefore):
# print(before,after)
# if before > absconv and abs(after-before)/abs(before) > relconv:
# convergence = False
# break
# if convergence:
# break
# else:
# itbefore=itafter
self.epinew2d(values, values, row, alfa)
if not silent:
if not convergence:
print(
f'{self.periode} not converged in {iteration} iterations')
else:
print(f'{self.periode} Solved in {iteration} iterations')
if ldumpvar:
self.dumpdf = pd.DataFrame(self.dumplist)
del self.dumplist
self.dumpdf.columns = ['fair', 'per', 'iteration']+self.dump
if fair_max_iterations <= 2:
self.dumpdf.drop('fair', axis=1, inplace=True)
outdf = pd.DataFrame(values, index=databank.index,
columns=databank.columns)
if stats:
numberfloats = self.calculate_freq[-1][1]*ittotal
endtime = time.time()
self.simtime = endtime-starttime
self.setuptime = endtimesetup - starttimesetup
print(
f'Setup time (seconds) :{self.setuptime:>15,.2f}')
print(
f'Floating point operations :{self.calculate_freq[-1][1]:>15,}')
print(f'Total iterations :{ittotal:>15,}')
print(f'Total floating point operations :{numberfloats:>15,}')
print(
f'Simulation time (seconds) :{self.simtime:>15,.2f}')
if self.simtime > 0.0:
print(
f'Floating point operations per second : {numberfloats/self.simtime:>15,.1f}')
if not silent:
print(self.name + ' solved ')
return outdf
def newtonstack_un_normalized(self, databank, start='', end='', silent=1, samedata=0, alfa=1.0, stats=False, first_test=1, newton_absconv=0.001,
max_iterations=20, conv='*', absconv=1.0, relconv=DEFAULT_relconv,
dumpvar='*', ldumpvar=False, dumpwith=15, dumpdecimal=5, chunk=30, nchunk=None, ljit=False, nljit=0, stringjit=False, transpile_reset=False,
fairopt={'fair_max_iterations ': 1}, debug=False, timeit=False, nonlin=False,
newtonalfa=1.0, newtonnodamp=0, forcenum=True, newton_reset=False, **kwargs):
'''Evaluates this model on a databank from start to end, using a stacked Newton algorithm for un-normalized models where all periods are solved simultaneously.
First it finds the values in the dataframe, then creates the evaluator function through the *outeval* function
(:func:`modelclass.model.fouteval`),
then it evaluates the function and returns the values to the dataframe in the databank.
The text for the evaluator function is placed in the model property **make_los_text**,
where it can be inspected
in case of problems.
'''
# print('new nwwton')
ittotal = 0
diffcount = 0
starttimesetup = time.time()
fair_max_iterations = {**fairopt, **
kwargs}.get('fair_max_iterations ', 1)
sol_periode = self.smpl(start, end, databank)
self.check_sim_smpl(databank)
newdata, databank = self.is_newdata(databank)
self.pronew2d, self.solvenew2d, self.epinew2d = self.makelos(databank, solvename='newton',
ljit=ljit, stringjit=stringjit, transpile_reset=transpile_reset, chunk=chunk, newdata=newdata)
values = databank.values.copy()
outvalues = np.empty_like(values)
if not hasattr(self, 'newton_diff_stack'):
self.newton_diff_stack = newton_diff(
self, forcenum=forcenum, df=databank, ljit=nljit, nchunk=nchunk, timeit=timeit, silent=silent)
if not hasattr(self, 'stackunsolver'):
if not silent:
print(
'Calculating new derivatives and creating a new stacked Newton solver')
self.getstackunsolver = self.newton_diff_stack.get_solvestacked
diffcount += 1
self.stackunsolver = self.getstackunsolver(databank)
self.old_stack_periode = sol_periode.copy()
elif newton_reset or not all(self.old_stack_periode[[0, -1]] == sol_periode[[0, -1]]):
print('Creating new stacked Newton solver')
diffcount += 1
self.stackunsolver = self.getstackunsolver(databank)
self.old_stack_periode = sol_periode.copy()
newton_col = [databank.columns.get_loc(
c) for c in self.newton_diff_stack.endovar]
# breakpoint()
newton_col_endo = [databank.columns.get_loc(
c) for c in self.newton_diff_stack.declared_endo_list]
self.newton_diff_stack.timeit = timeit
self.genrcolumns = databank.columns.copy()
self.genrindex = databank.index.copy()
convvar = self.list_names(self.coreorder, conv)
# this is how convergence is measured
convplace = [databank.columns.get_loc(c) for c in convvar]
convergence = False
if ldumpvar:
self.dumplist = []
self.dump = self.list_names(
self.newton_diff_stack.declared_endo_list, dumpvar)
dumpplac = [databank.columns.get_loc(v) for v in self.dump]
ittotal = 0
endtimesetup = time.time()
starttime = time.time()
self.stackrows = [databank.index.get_loc(p) for p in sol_periode]
self.stackrowindex = np.array(
[[r]*len(newton_col) for r in self.stackrows]).flatten()
self.stackcolindex = np.array(
[newton_col for r in self.stackrows]).flatten()
self.stackcolindex_endo = np.array(
[newton_col_endo for r in self.stackrows]).flatten()
# if ldumpvar:
# self.dumplist.append([fairiteration,self.periode,int(0)]+[values[row,p]
# for p in dumpplac])
# itbefore = values[self.stackrows,convplace]
# self.pro2d(values, values, row , alfa )
for iteration in range(max_iterations):
with self.timer(f'\nNewton it:{iteration}', timeit) as xxtt:
before = values[self.stackrowindex, self.stackcolindex_endo]
with self.timer('calculate new solution', timeit) as t2:
for self.periode, row in zip(sol_periode, self.stackrows):
self.pronew2d(values, outvalues, row, alfa)
self.solvenew2d(values, outvalues, row, alfa)
self.epinew2d(values, outvalues, row, alfa)
ittotal += 1
with self.timer('extract new solution', timeit) as t2:
now = outvalues[self.stackrowindex, self.stackcolindex]
distance = now
newton_conv = np.abs(distance).sum()
# breakpoint()
if not silent:
print(
f'Iteration {iteration} Sum of distances {newton_conv:>{25},.{12}f}')
if newton_conv <= newton_absconv:
convergence = True
break
if iteration != 0 and nonlin and not (iteration % nonlin):
with self.timer('Updating solver', timeit) as t3:
if not silent:
print(f'Updating solver, iteration {iteration}')
df_now = pd.DataFrame(
values, index=databank.index, columns=databank.columns)
self.stackunsolver = self.getstackunsolver(df=df_now)
diffcount += 1
with self.timer('Update solution', timeit):
# update = self.solveinv(distance)
update = self.stackunsolver(distance)
damp = newtonalfa if iteration <= newtonnodamp else 1.0
values[self.stackrowindex,
self.stackcolindex_endo] = before - damp * update
if ldumpvar:
for periode, row in zip(self.current_per, self.stackrows):
self.dumplist.append([0, periode, int(iteration+1)]+[values[row, p]
for p in dumpplac])
# if iteration > first_test:
# itafter=[values[row,c] for c in convplace]
# convergence = True
# for after,before in zip(itafter,itbefore):
# print(before,after)
# if before > absconv and abs(after-before)/abs(before) > relconv:
# convergence = False
# break
# if convergence:
# break
# else:
# itbefore=itafter
# self.epistack2d(values, values, row , alfa )
if not silent:
if not convergence:
print(f'Not converged in {iteration} iterations')
else:
print(f'Solved in {iteration} iterations')
if ldumpvar:
self.dumpdf = pd.DataFrame(self.dumplist)
del self.dumplist
self.dumpdf.columns = ['fair', 'per', 'iteration']+self.dump
if fair_max_iterations <= 2:
self.dumpdf.drop('fair', axis=1, inplace=True)
outdf = pd.DataFrame(values, index=databank.index,
columns=databank.columns)
if stats:
numberfloats = self.calculate_freq[-1][1]*ittotal
diff_numberfloats = self.newton_diff_stack.diff_model.calculate_freq[-1][-1]*len(
self.current_per)*diffcount
endtime = time.time()
self.simtime = endtime-starttime
self.setuptime = endtimesetup - starttimesetup
print(
f'Setup time (seconds) :{self.setuptime:>15,.4f}')
print(
f'Total model evaluations :{ittotal:>15,}')
print(
f'Number of solver updates :{diffcount:>15,}')
print(
f'Simulation time (seconds) :{self.simtime:>15,.4f}')
print(
f'Floating point operations in model : {numberfloats:>15,}')
print(
f'Floating point operations in jacobi model : {diff_numberfloats:>15,}')
if not silent:
print(self.name + ' solved ')
return outdf
def res(self, databank, start='', end='', debug=False, timeit=False, silent=False,
chunk=None, ljit=0, stringjit=False, transpile_reset=False, alfa=1, stats=0, samedata=False, **kwargs):
'''Calculates the result of a model, with no iteration or interaction.
The text for the evaluator function is placed in the model property **make_res_text**,
where it can be inspected
in case of problems.
'''
# print('new nwwton')
starttimesetup = time.time()
sol_periode = self.smpl(start, end, databank)
# breakpoint()
self.check_sim_smpl(databank)
# fill all Missing value with 0.0
databank = insertModelVar(databank, self)
newdata, databank = self.is_newdata(databank)
self.prores2d, self.solveres2d, self.epires2d = self.makelos(databank, solvename='res',
ljit=ljit, stringjit=stringjit, transpile_reset=transpile_reset, chunk=chunk, newdata=newdata)
values = databank.values.copy()
outvalues = values.copy()
res_col = [databank.columns.get_loc(c) for c in self.solveorder]
self.genrcolumns = databank.columns.copy()
self.genrindex = databank.index.copy()
endtimesetup = time.time()
starttime = time.time()
self.stackrows = [databank.index.get_loc(p) for p in sol_periode]
with self.timer(f'\nres calculation', timeit) as xxtt:
for row in self.stackrows:
self.periode = databank.index[row]
self.prores2d(values, outvalues, row, alfa)
self.solveres2d(values, outvalues, row, alfa)
self.epires2d(values, outvalues, row, alfa)
outdf = pd.DataFrame(outvalues, index=databank.index,
columns=databank.columns)
if stats:
numberfloats = self.calculate_freq[-1][1]*len(self.stackrows)
endtime = time.time()
self.simtime = endtime-starttime
self.setuptime = endtimesetup - starttimesetup
print(
f'Setup time (seconds) :{self.setuptime:>15,.2f}')
print(f'Floating point operations :{numberfloats:>15,}')
print(
f'Simulation time (seconds) :{self.simtime:>15,.2f}')
if not silent:
print(self.name + ' calculated ')
return outdf
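# Commented usage sketch (names are illustrative): evaluate every equation
# once with no iteration, for instance to calculate residuals on historic data:
# res_df = mmodel.res(df, 2021, 2023)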
def invert(self, databank, targets, instruments, silent=1,
defaultimpuls=0.01, defaultconv=0.001, nonlin=False, maxiter=30, delay=0,varimpulse=False,progressbar = True,
debug = False):
'''
Solves a target-instrument problem.
The number of targets should equal the number of instruments.
An instrument can comprise several variables.
**Instruments** are input as a list of instruments.
To calculate the Jacobian each instrument variable has an impulse,
which is used as the delta when evaluating the Jacobian matrix::
[ 'QO_J','TG'] Simple list, each variable is shocked by the default impulse
[ ('QO_J',0.5), 'TG'] Here QO_J gets its own impulse (0.5)
[ [('QO_J',0.5),('ORLOV',1.)] , ('TG',0.01)] Here an impulse is given for each variable, and the first instrument consists of two variables
**Targets** are a list of variables.
Convergence is achieved when all targets are within convergence distance of the target value.
The convergence distance can be set individually for a target variable by setting a value in <modelinstance>.targetconv
Targets and target values are provided by a dataframe.
Parameters
----------
databank : TYPE
values to run on.
targets : TYPE
dataframe with a column for each target.
instruments : TYPE
list of instruments.
defaultimpuls : TYPE, optional
default delta. The default is 0.01.
defaultconv : TYPE, optional
default convergence distance. The default is 0.001.
delay : TYPE, optional
delay in effects. The default is 0.
nonlin : TYPE, optional
if a number, the number of iterations that triggers recalculation of the Jacobian. The default is False.
silent : TYPE, optional
show iterations if False. The default is True.
maxiter : TYPE, optional
maximum number of Newton iterations. The default is 30.
varimpulse : TYPE, optional
if True only update the current period, else update into the future. The default is False.
progressbar : TYPE, optional
show a progress bar. The default is True.
Returns
-------
a dataframe with the result.
'''
from modelinvert import targets_instruments
t_i = targets_instruments(databank, targets, instruments, self, silent=silent,delay=delay,varimpulse=varimpulse,
defaultimpuls=defaultimpuls, defaultconv=defaultconv, nonlin=nonlin, maxiter=maxiter,
progressbar=progressbar )
self.t_i = t_i
self.t_i.debug = debug
res = t_i(delay=delay)
return res
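# Commented usage sketch (hypothetical names): steer the endogenous variable
# UL towards the path in the dataframe `goals` using the instrument TG:
# result = mmodel.invert(df, goals[['UL']], ['TG'])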
def errfunk1d(self, a, linenr, overhead=4, overeq=0):
''' Handle errors in sim1d '''
self.saveeval3(self.values_, self.row_, a)
self.errfunk(self.values_, linenr, overhead, overeq)
def errfunk(self, values, linenr, overhead=4, overeq=0):
''' Development function
to handle run-time errors in model calculations'''
# winsound.Beep(500,1000)
self.errdump = pd.DataFrame(
values, columns=self.genrcolumns, index=self.genrindex)
self.lastdf = self.errdump
print('>> Error in :', self.name)
print('>> In :', self.periode)
if 0:
print('>> Linenr :', linenr)
print('>> Overhead :', overhead)
print('>> over eq :', overeq)
varposition = linenr-overhead - 1 + overeq
print('>> varposition :', varposition)
errvar = self.solveorder[varposition]
outeq = self.allvar[errvar]['frml']
print('>> Equation :', outeq)
print('A snapshot of the data at the error point is at .errdump ')
print('Also the .lastdf contains .errdump, for inspecting ')
self.print_eq_values(errvar, self.errdump, per=[self.periode])
if hasattr(self, 'dumplist'):
self.dumpdf = pd.DataFrame(self.dumplist)
del self.dumplist
self.dumpdf.columns = ['fair', 'per', 'iteration']+self.dump
pass
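# Worked example of the line-number arithmetic above: with overhead=4
# boilerplate lines and overeq=0 earlier equations, an error on generated
# line 12 gives varposition = 12 - 4 - 1 + 0 = 7, i.e. the eighth variable
# in self.solveorder.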
def show_iterations(self, pat='*', per='', last=0, change=False,top=.9):
'''
Shows the last iterations for the most recent simulation.
Iterations are dumped if ldumpvar is set to True.
Variables can be selected by: dumpvar = '<pattern>'
Args:
pat (TYPE, optional): Variables for which to show iterations. Defaults to '*'.
per (TYPE, optional): The period for which to show iterations. Defaults to the last period of the projection.
last (TYPE, optional): Only show the last iterations. Defaults to 0.
change (TYPE, optional): Show the changes from iteration to iteration instead of the levels. Defaults to False.
top (float, optional): Top of the charts, between 0 and 1.
Returns:
fig (TYPE): A matplotlib figure with one subplot per selected variable.
'''
try:
if per == '':
per_ = self.current_per[-1]
else:
relevantindex = pd.Index(self.dumpdf['per']).unique()
per_ = relevantindex[relevantindex.get_loc(per)]
# breakpoint()
indf = self.dumpdf.query('per == @per_')
out0 = indf.query('per == @per_').set_index('iteration',
drop=True).drop('per', axis=1).copy()
out = out0.diff() if change else out0
varnames = self.list_names(out.columns, pat)
number = len(varnames)
# breakpoint()
tchange = 'delta pr iteration ' if change else ' levels '
if last:
axes = out.iloc[last:-1, :].loc[:, varnames].rename(columns=self.var_description).plot(kind='line', subplots=True, layout=(number, 1), figsize=(10, number*3),
use_index=True, title=f'Iterations in {per_} {tchange}', sharey=0)
else:
axes = out[varnames].rename(columns=self.var_description).plot(kind='line', subplots=True, layout=(number, 1), figsize=(10, number*3),
use_index=True, title=f'Iterations in {per_} {tchange}', sharey=0)
fig = axes.flatten()[0].get_figure()
fig.set_constrained_layout(True)
# fig.tight_layout()
# fig.subplots_adjust(top=top)
return fig
except:
print('No iteration dump')
# try:
# from modeldashsidebar import Dash_Mixin
# except:
# ...
# print('The DASH dashboard is not loaded')
# class Dash_Mixin:
# ''' a dummy class if the dash dependencies are not installed.
# remember:
# conda install pip --y
# pip install dash_interactive_graphviz
# '''
# ...
class Dash_Mixin():
'''This mixin wraps calls to the Dash dashboard '''
def modeldash(self,var,**kwargs):
try:
from modeldashsidebar import Dash_graph
...
out = Dash_graph(self,var.upper() ,**kwargs)
return out
except Exception as e:
print(f'No Dash, {str(e)}')
return None
class Fix_Mixin():
'''This mixin handles a number of World Bank enhancements '''
@cached_property
def wb_behavioral(self):
''' Returns endogenous variables where the frml name contains a Z, which signals a stochastic equation
'''
out1 = {vx for vx in self.endogene if 'Z' in self.allvar[vx]['frmlname']}
alt = {vx for vx in self.endogene if ('EXO' in self.allvar[vx]['frmlname'] or 'FIXABLE' in self.allvar[vx]['frmlname'] or 'STOC' in self.allvar[vx]['frmlname'])
and vx+'_X' in self.exogene and vx+'_D' in self.exogene }
# breakpoint()
# assert out1 == alt, 'Problems wih wb behavioral '
return alt
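# Example: an endogenous variable Y is counted as behavioural here when its
# frml name contains EXO, FIXABLE or STOC and the companion variables Y_X
# and Y_D are both exogenous.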
@cached_property
def wb_ident(self):
'''Returns endogenous variables not in wb_behavioral '''
return self.endogene-self.wb_behavioral
def fix(self,df,pat='*',start='',end='',silent=0):
'''
Fixes variables to their current values.
For variables where the equation looks like::
var = (rhs)*(1-var_d)+var_x*var_d
the values in the smpl set by *start* and *end* will be set to::
var_x = var
var_d = 1
The variables fulfilling this are elements of self.fix_endo
Args:
df (TYPE): Input dataframe, should contain a solution and all variables.
pat (TYPE, optional): Select variables to fix (exogenize). Defaults to '*'.
start (TYPE, optional): start period. Defaults to ''.
end (TYPE, optional): end period. Defaults to ''.
silent (TYPE, optional): suppress the list of fixed variables. Defaults to 0.
Returns:
dataframe (TYPE): the resulting dataframe.
'''
dataframe=df.copy()
beh = sorted(self.fix_endo )
selected = [v for up in pat.split() for v in fnmatch.filter(beh, up.upper())]
exo = [v+'_X' for v in selected ]
dummy = [v+'_D' for v in selected ]
# breakpoint()
with self.set_smpl(start,end,df=dataframe):
dataframe.loc[self.current_per,dummy] = 1.0
selected_values = dataframe.loc[self.current_per,selected]
selected_values.columns = exo
# breakpoint()
dataframe.loc[self.current_per,exo] = selected_values.loc[self.current_per,exo]
if not silent:
print('The following variables are fixed')
print(*selected,sep='\n')
return dataframe
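# Commented usage sketch (pattern and periods are illustrative): fix all
# fixable variables starting with PAKGDP to their values in df over 2022-2025:
# fixed_df = mmodel.fix(df, 'PAKGDP*', 2022, 2025)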
def unfix(self,df,pat='*',start='',end=''):
'''
Unfix (endogenize) variables
Args:
df (Dataframe): Input dataframe, should contain a solution and all variables.
pat (string, optional): Select variables to endogenize. Defaults to '*'.
start (TYPE, optional): start period. Defaults to ''.
end (TYPE, optional): end period. Defaults to ''.
Returns:
dataframe (TYPE): A dataframe with all dummies for the selected variables set to 0.
'''
dataframe=df.copy()
beh = sorted(self.fix_endo )
selected = [v for up in pat.split() for v in fnmatch.filter(beh, up.upper())]
dummy = [v+'_D' for v in selected ]
with self.set_smpl(start,end):
dataframe.loc[self.current_per,dummy] = 0.0
return dataframe
def find_fix_dummy_fixed(self,df=None):
''' Returns names of active exogenizing dummies.
Sets the property self.fix_dummy_per which defines the time span over which the dummies are set'''
dmat = df.loc[self.current_per,self.fix_dummy].T
dmatset = dmat.loc[(dmat != 0.0).any(axis=1), (dmat != 0.0).any(axis=0)]
# breakpoint()
dummyselected = list(dmatset.index)
if len(dmatset.columns):
start = dmatset.columns[0]
end = dmatset.columns[-1]
# breakpoint()
per1,per2 = df.index.slice_locs(start, end)
self.fix_dummy_per = df.index[per1:per2]
# print(self.exodummy_per)
return dummyselected
else:
return []
@property
def fix_dummy_fixed(self):
''' Returns names of active exogenizing dummies.
Sets the property self.fix_dummy_per which defines the time span over which the dummies are set'''
return self.find_fix_dummy_fixed(self.lastdf)
@property
def fix_dummy_fixed_old(self):
''' Returns names of active exogenizing dummies.
Sets the property self.fix_dummy_per which defines the time span over which the dummies are set'''
dmat = self.lastdf.loc[self.current_per,self.fix_dummy].T
dmatset = dmat.loc[(dmat != 0.0).any(axis=1), (dmat != 0.0).any(axis=0)]
# breakpoint()
dummyselected = list(dmatset.index)
if len(dmatset.columns):
start = dmatset.columns[0]
end = dmatset.columns[-1]
# breakpoint()
per1,per2 = self.lastdf.index.slice_locs(start, end)
self.fix_dummy_per = self.lastdf.index[per1:per2]
# print(self.exodummy_per)
return dummyselected
else:
return []
@property
def fix_add_factor_fixed(self):
'''Returns the add factors corresponding to the active exogenizing dummies'''
return [v[:-2]+'_A' for v in self.fix_dummy_fixed if v[:-2]+'_A' in self.exogene]
@property
def fix_value_fixed(self):
'''Returns the exogenizing values corresponding to the active exogenizing dummies'''
return [v[:-2]+'_X' for v in self.fix_dummy_fixed]
@property
def fix_endo_fixed(self):
'''Returns the endogenous variables corresponding to the active exogenizing dummies'''
return [v[:-2] for v in self.fix_dummy_fixed]
def fix_inf(self,df=None):
''' Display information regarding exogenizing
'''
thisdf = self.lastdf if df is None else df
if not len(self.fix_dummy_fixed):
raise Exception('No active exogenized variables')
if len(self.fix_add_factor_fixed):
varnameslist = zip(self.fix_endo_fixed,self.fix_value_fixed,self.fix_dummy_fixed,self.fix_add_factor_fixed)
else:
varnameslist = zip(self.fix_endo_fixed,self.fix_value_fixed,self.fix_dummy_fixed)
for varnames in varnameslist:
out = thisdf.loc[self.fix_dummy_per,varnames]
out.style.set_caption(self.var_description[varnames[0]])
print(f'\n{self.var_description[varnames[0]]}')
print(f'\n{self.allvar[varnames[0]]["frml"]}')
try:
print(f'\n{self.calc_add_factor_model.allvar[varnames[3]]["frml"]}')
except:
...
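# DataFrame.map on whole frames arrived in pandas 2.1; fall back to the older
# applymap on earlier pandas versions.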
try:
res = out.map(lambda value: " " if value == 0.0 else " 1 " if value == 1.0 else value )
except:
res = out.applymap(lambda value: " " if value == 0.0 else " 1 " if value == 1.0 else value )
display(self.ibsstyle(res) )
class Stability_Mixin():
def get_newton(self,**kwargs):
res = newton_diff(self,**kwargs)
return res
def get_eigenvalues(self,forcenum = False, silent = True ,dropvar=None,progressbar = False):
if not hasattr(self,'stability_newton') or not hasattr(self.stability_newton,'eigen_values_and_vectors'):
print('Finding eigenvalues and vectors')
self.stability_newton = newton_diff(self,forcenum = forcenum, silent = silent )
self.eigenvalues =self.stability_newton.get_eigenvalues(silent = silent,progressbar=progressbar)
else:
# print('Eigenvalues and vectors already avaiable')
self.eigenvalues = self.stability_newton.eig_dic
return self.eigenvalues
def get_eigenvectors(self,forcenum = False, silent = True ,dropvar=None,progressbar = True):
_ = self.get_eigenvalues(forcenum = forcenum, silent =silent ,dropvar=dropvar, progressbar = progressbar)
return self.stability_newton.get_df_eigen_dict()
def eigenvalues_show(self,*args,**kwargs):
vectors = self.get_eigenvectors(*args,**kwargs)
self.stability_newton.eigenvalues_show(vectors)
def eigenvalues_plot(self,periode=None,size=(4,3),maxfig=6,progressbar=False):
values = self.get_eigenvalues(progressbar=progressbar)
return self.stability_newton.eigplot_all(values,periode=periode,size=size,maxfig=6)
def get_eigen_jackknife_df(self,periode=None,maxnames=200_000):
_ = self.get_eigenvalues()
jackdf = self.stability_newton.get_eigen_jackknife_df(periode=periode,maxnames=maxnames)
return jackdf
def jack_largest_reduction(self,jackdf, eigenvalue_row=0, periode=None,imag_only=False):
return self.stability_newton.jack_largest_reduction(jackdf, eigenvalue_row=eigenvalue_row, periode=periode,imag_only=imag_only )
def jack_largest_reduction_plot(self,jackdf,eigenvalue_row=0,periode=None,imag_only=False):
self.stability_newton.jack_largest_reduction_plot(jackdf, eigenvalue_row=eigenvalue_row, periode=periode,imag_only=imag_only )
Stability_Mixin.eigenvalues_plot.__doc__ = newton_diff.eigplot_all.__doc__
Stability_Mixin.eigenvalues_show.__doc__ = newton_diff.eigenvalues_show.__doc__
Stability_Mixin.jack_largest_reduction.__doc__ = newton_diff.jack_largest_reduction.__doc__
Stability_Mixin.jack_largest_reduction_plot.__doc__ = newton_diff.jack_largest_reduction_plot.__doc__
class Report_Mixin:
def table(self, pat='#Headline',title='Table',datatype='growth',col_desc='',custom_description = {},dec=2,heading='',mul=1.0,**kwargs):
"""
Generates a table display configuration based on specified parameters and data types, including dynamic
adjustments of display options using both standard and keyword arguments.
Parameters:
pat (str): Pattern or identifier used to select data for the line, defaulting to '#Headline'.
title (str): Title of the table, passed directly to Options, defaulting to 'Table'.
datatype (str): Type of data transformation to apply (e.g., 'growth', 'level'), defaulting to 'growth'.
custom_description (dict): Custom descriptions to augment or override default descriptions, empty by default.
dec (int): Number of decimal places for numerical output, passed directly to Line configuration, defaulting to 2.
heading (str): Optional heading line for the table, empty by default.
name (str): Name for the display, defaults to 'A_small_table'.
foot (str): Footer text.
rename (bool): Allows renaming of data columns
decorate (bool): Decorates row descriptions based on the showtype, defaulting to False.
width (int): Specifies the width for formatting output in characters, defaulting to 5.
chunk_size (int): Number of columns per chunk in the display output, defaulting to 0.
timeslice (List[int]): Time slice for data display, empty by default.
max_cols (int): Maximum columns when displayed as a string, defaulting to the system-wide setting.
last_cols (int): Specifies the number of last columns to include in a display slice, particularly in Latex.
col_desc (str): Text centered over the data columns.
Returns:
DisplayVarTableDef: Configured table definition object ready for rendering, which includes detailed specifications
such as units and type of transformation based on the datatype.
"""
from modelreport import DisplayVarTableDef, DisplayDef, LatexRepo, DatatypeAccessor
from modelreport import Line, Options,DisplaySpec,DisplayFigWrapDef
config = DatatypeAccessor(datatype)
this_col_desc = col_desc if col_desc else config.col_desc
headingline = [Line(textlinetype='textline',centertext=heading)] if heading else []
unitline = [] if this_col_desc.strip() == '' else [ Line(textlinetype='textline',centertext=f'--- {this_col_desc} ---')]
tabspec = DisplaySpec(
options = Options(decorate=False,name='A_small_table',
custom_description=custom_description,title =title,width=5) + kwargs,
lines = headingline + unitline + [
Line(datatype=datatype ,pat=pat,dec=dec, mul=mul ) ,
]
)
tab = DisplayVarTableDef (mmodel=self, spec = tabspec)
return tab
def plot(self, pat='#Headline',title='',datatype='growth',custom_description = {},by_var = True,mul=1.0 ,
ax_title_template='',**kwargs):
"""
Generates a table display configuration based on specified parameters and data types, including dynamic
adjustments of display options using both standard and keyword arguments.
Parameters:
pat (str): Pattern or identifier used to select data for the line, defaulting to '#Headline'.
datatype (str): Type of data transformation to apply (e.g., 'growth', 'level'), defaulting to 'growth'.
title (str): Title of the table, passed directly to Options, defaulting to 'Table'.
custom_description (dict): Custom descriptions to augment or override default descriptions, empty by default.
ncol (int):
Returns:
DisplayVarTableDef: Configured table definition object ready for rendering, which includes detailed specifications
such as units and type of transformation based on the datatype.
"""
from modelreport import DisplayVarTableDef, DisplayDef, LatexRepo, DatatypeAccessor
from modelreport import Line, Options,DisplaySpec,DisplayFigWrapDef,DisplayKeepFigDef
config = DatatypeAccessor(datatype, **kwargs)
# print(config.showtype,config.diftype,config.ax_title_template_df)
figspec = DisplaySpec(
options = Options(decorate=False,name='A_plot',
custom_description=custom_description,title =title,width=5) + kwargs,
lines = [Line(pat=pat, datatype=datatype,
by_var = by_var,mul=mul,ax_title_template=ax_title_template) ,
]
)
figs = DisplayKeepFigDef (mmodel=self, spec = figspec)
return figs
def text(self,input= ''):
from modelreport import get_DisplayTextDef
return get_DisplayTextDef(input)
def report_from_spec(self,json_str):
from modelreport import create_instance_from_json
return create_instance_from_json(self,json_str)
def add_report(self,a_DisplayDef_list,*args):
this_DisplayDef_list = a_DisplayDef_list if isinstance(a_DisplayDef_list, list) else [a_DisplayDef_list]
this_DisplayDef_list = this_DisplayDef_list + list(args)
new_reports = {r.name:r.save_spec for r in this_DisplayDef_list }
self.reports = self.reports | new_reports
print(f'Reports added to report repo: {", ".join(n for n in new_reports.keys() )}')
def get_report(self,key):
if key not in self.reports:
print(f'No stored report with the name {key}')
print(f'Possible values: {", ".join(n for n in self.reports.keys() )}')
raise Exception('Wrong report key, try again')
else:
out = self.report_from_spec(self.reports[key])
return out
class model(Zip_Mixin, Json_Mixin, Model_help_Mixin, Solver_Mixin, Display_Mixin, Graph_Draw_Mixin, Graph_Mixin,
Dekomp_Mixin, Org_model_Mixin, BaseModel, Description_Mixin, Excel_Mixin, Dash_Mixin, Modify_Mixin,
Fix_Mixin,Stability_Mixin,Report_Mixin):
'''This is the main model definition'''
pass
# Functions used in calculating
# wrapper
if not hasattr(pd.DataFrame,'upd'):
@pd.api.extensions.register_dataframe_accessor("upd")
class upd():
'''Extends a dataframe so variables can be updated from a string.
Look at :any:`Model_help_Mixin.update` for the syntax
'''
def __init__(self, pandas_obj):
# self._validate(pandas_obj)
self._obj = pandas_obj
self.__doc__ = model.update.__doc__
# print(self._obj)
def __call__(self,updates, lprint=False,scale = 1.0,create=True,keep_growth=False,):
indf = self._obj
result = model.update(indf, updates=updates, lprint=lprint,scale = scale,create=create,keep_growth=keep_growth,)
return result
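# Commented usage sketch, using the update syntax of Model_help_Mixin.update:
# newdf = df.upd('<2020 2100> PAKGGREVCO2CER = 30')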
node = namedtuple('node', 'lev,parent,child')
'''A named tuple used when drawing the logical structure. Describes an edge of the dependency graph
:lev: Level from start
:parent: The parent
:child: The child
'''
def ttimer(*args, **kwargs):
return model.timer(*args, **kwargs)
def create_model(navn, hist=0, name='', new=True, finished=False, xmodel=model, straight=False, funks=[]):
'''Creates either a model instance or a model and a historic model from formulas. \n
The formulas can be in a string or in a file with the extension .txt
Arguments:
:navn: The model as text or as a file with extension .txt
:name: Name of the model
:new: If True, ! is used for comments, else () can also be used. False should be avoided, only for old PCIM models.
:hist: If True, a model with calculations of historic values is also created
:xmodel: The model class used for creating the model instance. Can be used to create models with model subclasses
:finished: If True, the model exploder is not used.
:straight: If True, the formula sequence in the model will be used.
:funks: A list of user defined functions used in the model
'''
shortname = navn[0:-
4] if '.txt' in navn else name if name else 'indtastet_udtryk'
modeltext = open(navn).read().upper() if '.txt' in navn else navn.upper()
modeltext = modeltext if new else re.sub(r'\(\)', '!', modeltext)
udrullet = modeltext if finished else mp.explode(modeltext, funks=funks)
pt.check_syntax_model(udrullet)
if hist:
hmodel = mp.find_hist_model(udrullet)
pt.check_syntax_model(hmodel)
mp.modelprint(hmodel, 'Historic calculations ' + shortname,
udfil=shortname + '_hist.fru', short=False)
return xmodel(udrullet, shortname, straight=straight, funks=funks), xmodel(hmodel, shortname + '_hist', straight=straight, funks=funks)
else:
return xmodel(udrullet, shortname, straight=straight, funks=funks)
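# Commented usage sketch: create a model directly from a formula string, or a
# model plus its historic companion from a .txt file (file name illustrative):
# m = create_model('frml <> y = 0.5*x $', name='tiny')
# m2, m2_hist = create_model('mymodel.txt', hist=1)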
def get_a_value(df, per, var, lag=0):
''' Returns a value for row=per+lag, column=var,
to take care of a non-additive row index'''
return df.iat[df.index.get_loc(per)+lag, df.columns.get_loc(var)]
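# Example: get_a_value(df, 2021, 'C', lag=-1) returns the value of C in the
# period just before 2021, whatever the index type.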
def set_a_value(df, per, var, lag=0, value=np.nan):
''' Sets a value for row=per+lag, column=var,
to take care of a non-additive row index'''
df.iat[df.index.get_loc(per)+lag, df.columns.get_loc(var)] = value
def insertModelVar(dataframe, model=None):
"""Inserts all variables from model, not already in the dataframe.
Model can be a list of models """
if isinstance(model, list):
imodel = model
else:
imodel = [model]
myList = []
for item in imodel:
myList.extend(item.allvar.keys())
manglervars = list(set(myList)-set(dataframe.columns))
if len(manglervars):
extradf = pd.DataFrame(0.0, index=dataframe.index,
columns=manglervars).astype('float64')
data = pd.concat([dataframe, extradf], axis=1)
return data
else:
return dataframe
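# Commented usage sketch: make sure a dataframe has a (zero-filled) column for
# every model variable before simulating:
# df_full = insertModelVar(df, mmodel)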
def lineout(vek, pre='', w=20, d=0, pw=20, endline='\n'):
''' Utility to return a formatted string of a vector '''
fvek = [float(v) for v in vek]
return f'{pre:<{pw}} ' + " ".join([f'{f:{w},.{d}f}' for f in fvek])+endline
# print(lineout([1,2,300000000],'Forspalte',pw=10))
def dfout(df, pre='', w=2, d=0, pw=0):
pw2 = pw if pw else max([len(str(i)) for i in df.index])
return ''.join([lineout(row, index, w, d, pw2) for index, row in df.iterrows()])
# print(dfout(df.iloc[:,:4],w=10,d=2,pw=10))
def upddfold(base, upd):
''' Takes two dataframes. The values from upd are inserted into base '''
rows = [(i, base.index.get_loc(ind)) for (i, ind) in enumerate(upd.index)]
locb = {v: i for i, v in enumerate(base.columns)}
cols = [(i, locb[v]) for i, v in enumerate(upd.columns)]
for cu, cb in cols:
for ru, rb in rows:
t = upd.get_value(ru, cu, takeable=True)
base.values[rb, cb] = t
# base.set_value(rb,cb,t,takeable=True)
return base
def upddf(base, upd):
''' Takes two dataframes. The values from upd are inserted into base '''
rows = [(i, base.index.get_loc(ind)) for (i, ind) in enumerate(upd.index)]
locb = {v: i for i, v in enumerate(base.columns)}
cols = [(i, locb[v]) for i, v in enumerate(upd.columns)]
for cu, cb in cols:
for ru, rb in rows:
t = upd.values[ru, cu]
base.values[rb, cb] = t
return base
def randomdf(df, row=False, col=False, same=False, ran=False, cpre='C', rpre='R'):
''' Randomize and rename the rows and columns of a dataframe, keeping the values in place:
:ran: If True randomize, if False don't randomize
:col: The columns are renamed and randomized
:row: The rows are renamed and randomized
:same: The row and column index are renamed and randomized the same way
:cpre: Column name prefix
:rpre: Row name prefix
'''
from random import sample
from math import log10
dfout = df.copy(deep=True)
if ran:
if col or same:
colnames = dfout.columns
dec = str(1+int(log10(len(colnames))))
rancol = sample(list(colnames), k=len(colnames))
coldic = {c: cpre+('{0:0'+dec+'d}').format(i)
for i, c in enumerate(rancol)}
dfout = dfout.rename(columns=coldic).sort_index(axis=1)
if same:
dfout = dfout.rename(index=coldic).sort_index(axis=0)
if row and not same:
rownames = dfout.index.tolist()
dec = str(1+int(log10(len(rownames))))
ranrow = sample(list(rownames), k=len(rownames))
rowdic = {c: rpre+('{0:0'+dec+'d}').format(i)
for i, c in enumerate(ranrow)}
dfout = dfout.rename(index=rowdic).sort_index(axis=0)
return dfout
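# Commented usage sketch: anonymise the column names (C0, C1, ...) while
# keeping the values in place:
# df_anon = randomdf(df, col=True, ran=True)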
def join_name_lag(df):
'''Creates a new dataframe where the name and lag from the multiindex are joined.
As input takes a dataframe where name and lag are two levels in the multiindex
'''
def xl(x): return f"({x[1]})" if x[1] else ""
newindex = [f'{i[0]}{xl(i)}' for i in zip(
df.index.get_level_values(0), df.index.get_level_values(1))]
newdf = df.copy()
newdf.index = newindex
return newdf
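# Example: a multiindex row ('A', '-1') becomes the single label 'A(-1)',
# while a row with an empty lag level ('A', '') stays 'A'.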
@contextmanager
def timer_old(input='test', show=True, short=False):
'''
Does not catch exceptions; use model.timer instead.
A timer context manager, implemented using a
generator function. This one will report the time even if an exception occurs.
Parameters
----------
input : string, optional
a name. The default is 'test'.
show : bool, optional
show the results. The default is True.
short : bool, optional
suppress the start-time message. The default is False.
Returns
-------
None.
'''
start = time.time()
if show and not short:
print(f'{input} started at : {time.strftime("%H:%M:%S"):>{15}} ')
try:
yield
finally:
if show:
end = time.time()
seconds = (end - start)
minutes = seconds/60.
if minutes < 2.:
afterdec = '1' if seconds >= 10 else (
'3' if seconds >= 1 else '10')
print(f'{input} took : {seconds:>{15},.{afterdec}f} Seconds')
else:
afterdec = '1' if minutes >= 10 else '4'
print(f'{input} took : {minutes:>{15},.{afterdec}f} Minutes')
import modelmf
if __name__ == '__main__':
#%% test
smallmodel = '''
frml <> a = c + b $
frml <> d1 = x + 3 * a(-1)+ c **2 +a $
frml <> d3 = x + 3 * a(-1)+c **3 $
Frml <> xx = 0.5 * c +a$
frml <CALC_ADJUST> a_a = a-(c+b)$
frml <CALC_ADJUST> b_a = a-(c+b)$'''
des = {'A': 'Bruttonationalprodukt i faste priser',
'X': 'Eksport <æøåÆØÅ>;',
'C': 'Forbrug'}
mmodel = model(smallmodel, var_description=des, svg=1, browser=1)
df = pd.DataFrame(
{'X': [0.2, 0.2, 0.2], 'C': [2.0, 3., 5.],'R': [2., 1., 0.4],
'PPP': [0.4, 0., 0.4]})
df2 = df.copy()
df2.loc[:,'C'] = [5, 10, 20]
xx = mmodel(df)
yy = mmodel(df2)
# zz = model.modelload(r'https://raw.githubusercontent.com/IbHansen/Modelflow2/master/Examples/ADAM/baseline.pcim')
mpak,baseline = model.modelload(r'pak.pcim',run=1,use_fbmin=True)
alternative = baseline.upd("<2020 2100> PAKGGREVCO2CER PAKGGREVCO2GER PAKGGREVCO2OER = 30")
result = mpak(alternative,2020,2100,keep='Carbon tax nominal 30',silent=0) # simulates the model
diff_level, att_level, att_pct, diff_growth, att_growth = tup = mpak.dekomp('PAKNECONOTHRXN',lprint=False,start=2020,end=2027)
fb_nodes,dag_nodes = mpak.get_minimal_feedback_set(mpak.endograph)
print(f'Number of endogenous: {len(mpak.endogene)}, Number of variables in simultaneous block {len(mpak.coreorder)}')
print(f'Number of variables in "minimum" feedback set: {len(fb_nodes)}')
print(f'Number of variables in "DAG" graph : {len(dag_nodes)}')
# mtest,bk = model.modelload(r'ibstestnozip')