Source code for modelgrab

# -*- coding: utf-8 -*-
"""
Created on Mon Jun 10 21:11:08 2019

@author: hanseni

    Modules to grab models with different specifications and make them conform to ModelFlow.

   **GrapWbModel** will take an EViews model and transform it to Business Logic (BL)
  - Create a normalized model, add damping for the stochastic equations
  - Add add-factors to the stochastic equations
  - Generate BL for a model which calculates add-factors so a solution will match the existing values
  - Generate BL for the model

  - Grab data from an Excel sheet

  - Make model instance for model and add-factor model 

  - Run the model, check that the results match. 

For debugging, the last part checks the values in the order in which they are calculated,
and then displays the input to the equations that are off the mark.
"""

import pandas as pd
import re
from dataclasses import dataclass
import functools
from tqdm import tqdm 
import json 
from pathlib import Path


from modelclass import model 
import modelmf 
import modelmanipulation as mp 

import modelnormalize as nz

# No-op sanity check: the condition is always true, so this never raises.
assert 1==1


@dataclass
class GrapWbModel():
    '''Take a World Bank model specification, variable data and variable
    descriptions and transform them to ModelFlow business language.

    Creating an instance reads the model file, translates and normalizes the
    equations, builds the model instances and prepares the input dataframe.
    Calling the instance returns ``(self.mmodel, self.base_input)``.
    '''

    frml      : str = ''            # path to model
    data      : str = ''            # path to data
    des       : any = ''            # path to descriptions, or a dict with descriptions
    scalars   : str = ''            # path to scalars
    modelname : str = 'No Name'     # modelname
    start     : int = 2017          # used when no mfmsa options are provided
    end       : int = 2040          # used when no mfmsa options are provided
    country_trans    : any = lambda x:x[:]  # function which transforms the model specification
    country_df_trans : any = lambda x:x     # function which transforms the initial dataframe
    from_wf2  : bool = False        # if True, '@IDENTITY' marks identities and data is read with index_col=0
    make_fitted : bool = False      # if True, a clean equation for fitted variables is created
    fit_start : int = 2000          # start of fitted model unless overruled by mfmsa
    fit_end   : int = 2100          # end of fitted model unless overruled by mfmsa
    do_add_factor_calc : bool = True  # calculate the add factors
    mfmsa     : str = ''            # path to mfmsa options

    def __post_init__(self):
        '''Read, translate and normalize the model and build the model instances.

        Sets, among others, ``self.mmodel`` (the model), ``self.mres`` (the
        model calculating add factors) and ``self.base_input`` (the input
        dataframe, with calculated add factors if ``do_add_factor_calc``).

        Raises:
            ValueError: if, when not reading wf2 format, an equation line is
                found before any section header.
            Exception: if a translated line still contains '@' or '.coef'.
        '''
        print(f'\nProcessing the model:{self.modelname}',flush=True)
        # Use a context manager so the file handle is closed deterministically.
        with open(self.frml) as model_file:
            self.rawmodel_org = model_file.read()
        eviewsline = [l for l in self.rawmodel_org.split('\n')
                      if len(l.strip()) >=2 and not ('*******' in l or '----------' in l)]
        self.rawmodel = self.country_trans(self.rawmodel_org)
        rawmodel6 = self.trans_eviews(self.rawmodel)
        bars = '{desc}: {percentage:3.0f}%|{bar}|{n_fmt}/{total_fmt}'
        model_lines = rawmodel6.split('\n')

        if self.from_wf2:
            orgline = list(model_lines)
            line_type = ['ident' if l.startswith('@IDENTITY') else 'stoc' for l in orgline]
            line = [l.replace('@IDENTITY ','').replace(' ','') for l in orgline]
        else:
            line_type = []
            line = []
            sec = None   # type of the section currently being read
            with tqdm(total=len(model_lines),desc='Reading original model',bar_format=bars) as pbar:
                for l in model_lines:
                    if ('*******' in l or '----------' in l) and 'IDEN' in l.upper():
                        sec = 'iden'
                    elif '*******' in l and 'STOC' in l:
                        sec = 'stoc'
                    else:
                        if sec is None:
                            # Fail with a clear message instead of an unbound local.
                            raise ValueError(f'Equation line found before any section header: {l}')
                        line_type.append(sec)
                        line.append(l)
                    pbar.update(1)
            # In this branch the lines are not transformed any further, so the
            # "original" lines used in the error report are the lines themselves.
            orgline = list(line)

        errline1 = [(l,o,e) for l,o,e in zip(line,orgline,eviewsline) if '@' in l or '.coef' in l ]
        if len(errline1):
            print('Probably errors as @ in lines:')
            for l,o,e in errline1:
                print(f'\nEviews line :{e}')
                print(f'Original line :{o}')
                print(f'New modelflow line:{l}')
            raise Exception('@ in lines ')

        # Only stochastic equations get add factors, fitted values and fixing.
        self.all_frml = [nz.normal(l,add_add_factor=(typ=='stoc'),make_fitted=(typ=='stoc'),make_fixable =(typ=='stoc'))
                         for l,typ in tqdm(zip(line,line_type),desc='Normalizing model',total=len(line),bar_format=bars)]
        self.all_frml_dict = {f.endo_var: f for f in self.all_frml}
        lfname = ["<Z,EXO> " if typ == 'stoc' else '' for typ in line_type ]
        self.rorg = [fname + f.normalized for f,fname in zip(self.all_frml,lfname) ]

        if self.make_fitted:
            self.rfitmodel = ['<FIT> ' + f.fitted for f in self.all_frml if len(f.fitted)]
            self.mfitmodel = model('\n'.join(self.rfitmodel))
            self.mfitmodel.modelname = self.modelname + ' calc fittet values'
        else:
            self.rfitmodel = []

        self.rres = [f'{f.calc_add_factor}' for f in self.all_frml if len(f.calc_add_factor)]
        self.rres_tomodel ='\n'.join([f'FRML <CALC_ADD_FACTOR> {f.calc_add_factor}$'
                                      for f in self.all_frml if len(f.calc_add_factor)])
        self.fmodel = mp.tofrml ('\n'.join(self.rorg+self.rfitmodel))+self.rres_tomodel
        self.fres = ('\n'.join(self.rres))

        self.mmodel = model(self.fmodel,modelname = self.modelname)
        self.mmodel.set_var_description(self.var_description)
        self.mmodel.wb_MFMSAOPTIONS = self.mfmsa_options
        self.mres = model(self.fres,modelname = f'Calculation of add factors for {self.modelname}')

        self.start,self.end = self.mfmsa_start_end
        if self.do_add_factor_calc:
            self.base_input = self.mres.res(self.dfmodel,self.start,self.end)
        else:
            self.base_input = self.dfmodel

    @staticmethod
    def trans_eviews(rawmodel):
        '''Translate an EViews model text and return the translated text.

        The text is upper-cased, lines with fewer than 2 non-blank characters
        are dropped, surrounding quotes are removed, a number of EViews
        functions are renamed, @ELEM and @DURING are translated and D( is
        replaced through ``nz.funk_replace('D','DIFF',...)``.
        '''
        rawmodel0 = '\n'.join(l for l in rawmodel.upper().split('\n') if len(l.strip()) >=2)
        # trailing and leading "
        rawmodel1 = '\n'.join(l[1:-1] if l.startswith('"') else l for l in rawmodel0.split('\n'))
        # powers, remaining quotes and function names
        rawmodel2 = (rawmodel1.replace('^','**').replace('""',' ').replace('"',' ')
                     .replace('@EXP','exp').replace('@RECODE','recode').replace('@MOVAV','movavg').replace('@LOGIT','logit_inverse')
                     .replace('@MEAN(@PC(','@AVERAGE_GROWTH((').replace('@PCY','PCT_GROWTH').replace('@PC','PCT_GROWTH')
                     .replace('@PMAX','MAX').replace('@TREND','EVIEWS_TREND'))
        # @ELEM and @DURING
        rawmodel3 = nz.elem_trans(rawmodel2)
        rawmodel4 = re.sub(r'@DURING\( *([0-9]+) *\)', r'during_\1',rawmodel3)
        rawmodel5 = re.sub(r'@DURING\( *([0-9]+) *([0-9]+) *\)', r'during_\1_\2',rawmodel4)
        # D( becomes DIFF(
        rawmodel6 = nz.funk_replace('D','DIFF',rawmodel5)
        return rawmodel6

    @property
    def var_description(self):
        '''Return a dict with variable descriptions.

        If ``self.des`` is a dict it is returned as is. Otherwise it is read
        from an .xlsx file (columns 'mnem' and 'Excel') or from a json file,
        and descriptions for add factors, exogenizing dummies, exogenizing
        values and fitted values are added and also set on ``self.mmodel``.
        If anything fails, a message is printed and an empty dict is returned.
        '''
        if isinstance(self.des,dict):
            return self.des
        try:
            des_file = Path(self.des)
            if des_file.suffix == '.xlsx':
                trans0 = pd.read_excel(self.des).loc[:,['mnem','Excel']].set_index('mnem').to_dict(orient = 'dict')['Excel']
                var_description = {str(k) : str(v) for k,v in trans0.items() if 'nan' != str(v)}
            else:
                with open(des_file,'rt') as f:
                    var_description = json.load(f)

            endogene = self.mmodel.endogene
            exogene = self.mmodel.exogene
            # (suffix, text in front of the description, where the new name has to be found)
            derived_specs = (('_A', 'Add factor:', exogene),
                             ('_D', 'Exo dummy:', exogene),
                             ('_X', 'Exo value:', exogene),
                             ('_FITTED', 'Fitted value:', endogene))
            derived = {}
            for suffix, text, names in derived_specs:
                for v in endogene:
                    newname = v + suffix
                    if newname in names:
                        derived[newname] = text + var_description.get(v,v)
            var_description = {**var_description,**derived}
            self.mmodel.set_var_description(var_description)
        except Exception:
            # Best effort: descriptions are optional. Unlike a bare except this
            # lets KeyboardInterrupt and SystemExit through.
            print('*** No variable description',flush=True)
            var_description = {}
        return var_description

    @functools.cached_property
    def mfmsa_options(self):
        '''Grab the mfmsa options, a world bank speciality.

        Returns the content of the file ``self.mfmsa``, or '' if no path is given.
        '''
        if not self.mfmsa:
            return ''
        with open(self.mfmsa,'rt') as f:
            return f.read()

    @functools.cached_property
    def mfmsa_start_end(self):
        '''Return (start, end) taken from iFace/SolveStart and iFace/SolveEnd
        in the mfmsa options.

        If there are no mfmsa options, ``(self.start, self.end)`` is returned.
        '''
        # Import the submodule explicitly: a plain "import xml" does not
        # guarantee that xml.etree.ElementTree is loaded.
        import xml.etree.ElementTree as ET

        options = self.mfmsa_options
        if not options.strip():
            return self.start, self.end
        iface = ET.fromstring(options).find('iFace')
        start = int(iface.find('SolveStart').text)
        end = int(iface.find('SolveEnd').text)
        return start,end

    @functools.cached_property
    def dfmodel(self):
        '''The original input data enriched with during variables, variables
        containing values for specific historic years and the model specific
        transformation. Missing values are replaced by 0.0.
        '''
        # Now the data
        if self.from_wf2:
            df = pd.read_excel(self.data,index_col=0)
        else:
            df = (pd.read_excel(self.data)
                  .pipe(lambda df : df.rename(columns={c:c.upper() for c in df.columns}))
                  .pipe(lambda df : df.rename(columns={'_DATE_':'DATEID'}))
                  .pipe(lambda df : df.set_index('DATEID'))
                  )
            # NOTE(review): kept in the non-wf2 branch, where the index comes
            # from the date column - confirm that wf2 data is already indexed by year.
            df.index = [int(i.year) for i in df.index]

        try:
            # One row of scalars, repeated for every period in df.
            sca = (pd.read_excel(self.scalars ,index_col=0,header=None).T
                   .pipe(lambda _df : _df.loc[_df.index.repeat(len(df.index)),:])
                   .set_index(df.index))
            df = pd.concat([df,sca],axis=1)
        except Exception:
            # Best effort: scalars are optional.
            print(f'{self.modelname} no Scalars prowided ')

        if self.make_fitted:
            df = self.mfitmodel.res(df,self.fit_start,self.fit_end)

        # Now set the vars with fixed values: <var>_<val>_<year> gets the value of <var> in <year>
        value_vars = self.mmodel.vlist('*_value_*')
        for var,val,year in (v.rsplit('_',2) for v in value_vars) :
            df.loc[:,f'{var}_{val}_{year}'] = df.loc[int(year),var]
        self.showvaluevars = df[value_vars]

        # Now set the values of the dummies: 1 in the year or the span of years, else 0
        during_vars = self.mmodel.vlist('*during_*')
        for varname,(dur,per) in ((v,v.split('_',1)) for v in during_vars):
            df.loc[:,varname]=0
            pers = per.split('_')
            if len(pers) == 1:
                df.loc[int(pers[0]),varname] = 1
            else:
                df.loc[int(pers[0]):int(pers[1]),varname]=1.
        self.showduringvars = df[during_vars]

        df_out = self.mmodel.insertModelVar(df).pipe(self.country_df_trans).fillna(0.0)
        return df_out

    def __call__(self):
        '''Return the model instance and the input dataframe.'''
        return self.mmodel,self.base_input

    def test_model(self,start=None,end=None,maxvar=1_000_000, maxerr=100,tol=0.0001,showall=False):
        '''
        Compares a straight calculation with the input dataframe.

        Shows which variables don't have the same value. As a side effect the
        pandas display float format is set to 10 decimals.

        Args:
            start (TYPE, optional): start period. Defaults to None, then self.start is used.
            end (TYPE, optional): end period. Defaults to None, then self.end is used.
            maxvar (TYPE, optional): how many variables are to be checked. Defaults to 1_000_000.
            maxerr (TYPE, optional): how many errors to check. Defaults to 100.
            tol (TYPE, optional): check for absolute value of difference. Defaults to 0.0001.
            showall (TYPE, optional): show more. Defaults to False.

        Returns:
            None.

        '''
        _start = start if start else self.start
        _end = end if end else self.end

        # Only the state left in self.mmodel is used below, not the returned dataframe.
        self.mmodel(self.base_input,_start,_end,reset_options=True,silent=0,solver='base_res')
        self.mmodel.basedf = self.dfmodel
        pd.options.display.float_format = '{:.10f}'.format
        err=0
        print(f'\nChekking residuals for {self.mmodel.name} {_start} to {_end}')
        for i,v in enumerate(self.mmodel.solveorder):
            if i > maxvar : break
            if err > maxerr : break
            check = self.mmodel.get_values(v,pct=True).T
            check.columns = ['Before check','After calculation','Difference','Pct']
            if (check.Difference.abs() >= tol).any():
                err=err+1
                maxdiff = check.Difference.abs().max()
                maxpct = check.Pct.abs().max()
                print('\nVariable with residuals above threshold')
                print(f"{v}, Max difference:{maxdiff:15.8f} Max Pct {maxpct:15.10f}% It is number {i} in the solveorder and error number {err}")
                if showall:
                    print(f'\n{self.mmodel.allvar[v]["frml"]}')
                    print(f'\nResult of equation \n {check}')
                    print(f'\nEquation values before calculations: \n {self.mmodel.get_eq_values(v,last=False,showvar=1)} \n')
        self.mmodel.oldkwargs = {}