"""
Created on Tue Mar 7 10:38:28 2017
@author: hanseni
utilities for Stampe models
"""
import networkx as nx
import pandas as pd
import numpy as np
import time
from contextlib import contextmanager
import sys
import itertools
import operator as op
[docs]
def update_var(databank,xvar,operator='=',inputval=0,start='',end='',create=1, lprint=False,scale=1.0):
r"""Updates a variable in the databank. Possible update choices are:
\n \= : val = inputval
\n \+ : val = val + inputval
\n \- : val = val - inputval
\n \* : val = val * inputval
\n \=growth : val = val(t-1)+inputval +
\n \% : val = val(1+inputval/100)
\n
\n scale scales the input variables default =1.0
"""
var = xvar.upper()
if var not in databank:
if not create:
errmsg = f'Variable to update not found:{var}, timespan = [{start} {end}] \nSet create=True if you want the variable created: '
raise Exception(errmsg)
else:
if 0:
print('Variable not in databank, created ',var)
databank[var]=0.0
current_per = databank.index[databank.index.get_slice_bound(start,'left')
:databank.index.get_slice_bound(end,'right')]
if operator.upper() == '+GROWTH':
if databank.index.get_loc(start) == 0:
raise Exception(f"+growth update can't start at first row, var:{var}")
orgdata=pd.Series(databank.pct_change().loc[current_per,var]).copy(deep=True)
...
else:
orgdata=pd.Series(databank.loc[current_per,var]).copy(deep=True)
antalper=len(current_per)
# breakpoint()
if isinstance(inputval,float) or isinstance(inputval,int) :
inputliste=[float(inputval)]
elif isinstance(inputval,str):
inputliste=[float(i) for i in inputval.split()]
elif isinstance(inputval,list):
inputliste= [float(i) for i in inputval]
elif isinstance(inputval, pd.Series):
# inputliste= inputval.base
inputliste= list(inputval) #Ib for at håndtere mulitindex serier
else:
print('Fejl i inputdata',type(inputval))
inputdata=inputliste*antalper if len(inputliste) == 1 else inputliste
if len(inputdata) != antalper :
print('** Error, There should be',antalper,'values. There is:',len(inputdata))
print('** Update =',var,'Data=',inputdata,start,end)
raise Exception('wrong number of datapoints')
else:
inputserie=pd.Series(inputdata,current_per)*scale
# print(' Variabel------>',var)
# print( databank[var])
if operator=='=': #changes value to input value
outputserie=inputserie
elif operator == '+':
outputserie=orgdata+inputserie
elif operator == '*':
outputserie=orgdata*inputserie
elif operator == '%':
outputserie=orgdata*(1.0+inputserie/100.0)
elif operator == '=DIFF': # data=data(-1)+inputdata
if databank.index.get_loc(start) == 0:
raise Exception(f"=diff update can't start at first row, var:{var}")
ilocrow =databank.index.get_loc(start)-1
iloccol = databank.columns.get_loc(var)
temp=databank.iloc[ilocrow,iloccol]
addon = list(itertools.accumulate([i for i in inputserie],op.add))
# print(f'{addon=}')
opdater=[temp+add for add in addon]
outputserie=pd.Series(opdater,current_per)
elif operator.upper() == '=GROWTH': # data=data(-1)+inputdata
if databank.index.get_loc(start) == 0:
raise Exception(f"=growth update can't start at first row, var:{var}")
ilocrow =databank.index.get_loc(start)-1
iloccol = databank.columns.get_loc(var)
temp=databank.iloc[ilocrow,iloccol]
factor = list(itertools.accumulate([(1+i/100) for i in inputserie],op.mul))
opdater=[temp * it for it in factor]
outputserie=pd.Series(opdater,current_per)
elif operator.upper() == '+GROWTH': # data=data(-1)+inputdata
if databank.index.get_loc(start) == 0:
raise Exception(f"+growth update can't start at first row, var:{var}")
ilocrow =databank.index.get_loc(start)-1
iloccol = databank.columns.get_loc(var)
temp=databank.iloc[ilocrow,iloccol]
factor = list(itertools.accumulate([(1.0+i/100.0+o) for i,o in zip(inputserie,orgdata)],op.mul))
opdater=[temp * it for it in factor]
outputserie=pd.Series(opdater,current_per)
else:
raise Exception(f'Illegal operator in update:{operator} Variable: {var}')
outputserie=pd.Series(np.NaN,current_per)
outputserie.name=var
databank[var] = databank[var].astype('float') # to prevent error in the future
databank.loc[current_per,var]=outputserie
if lprint:
print('Update',operator,inputdata,start,end)
forspalte=str(max(6,len(var)))
print(('{:<'+forspalte+'} {:>20} {:>20} {:>20}').format(var,'Before', 'After', 'Diff'))
newdata=databank.loc[current_per,var]
diff=newdata-orgdata
for i in current_per:
print(('{:<'+forspalte+'} {:>20.4f} {:>20.4f} {:>20.4f}').format(str(i),orgdata[i],newdata[i],diff[i]))
[docs]
def tovarlag(var,lag):
''' creates a stringof var(lag) if lag else just lag '''
if type(lag)==int:
return f'{var}({lag:+})' if lag else var
else:
return f'{var}({lag})' if lag else var
[docs]
def cutout(input,threshold=0.0):
'''get rid of rows below treshold and returns the dataframe or serie '''
if type(input)==pd.DataFrame:
org_sum = input.sum(axis=0)
new = input.iloc[(abs(input) >= threshold).any(axis=1).values,:]
if len(new) < len(input):
new_sum = new.sum(axis=0)
small = org_sum - new_sum
small.name = 'Small'
# breakpoint()
output = pd.concat([new,pd.DataFrame(small).T],axis=0)
else:
output = input
return output
if type(input)==pd.Series:
org_sum = input.sum()
new = input.iloc[(abs(input) >= threshold).values]
if len(new) < len(input):
new_sum = new.sum()
small = pd.Series(org_sum - new_sum)
small.index = ['Small']
output = pd.concat([new,small])
else:
output=input
return output
[docs]
@contextmanager
def ttimer(input='test',show=True,short=False):
'''
A timer context manager, implemented using a
generator function. This one will report time even if an exception occurs"""
Parameters
----------
input : string, optional
a name. The default is 'test'.
show : bool, optional
show the results. The default is True.
short : bool, optional
. The default is False.
Returns
-------
None.
'''
start = time.time()
if show and not short: print(f'{input} started at : {time.strftime("%H:%M:%S"):>{15}} ')
try:
yield
finally:
if show:
end = time.time()
seconds = (end - start)
minutes = seconds/60.
if minutes < 2.:
afterdec='1' if seconds >= 10 else ('3' if seconds >= 1 else '10')
print(f'{input} took : {seconds:>{15},.{afterdec}f} Seconds')
else:
afterdec='1' if minutes >= 10 else '4'
print(f'{input} took : {minutes:>{15},.{afterdec}f} Minutes')
[docs]
def finddec(df):
''' find a suitable number of decimal places from the magnitudes of a dataframe '''
try:
thismax = df.abs().max().max()
except:
thismax = df.abs().max()
if thismax > 1000:
outdec = 0
elif thismax > 10:
outdec = 3
else:
outdec = 6
return outdec
[docs]
def insertModelVar(dataframe, model=None):
"""Inserts all variables from model, not already in the dataframe.
Model can be a list of models """
if isinstance(model,list):
imodel=model
else:
imodel = [model]
myList=[]
for item in imodel:
myList.extend(item.allvar.keys())
manglervars = list(set(myList)-set(dataframe.columns))
if len(manglervars):
extradf = pd.DataFrame(0.0,index=dataframe.index,columns=manglervars).astype('float64')
data = pd.concat([dataframe,extradf],axis=1)
return data
else:
return dataframe
[docs]
def df_extend(df,add=5):
'''Extends a Dataframe, assumes that the indes is of period_range type'''
newindex = pd.period_range(df.index[0], periods=len(df)+add, freq=df.index.freq)
return df.reindex(newindex,method='ffill')
if __name__ == '__main__':
#%% Test
if not 'baseline' in locals() or 1 :
from modelclass import model
madam,baseline = model.modelload('../Examples/ADAM/baseline.pcim',run=1,silent=0,ljit=1,stringjit=0 )
# make a simpel experimet VAT
scenarie = baseline.copy()
scenarie.TG = scenarie.TG + 0.05
_ = madam(scenarie)
update_var(baseline,'tg','=GROWTH',1,2021,2024,lprint=1)