Source code for modelnormalize

# -*- coding: utf-8 -*-
"""
Created on Sat Nov 28 13:32:47 2020

This module transforms model specifications into the modelflow business language.

 - preprocessing expressions to resolve functions like
    dlog, log, pct, movavg
 - replace function names
 - normalize formulas 

@author: bruger
"""

from sympy import sympify, solve, Symbol
import re
from dataclasses import dataclass, field, asdict
import dataclasses 
from IPython.display import SVG, display, Image, IFrame, HTML


from modelmanipulation import lagone, find_arg,pastestring,stripstring
from modelpattern import udtryk_parse, namepat

@dataclass
class Normalized_frml:
    '''Holds the expressions produced when one formula is normalized.

    Every field is a string and defaults to the empty string; empty
    fields are left out of the printed representation.
    '''
    endo_var: str = ''
    original: str = ''
    preprocessed: str = ''
    normalized: str = ''
    calc_add_factor: str = ''
    un_normalized: str = ''
    fitted: str = ''
    eviews: str = ''

    def __str__(self):
        attributes = vars(self)
        # Pad every label to the width of the longest attribute name.
        width = max(map(len, attributes))
        lines = []
        for key, value in attributes.items():
            if len(value):
                lines.append(f'{key.capitalize():<{width}} : {value}')
        return "\n".join(lines)

    @property
    def fprint(self):
        '''Print the formatted instance, preceded by a blank line.'''
        print(f'\n{self}')

    @property
    def fdict(self):
        '''The fields of the instance as a dictionary.'''
        return dataclasses.asdict(self)

    def __repr__(self):
        # The interactive representation is the same text as str().
        return str(self)
def endovar(f):
    '''Find the first variable in an expression.

    Args:
        f (str): expression, tokenized with udtryk_parse.

    Returns:
        The ``var`` attribute of the first token where it is truthy.

    Raises:
        ValueError: if no token of the expression carries a variable.
    '''
    for t in udtryk_parse(f):
        if t.var:
            return t.var
    # Previously the function fell through to ``return ud`` with ``ud``
    # unbound, which surfaced as an opaque UnboundLocalError.
    raise ValueError(f'No variable found in expression: {f!r}')
def funk_in(funk, a_string):
    '''Locate the first call of a function in a string.

    The search runs on an upper-cased copy of a_string. The function name
    must be followed by ``(`` and must either start a line or be preceded
    by a character other than A-Z, 0-9, ``_``, ``{`` or ``}``, so a name
    that is merely the tail of a longer identifier is not matched.
    funk is inserted into the pattern as it is (not escaped, not
    upper-cased).

    Returns:
        A match object whose group 2 is the function name (this is what
        funk_find_arg uses), or None when there is no such call.
    '''
    pattern = re.compile(fr'([^A-Z0-9_{{}}]|^)({funk})\(', re.MULTILINE)
    haystack = a_string.upper()
    return pattern.search(haystack)
def funk_replace(funk1, funk2, a_string):
    '''Replace every ``funk1(`` with ``funk2(`` in a string.

    The string and both function names are upper-cased. funk1 is only
    replaced where it starts a line or follows a character other than
    A-Z, 0-9, ``_``, ``{`` or ``}``, so a funk1 embedded in a variable
    name is left alone.

    Args:
        funk1 (str): function name to replace (inserted into the regex
            unescaped).
        funk2 (str): replacement function name.
        a_string (str): text to transform.

    Returns:
        str: the upper-cased text with all matching calls renamed.
    '''
    # ``flags`` must be given by keyword: the fourth positional parameter
    # of re.sub is ``count``, so passing re.MULTILINE there silently
    # limited the substitution to the first 8 matches.
    return re.sub(fr'(^|[^A-Z0-9_{{}}])({funk1.upper()})\(',
                  fr'\1{funk2.upper()}(',
                  a_string.upper(),
                  flags=re.MULTILINE)
def funk_replace_list(replacelist, a_string):
    '''Apply a sequence of function renames to a string.

    Args:
        replacelist: iterable of (funk1, funk2) pairs, applied in order
            through funk_replace.
        a_string (str): text to transform.

    Returns:
        str: the text after all renames; the input itself when
        replacelist is empty.
    '''
    result = a_string
    for old_name, new_name in replacelist:
        result = funk_replace(old_name, new_name, result)
    return result
