# -*- coding: utf-8 -*-
"""
Created on Thu Sep 21 12:41:10 2017
@author: IBH
Class to handle general target/instrument problems.
Number of targets should be equal to number of instruments
An instrument can comprice of severeral variables
instruments are inputtet as a list of instruments
"""
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
from modelhelp import update_var
[docs]
class targets_instruments():
''' Class to handle general target/instrument problems.
Where the response is delayed specify this with delay.
Number of targets should be equal to number of instruments
An instrument can comprice of severeral variables
**Instruments** are inputtet as a list of instruments
To calculate the jacobian each instrument variable has a impuls,
which is used as delta when evaluating the jacobi matrix::
[ 'QO_J','TG'] Simple list each variable are shocked by the default impulse
[ ('QO_J',0.5), 'TG'] Here QO_J is getting its own impuls (0.5)
[ [('QO_J',0.5),('ORLOV',1.)] , ('TG',0.01)] here an impuls is given for each variable, and the first instrument consiste of two variables
**Targets** are list of variables
Convergence is achieved when all targets are within convergens distance from the target value
Convergencedistance can be set individual for a target variable by setting a value in <modelinstance>.targetconv
Targets and target values are provided by a dataframe.
'''
def __init__(self,databank,targets,instruments,model,defaultimpuls=0.01,defaultconv=0.01, delay=0,
nonlin=False,silent = True, maxiter=30,solveopt={},varimpulse=False,progressbar= True):
'''
Parameters
----------
databank : TYPE
values to run on .
targets : TYPE
dataframe with a column for each target.
instruments : TYPE
list of instruments .
model : TYPE
the model to use .
defaultimpuls : TYPE, optional
default delta . The default is 0.01.
defaultconv : TYPE, optional
default convergence . The default is 0.01.
delay : TYPE, optional
delay in effects . The default is 0.
nonlin : TYPE, optional
if a number the number of iterations to trigger recalculation of jacobi. The default is False.
silent : TYPE, optional
show iterations if false. The default is True.
maxiter : TYPE, optional
max newton iteration. The default is 30.
solveopt : TYPE, optional
options to bring to the solver. The default is {}.
varimpulse : TYPE, optional
if True only update the current period, else update into the future. The default is False.
Returns
-------
None.
'''
self.model = model
self.df = model.lastdf
self.targetvars = targets if isinstance(targets,list) else targets.columns
self.targetconv = {t: defaultconv for t in self.targetvars} # make sure there is a convergence criteria ]
self.targets = targets
self.instruments = {}
self.solveopt = {**solveopt, **{'keep':''}}
self.silent = silent
self.debug=False
self.maxiter = maxiter
self.nonlin=nonlin
self.databank = databank.copy()
self.varimpulse = varimpulse
self.progressbar = progressbar
self.savesolvearg = model.oldkwargs if hasattr(model,'oldkwargs') else {}
for inumber,i in enumerate(instruments):
vars = i if isinstance(i,list) else [i] # make it a list even if one variable
xx = [v if isinstance(v,tuple) else (v,defaultimpuls) for v in vars] # make sure there is a impuls
name = ','.join(n for n,i in xx)
name = f'Instrument_{inumber}' if len(name)>500 else name
self.instruments[inumber] = {'name':name , 'vars': xx, }
self.defaultimpuls = defaultimpuls
# breakpoint()
[docs]
def jacobi(self,per,delay=0):
''' Calculates a jecobi matrix of derivatives based on the instruments and targets
returns a dataframe '''
# breakpoint()
iper = self.df.index.get_loc(per)
iper_delayed = iper-delay
per_delayed = self.df.index[iper_delayed]
self.jac = jac=pd.DataFrame(0.000,index=self.targetvars, columns=[v['name'] for v in self.instruments.values()])
with self.model.set_smpl(per_delayed,per):
mul = self.model(self.df,setlast=False, **self.solveopt) # start point for this quarter
basis = mul.copy(deep=True) # make a reference point for the calculation the derivatives.
if not self.silent: print(f'Update jacobi: {per} effects from {per_delayed}')
oldsave = self.model.save
for instrument in self.instruments.values():
# print('instrument: ',instrument['name'])
# set the instrument
for var,impuls in instrument['vars']:
if self.varimpulse:
mul.loc[per_delayed,var] = mul.loc[per_delayed,var] + impuls # increase loan growth
else:
mul.loc[per_delayed:,var] = mul.loc[per_delayed:,var] + impuls # increase loan growth
# calculate the effect
with self.model.set_smpl(per_delayed,per):
res = self.model(mul,save=False,**self.solveopt) #antal=600,first_test=20,ljit=1) # solve model
# breakpoint()
jac.loc[self.targetvars,instrument['name']] = res.loc[per,self.targetvars]-basis.loc[per,self.targetvars] # store difference in original bank
# reset the instrument
for var,impuls in instrument['vars']:
mul.loc[per_delayed:,var]=basis.loc[per_delayed:,var]
self.model.oldkwargs = self.savesolvearg
self.model.save = oldsave
return jac
[docs]
def invjacobi(self,per,diag=False,delay=0):
''' Calculates the inverted jacobi matrix
returns a dataframe '''
x = self.jacobi(per,delay=delay)
# print(x)
if diag:
out = pd.DataFrame(np.diag(1.0/x.values.diagonal()),x.columns,x.index)
else:
out= pd.DataFrame(np.linalg.inv(x),x.columns,x.index)
return out
[docs]
def targetseek(self,databank=None,shortfall=False,ti_damp=1.0,delay=0,**kwargs):
''' Calculates the instruments as a function of targets '''
silent = kwargs.get('silent',self.silent)
self.maxiter = kwargs.get('maxiter',self.maxiter)
self.nonlin = kwargs.get('nonlin',self.nonlin)
progressbar = kwargs.get('progressbar',self.progressbar)
tindex = self.model.current_per.copy()
res = self.databank.copy()
# self.inv = inv = self.invjacobi(self.targets.index[0])
self.inv = inv = self.invjacobi(self.targets.index[0],diag=shortfall,delay=delay)
self.conv = pd.Series([self.targetconv[v] for v in self.targetvars],self.targetvars)
# print(inv)
oldsave = self.model.save
bars = '{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt}'
with tqdm(total=len(self.targets.index),disable = not progressbar,desc=f'Finding instruments ',bar_format=bars) as pbar:
for per in self.targets.index:
iper = self.df.index.get_loc(per)
iper_delayed = iper-delay
per_delayed = self.df.index[iper_delayed]
if not silent or self.debug:
print('Period:',per)
res = self.model(res,per_delayed ,per ,save=False, **self.solveopt)
orgdistance = self.targets.loc[per,self.targetvars] - res.loc[per,self.targetvars]
shortfallvar = (shortfall * orgdistance) >= 0
for iterations in range(self.maxiter):
startdistance = self.targets.loc[per,self.targetvars] - res.loc[per,self.targetvars]
self.distance = distance = startdistance*shortfallvar if shortfall else startdistance
if not silent: print('Period:',per,' Target instrument iteration:',iterations,
f' Max distance: {distance.abs().max():.3f}')
if self.debug:
print(f'\nTarget instrument {per}, iteration {iterations}')
print(f'Distance to target :\n{startdistance}\n')
if shortfall:
print(f'Distance to shortfall target :\n{distance}\n')
if (distance.abs()>=self.conv).any():
if self.nonlin:
if type(self.nonlin) == int:
if iterations >= self.nonlin:
if not iterations%self.nonlin:
self.inv = inv = self.invjacobi(per,diag=shortfall,delay=delay)
else:
self.inv = inv = self.invjacobi(per,diag=shortfall,delay=delay)
update = inv.dot(distance) * ti_damp
if self.debug :
print('Update instruments:')
print(update)
for instrument in self.instruments.values():
for var,impuls in instrument['vars']:
if self.varimpulse:
res.loc[per_delayed,var] = res.loc[per_delayed,var] + update[instrument['name']] * impuls # increase loan growth
else:
res.loc[per_delayed:,var] = res.loc[per_delayed:,var] + update[instrument['name']] * impuls # increase loan growth
res = self.model(res,per_delayed ,per ,setlast=False,**self.solveopt)
else:
break
else:
print(f'No convergense in target instrument in {per}, maxiter={self.maxiter}')
raise Exception('No convergence ')
pbar.update()
self.model.lastdf = res
self.model.oldkwargs = self.savesolvearg
self.model.save = oldsave
self.model.current_per = tindex
return res
[docs]
def __call__(self, *args, **kwargs ):
'''Uses :any:`targetseek` '''
return self.targetseek( *args, **kwargs)
#%% running
if __name__ == '__main__':
pass