# Module-level example calls of the helpers defined above. The return
# values are discarded, so these only exercise the functions each time
# the module body runs.
funk_replace_list([('@D','DIFF'),('DLOG','DXLOG')],'d(a) = @d(v) x = dlog(y)')
# Same text, with the plain D( function renamed as well.
funk_replace_list([('@D','DIFF'),('D','DIFF'),('DLOG','DXLOG')],'d(a) = @d(v) x = dlog(y)')
# Look up a D( call that follows an operator.
funk_in('D','+d(b)'.upper())
def funk_find_arg(funk_match, streng):
    '''Chop a string in 3 parts around a function call.

    1. before ``funk(``
    2. inside the matching parentheses
    3. after the last matching parenthesis

    Args:
        funk_match: match object from funk_in; group 2 is the function
            name and is directly followed by ``(``.
        streng (str): the string the match positions refer to.

    Returns:
        tuple[str, str, str]: (before, argument, after).

    Raises:
        AssertionError: if the opening parenthesis is never closed.
    '''
    start = funk_match.start(2)
    tail = streng[funk_match.end(2):]    # begins at the opening parenthesis
    depth = 0
    for index, char in enumerate(tail):
        if char == '(':
            depth += 1
        elif char == ')':
            depth -= 1
        else:
            continue
        if not depth:
            return streng[:start], tail[1:index], tail[index + 1:]
    # Raised explicitly (same exception type as before) so the check is
    # not stripped when Python runs with -O.
    raise AssertionError(f'Parantese mismatch in {streng}')
def preprocess(udtryk, funks=None):
    '''Expand the template functions of an expression.

    The expression is upper-cased, blanks are removed, and the functions
    are expanded in this order (lag(x) stands for ``lagone(x, funks=funks)``):

    - ``DLOG(x)``       -> ``DIFF(LOG(x))``
    - ``LOGIT(x)``      -> ``(LOG(x/(1.0 -x)))``
    - ``MOVAVG(x,n)``   -> ``((x+lag(x)+...)/n.0)`` with n terms
    - ``PCT_GROWTH(x)`` -> ``(100 * ( (x) / (lag(x)) -1))``
    - ``DIFF(x)``       -> ``((x)-(lag(x)))``
    - ``D(x)`` and ``D(x,0,1)`` -> same expansion as ``DIFF(x)``

    Args:
        udtryk (str): model we want to do template expansion on
        funks (list, optional): list of user defined functions, handed on
            to lagone. Defaults to an empty list.

    Returns:
        str: the expanded, upper-cased expression.

    Raises:
        ValueError: if the window of a MOVAVG call is not a positive integer.
    '''
    # A fresh list per call instead of a shared mutable default.
    funks = [] if funks is None else funks
    udtryk_up = udtryk.upper().replace(' ', '')

    def difference(term):
        '''term minus its lagged version, fully parenthesized.'''
        return '((' + term + ')-(' + lagone(term, funks=funks) + '))'

    while dlog_match := funk_in('DLOG', udtryk_up):
        fordlog, dlogudtryk_up, efterdlog = funk_find_arg(dlog_match, udtryk_up)
        udtryk_up = fordlog + 'DIFF(LOG(' + dlogudtryk_up + '))' + efterdlog

    while logit_match := funk_in('LOGIT', udtryk_up):
        forlogit, logitudtryk_up, efterlogit = funk_find_arg(logit_match, udtryk_up)
        udtryk_up = (forlogit + '(LOG(' + logitudtryk_up + '/(1.0 -'
                     + logitudtryk_up + ')))' + efterlogit)

    while movavg_match := funk_in('MOVAVG', udtryk_up):
        forkaede, kaedeudtryk, efterkaede = funk_find_arg(movavg_match, udtryk_up)
        arg = kaedeudtryk.split(',', 1)
        term = arg[0]
        antal = int(arg[1])
        if antal < 1:
            # Zero or negative windows used to yield a malformed expression.
            raise ValueError(f'MOVAVG window must be a positive integer in {udtryk}')
        terms = []
        for _ in range(antal):
            terms.append(term)
            term = lagone(term, funks=funks)
        avg = '((' + '+'.join(terms) + ')/' + str(antal) + '.0)'
        udtryk_up = forkaede + avg + efterkaede

    while pct_match := funk_in('PCT_GROWTH', udtryk_up):
        forpc, pcudtryk, efterpc = funk_find_arg(pct_match, udtryk_up)
        udtryk_up = f'{forpc} (100 * ( ({pcudtryk}) / ({lagone(pcudtryk,funks=funks)}) -1)) {efterpc}'

    while diff_match := funk_in('DIFF', udtryk_up):
        fordif, difudtryk_up, efterdif = funk_find_arg(diff_match, udtryk_up)
        udtryk_up = fordif + difference(difudtryk_up) + efterdif

    while diff_match := funk_in('D', udtryk_up):
        fordif, difudtryk_up, efterdif = funk_find_arg(diff_match, udtryk_up)
        if difudtryk_up.endswith(',0,1'):
            # Drop only the trailing ",0,1"; a global replace would also
            # damage an inner argument list that happens to contain it.
            difudtryk_up = difudtryk_up.replace(' ', '')[:-len(',0,1')]
        udtryk_up = fordif + difference(difudtryk_up) + efterdif

    return udtryk_up
def fixleads(eq, check=False):
    '''Give leads written without a sign an explicit plus sign.

    Blanks and newlines are removed first. Then every name matched by
    namepat that is directly followed by ``(digits)`` is rewritten to
    ``name(+digits)``.
    NOTE(review): the replacement refers to groups 1 and 2, so namepat is
    assumed to contain exactly one capturing group - confirm in modelpattern.

    Args:
        eq (str): the equation text.
        check (bool, optional): print the text before and after the rewrite.

    Returns:
        str: the compacted text with signed leads.
    '''
    unsigned_lead = r'(?:\(([0-9]+)\))'
    compact = eq.translate(str.maketrans('', '', ' \n'))
    signed = re.compile(namepat + unsigned_lead).sub(r'\g<1>(+\g<2>)', compact)
    if check:
        print(f"Before {compact}")
        print(f"After {signed}")
    return signed
def normal(ind_o, the_endo='', add_add_factor=True, do_preprocess=True, add_suffix='_A',
           endo_lhs=True, make_fixable=False, make_fitted=False, eviews=''):
    '''Normalize an expression g(y,x) = f(y,x) ==> y = F(x,z)

    By default the expression is solved for the first variable on the
    left hand side (lhs). When the lhs parses to a single term and
    endo_lhs is true nothing is solved; the equation is only decorated
    with the requested adjustment / fixing terms.

    Args:
        ind_o (str): input expression, no $ and no frml name, just lhs=rhs
        the_endo (str, optional): the endogenous variable to isolate.
            Defaults to the first variable of the lhs.
        add_add_factor (bool, optional): introduce an adjustment term named
            <endo><add_suffix> and an expression to calculate it.
        do_preprocess (bool, optional): run preprocess on the expression.
        add_suffix (str, optional): suffix of the adjustment term.
        endo_lhs (bool, optional): if false the equation is always solved
            with the complete rhs included, which allows normalizing for
            an endogenous variable on the rhs.
        make_fixable (bool, optional): also make the equation exogenizable
            through the <endo>_D and <endo>_X variables.
        make_fitted (bool, optional): also create a <endo>_fitted equation
            without the adjustment term.
        eviews (str, optional): stored unchanged in the result.

    Returns:
        An instance of Normalized_frml containing the relevant expressions.
    '''
    lag_tag = '___LAG'

    def symbol_table(expression):
        '''Map each variable name of the expression to a sympy Symbol.

        Handing the table to sympify ensures that sympy is not confused
        by built in names like e.
        '''
        names = {token.var for token in udtryk_parse(expression) if token.var}
        return {name: Symbol(name) for name in names}

    def to_sympy(equation):
        '''Paste lag_tag on the equation with pastestring and parse it.'''
        tagged = pastestring(equation, lag_tag, onlylags=True)
        return sympify(tagged.replace('LOG(', 'log(').replace('EXP(', 'exp('), symbols)

    def untagged(solution):
        '''Text of a sympy solution with lag_tag stripped again.'''
        return stripstring(str(solution), lag_tag)

    leads_fixed = fixleads(ind_o)
    preprocessed = preprocess(leads_fixed) if do_preprocess else leads_fixed
    ind = preprocessed.upper().replace('LOG(', 'log(').replace('EXP(', 'exp(')
    lhs, rhs = ind.strip().split('=', 1)
    lhs = lhs.strip()
    lhs_terms = len(udtryk_parse(lhs))

    if endo_lhs and lhs_terms < 2:
        # The lhs is a single term: no need to solve this equation.
        fitted_frml = f'{lhs}_fitted = {rhs}'.upper() if make_fitted else ''
        if add_add_factor and make_fixable:
            normalized = f'{lhs} = ({rhs} + {lhs}{add_suffix})* (1-{lhs}_D)+ {lhs}_X*{lhs}_D '
        elif add_add_factor:
            normalized = f'{lhs} = ({rhs} + {lhs}{add_suffix}) '
        elif make_fixable:
            normalized = f'{lhs} = ({rhs})* (1-{lhs}_D)+ {lhs}_X*{lhs}_D'
        else:
            normalized = f'{lhs} = {rhs}'
        add_factor_frml = f'{lhs}{add_suffix} = ({lhs}) - ({rhs})' if add_add_factor else ''
        return Normalized_frml(lhs, ind_o, preprocessed, normalized, add_factor_frml,
                               fitted=fitted_frml, eviews=eviews)

    # There is an expression on the lhs (or endo_lhs is false): solve with sympy.
    symbols = symbol_table(ind)
    endo_name = the_endo.upper() if the_endo else endovar(lhs)
    endo = sympify(endo_name, symbols)
    a_name = f'{endo_name}{add_suffix}' if add_add_factor else ''
    adjustment = f'+{a_name}' if add_add_factor else ''
    # With endo_lhs the rhs is kept out of the solver behind the __RHS__
    # placeholder and substituted back into the solution text afterwards.
    if endo_lhs:
        equation = f'({lhs}-(__RHS__ {adjustment}))'
    else:
        equation = f'({lhs}- ({rhs} {adjustment}))'
    kat = to_sympy(equation)
    endo_solutions = solve(kat, endo, simplify=False, rational=False, warn=False)
    res_rhs = untagged(endo_solutions[0])
    if endo_lhs:
        res_rhs = res_rhs.replace('__RHS__', f' ({rhs.strip()}) ')

    if make_fitted:
        # Same equation without the adjustment term.
        fit_equation = f'({lhs}-__RHS__ )' if endo_lhs else f'({lhs}- {rhs} )'
        fit_solutions = solve(to_sympy(fit_equation), endo,
                              simplify=False, rational=False, warn=False)
        res_rhs_fit = untagged(fit_solutions[0])
        if endo_lhs:
            res_rhs_fit = res_rhs_fit.replace('__RHS__', f' ({rhs.strip()}) ')

    if make_fixable:
        out_frml = f'{endo} = ({res_rhs}) * (1-{endo}_D)+ {endo}_X*{endo}_D '.upper()
    else:
        out_frml = f'{endo} = {res_rhs}'.upper()

    out_a = ''
    if add_add_factor:
        # Solve the same equation for the adjustment term.
        a_solutions = solve(kat, sympify(a_name, symbols), simplify=False, rational=False)
        res_rhs_a = untagged(a_solutions[0]).replace('__RHS__', f' (({rhs.strip()})) ')
        out_a = f'{a_name} = {res_rhs_a}'.upper()

    out_fitted = f'{endo}_fitted = {res_rhs_fit}'.upper() if make_fitted else ''
    return Normalized_frml(str(endo), ind_o, preprocessed, out_frml, out_a,
                           fitted=out_fitted, eviews=eviews)
def elem_trans(udtryk, df=None):
    '''Handle expressions containing @ELEM.

    The expression is upper-cased and every ``@ELEM(text,number)`` call is
    replaced by ``(text)`` in which each name matched by namepat has
    ``_value_<number>`` appended.

    Args:
        udtryk (str): expression to transform.
        df (optional): only read by the unfinished inner helper sub_elem,
            which is never called here.

    Returns:
        str: the upper-cased expression without @ELEM calls.
    '''
    from modelpattern import namepat

    def trans_elem(text, number):
        '''Append _value_<number> to every name in text.'''
        return re.sub(namepat, lambda hit: f'{hit.group(1)}_value_{number}', text)

    def sub_elem(text, number):
        '''Work in progress, not used: plug in the actual value from df.'''
        return re.sub(namepat, lambda hit: f'{df.loc[number, str(hit.group(1))]}', text)

    expression = udtryk.upper()
    while True:
        elem_match = funk_in('@ELEM', expression)
        if not elem_match:
            break
        before, arguments, after = funk_find_arg(elem_match, expression)
        elem_text, elem_number = arguments.replace(' ', '').split(',')
        expression = f'{before}({trans_elem(elem_text, elem_number)}){after}'
    return expression
if __name__ == '__main__':
    # Manual demonstrations, run only when the module is executed as a
    # script. ``fprint`` is a property, so merely accessing it prints.
    normal('DELRFF=RFF-RFF(-1)',add_add_factor=1,add_suffix= '_AERR').fprint
    normal('a = n(-1)',add_add_factor=0,make_fitted = 1).fprint
    normal('a+b = c',add_add_factor=1,make_fitted=1).fprint
    normal('PCT_growth(a) = n(-1)',add_add_factor=0).fprint
    normal('a = movavg(pct(b),2)',add_add_factor=0).fprint
    normal('pct_growth(c) = pct_growth(d)',add_add_factor=0).fprint
    # NOTE(review): the bare expression below stood here as a statement.
    # No name C is defined in this module, so evaluating it raised a
    # NameError and stopped the script; it is kept as a comment only.
    # (100 * ( (C) / (C(-1)) -1))
    normal('pct_growth(c) = z+pct(b) + pct(e)').fprint
    normal('pct_growth(c) = z+pct(b) + pct(e)').fprint
    normal('a = pct_growth(b)',add_add_factor=0).fprint
    normal("DLOG(SAUNECONGOVTXN) = -0.323583422052*(LOG(SAUNECONGOVTXN(-1))-GOVSHAREWB*LOG(SAUNEYWRPGOVCN(-1))-(1-GOVSHAREWB)*LOG(SAUNECONPRVTXN(-1)))+0.545415878897*DLOG(SAUNECONGOVTXN(-1))+(1-0.545415878897)*(GOVSHAREWB)*DLOG(SAUNEYWRPGOVCN) +(1-0.545415878897)*(1-GOVSHAREWB)*DLOG(SAUNECONPRVTXN)-1.56254616684-0.0613991001064*@DURING(""2011"")").fprint
    normal("D(a,0,1) = b").fprint
    normal('a = D( LOG(QLHP(+1)), 0, 1 )').fprint
    normal('a = D( LOG(QLHP(+1)))').fprint
    normal('a = gamma+ f+O',the_endo='f',endo_lhs=False,make_fixable =True).fprint
    # breakpoint()
    normal('zlhp-zlhp(-1) = 81 * D( LOG(QLHP(1)) ,0, 1) ',add_add_factor=1,make_fitted=1,make_fixable=1).fprint
    fixleads('zlhp - ddd = 81 * D( LOG(QLHP(1)),0,1) ')
    #%%
    elem_trans('DLOG(PAKNVRENPRODXN)=DLOG((WLDHYDROPOWER*PAKPANUSATLS)/(@ELEM(WLDHYDROPOWER,2011)*@ELEM(PAKPANUSATLS,2011)))-0.00421833463034*DUMH')
    fixleads('a = b(1) + v(33)'.upper(),1)
    fixleads(' 0.2121303706720161 * D( LOG(QLHP), 0, 1 ) + -0.04133299713432281 * D( LOG(QLHP(1)), 0, 1 ) + 0.9805787292172398 * ZLHP(1) + -0.1948471451936957 * ZLHP(2) ')
    xx = normal('a = n(-1)',add_add_factor=0,make_fitted = 1)
    xx.eviews = 'ffff '
    normal('a_{b} = D( LOG(QLHP_{ee}(+1)), 0, 1 )').fprint