Source code for modelnewton

# -*- coding: utf-8 -*-
"""
Created on Fri Jun 19 19:49:50 2020

@author: IBH

Module which handles model differentiation, construction of jacobi matrizex, 
companion matrices, eigenvalues and creates dense and sparse solving functions. 

"""
import matplotlib.pyplot  as plt 
import matplotlib as mpl

import pandas as pd
from sympy import sympify ,Symbol,Function
from collections import defaultdict, namedtuple
import itertools

import numpy as np 
import scipy as sp
import os
import sys
from subprocess import run 
import seaborn as sns 
import ipywidgets as ip
import inspect 
from itertools import chain, zip_longest
import fnmatch 
from IPython.display import  display,Latex, Markdown, HTML
from itertools import chain, zip_longest
from tqdm.notebook  import tqdm 
from pathlib import Path


from dataclasses import dataclass, field, asdict
from functools import lru_cache,cached_property

import re


import modelpattern as pt
# from modelclass import model, ttimer, insertModelVar


from modelmanipulation import split_frml,udtryk_parse,pastestring,stripstring
#import modeljupyter as mj

from modelhelp import tovarlag, ttimer, insertModelVar   
import modeljupyter as mj

 
[docs] @dataclass class diff_value_base: ''' class define columns in database with values from differentiation''' var : str # lhs var pvar : str # rhs var lag : int # lag of rhs var var_plac : int # placement of lhs in array of endogeneous pvar_plac : int # placement of lhs in array of endogeneous pvar_endo : bool # is pvar an endogeneous variable pvar_exo_plac : int # placement of lhs in array of endogeneous
[docs] @dataclass(unsafe_hash=True) class diff_value_col(diff_value_base): ''' The hash able class which can be used as pandas columns'''
[docs] @dataclass class diff_value(diff_value_base): ''' class to contain values from differentiation''' number : int = field(default=0) # index relativ to start in current_per date : any = field(default=0) # index in dataframe
[docs] class newton_diff(): ''' Class to handle Newton solving for un-normalized or normalized models, i.e., models of the form: 0 = G(y,x) y = F(y,x) This class provides functionalities to differentiate model equations, analyze eigenvalues and eigenvectors, and provide utilities for solving dynamic systems through various approaches. Provided Functions: - eigenvector_plot: Plot eigenvectors for a specified period. - eigplot: Plot eigenvalues for a specific period in polar coordinates. - eigplot_all: Plot all eigenvalues for specified periods in polar coordinates. - eigplot_all0: Alternative method to plot all eigenvalues in polar coordinates. - get_df_comp_dict: Get a dictionary of DataFrames representing companion matrices. - get_df_eigen_dict: Get a dictionary of DataFrames containing eigenvalues and eigenvectors. - get_diff_df_1per: Get a DataFrame of derivatives for one period. - get_diff_df_tot: Get a DataFrame of stacked Jacobian matrices for the entire period range. - get_diff_mat_1per: Get a dictionary of sparse matrices representing the Jacobian for one period. - get_diff_mat_all_1per: Get a dictionary of all derivative matrices for one period, including all lags. - get_diff_mat_tot: Get a sparse matrix representing the stacked Jacobian for the entire period range. - get_diff_melted: Get a "melted" DataFrame of derivatives suitable for creating sparse matrices. - get_diff_melted_var: Get a melted DataFrame of derivatives including variable information. - get_diff_values_all: Get all derivative values in a structured format. - get_diffmodel: Generate a model which calculates the partial derivatives of the original model. - get_eigen_jackknife: Perform a jackknife analysis by computing eigenvalues with each variable excluded one at a time. - get_eigen_jackknife_abs: Compute the absolute values of the largest eigenvalues from the jackknife analysis. - get_eigen_jackknife_abs_select: Summarize the absolute largest eigenvalues for a specific year from the jackknife analysis. - get_eigen_jackknife_df: Convert jackknife eigenvalue data into a DataFrame. - get_eigenvalues: Calculate and return the eigenvectors based on the companion matrix for a dynamic system. - get_feedback: Static method to return feedback on the max absolute eigenvector and its sign. - get_solve1per: Get a solving function for one period using precomputed Jacobian matrices. - get_solve1perlu: Get a LU decomposition-based solving function for one period. - get_solvestacked: Get a solving function for the stacked system of equations. - get_solvestacked_it: Get an iterative solving function for the stacked system using a specified solver. - modeldiff: Differentiate model equations with respect to endogenous variables. - show_diff: Display expressions for differential coefficients for specified variables. - show_diff_latex: Display LaTeX-formatted differential expressions and possibly their values. - show_stacked_diff: Show selected rows and columns of the stacked Jacobian as a DataFrame. ''' def __init__(self, mmodel, df=None, endovar=None, onlyendocur=False, timeit=False, silent=True, forcenum=False, per='', ljit=0, nchunk=None, endoandexo=False): pass # [Methods implementations] def __init__(self, mmodel, df = None , endovar = None,onlyendocur=False, timeit=False, silent = True, forcenum=False,per='',ljit=0,nchunk=None,endoandexo=False): """ Args: mmodel (TYPE): Model to analyze. df (TYPE, optional): Dataframe. if None mmodel.lastdf will be used endovar (TYPE, optional): if set defines which endogeneous to include . Defaults to None. onlyendocur (TYPE, optional): Only calculate for the curren endogeneous variables. Defaults to False. timeit (TYPE, optional): writeout time informations . Defaults to False. silent (TYPE, optional): Defaults to True. forcenum (TYPE, optional): Force differentiation to be numeric else try sumbolic (slower) Defaults to False. per (TYPE, optional): Period for which to calculte the jacobi . Defaults to ''. ljit (TYPE, optional): Trigger just in time compilation of the differential coiefficient. Defaults to 0. nchunk (TYPE, optional): Chunks for which the model is written - relevant if ljit == True. Defaults to None. endoandexo (TYPE, optional): Calculate for both endogeneous and exogeneous . Defaults to False. Returns: None. """ self.df = df if type(df) == pd.DataFrame else mmodel.lastdf self.endovar = sorted(mmodel.endogene if endovar == None else endovar) self.endoandexo = endoandexo self.mmodel = mmodel self.onlyendocur = onlyendocur self.silent = silent self.maxdif = 9999999999999 self.forcenum = forcenum self.timeit= timeit self.per=per self.ljit=ljit self.nchunk = nchunk if not self.silent: print(f'Prepare model for calculate derivatives for Newton solver') self.declared_endo_list0 = [pt.kw_frml_name(self.mmodel.allvar[v]['frmlname'], 'ENDO',v) for v in self.endovar] self.declared_endo_list = [v[:-6] if v.endswith('___RES') else v for v in self.declared_endo_list0] # real endogeneous variables self.declared_endo_set = set(self.declared_endo_list) assert len(self.declared_endo_list) == len(self.declared_endo_set) self.placdic = {v : i for i,v in enumerate(self.endovar)} self.newmodel = mmodel.__class__ if self.endoandexo: self.exovar = [v for v in sorted(mmodel.exogene) if not v in self.declared_endo_set] self.exoplacdic = {v : i for i,v in enumerate(self.exovar)} else: self.exoplacdic = {} # breakpoint() self.diffendocur = self.modeldiff() self.diff_model = self.get_diffmodel()
[docs] def modeldiff(self): ''' Differentiate relations for self.enovar with respect to endogeneous variable The result is placed in a dictory in the model instanse: model.diffendocur ''' def numdif(model,v,rhv,delta = 0.005,silent=True) : # print('**',model.allvar[v]['terms']['frml']) def tout(t): if t.lag: return f'{t.var}({t.lag})' return t.op+t.number+t.var # breakpoint() nt = model.allvar[v]['terms'] assignpos = nt.index(model.aequalterm) # find the position of = rhsterms = nt[assignpos+1:-1] vterm = udtryk_parse(rhv)[0] plusterm = udtryk_parse(f'({rhv}+{delta/2})',funks=model.funks) minusterm = udtryk_parse(f'({rhv}-{delta/2})',funks=model.funks) plus = itertools.chain.from_iterable([plusterm if t == vterm else [t] for t in rhsterms]) minus = itertools.chain.from_iterable([minusterm if t == vterm else [t] for t in rhsterms]) eplus = f'({"".join(tout(t) for t in plus)})' eminus = f'({"".join(tout(t) for t in minus)})' expression = f'({eplus}-{eminus})/{delta}' if (not silent) and False: print(expression) return expression def findallvar(model,v): '''Finds all endogenous variables which is on the right side of = in the expresion for variable v lagged variables are included if self.onlyendocur == False ''' # print(v) terms= self.mmodel.allvar[v]['terms'][model.allvar[v]['assigpos']:-1] if self.endoandexo: rhsvar={(nt.var+('('+nt.lag+')' if nt.lag != '' else '')) for nt in terms if nt.var} rhsvar={tovarlag(nt.var,nt.lag) for nt in terms if nt.var} else: if self.onlyendocur : rhsvar={tovarlag(nt.var,nt.lag) for nt in terms if nt.var and nt.lag == '' and nt.var in self.declared_endo_set} else: rhsvar={tovarlag(nt.var,nt.lag) for nt in terms if nt.var and nt.var in self.declared_endo_set} var2=sorted(list(rhsvar)) return var2 with ttimer('Find espressions for partial derivatives',self.timeit): clash = {var : Symbol(var) for var in self.mmodel.allvar.keys()} # clash = {var :None for var in self.mmodel.allvar.keys()} diffendocur={} #defaultdict(defaultdict) #here we wanmt to store the derivativs i=0 for nvar,v in enumerate(self.endovar): if nvar >= self.maxdif: break if not self.silent and 0: print(f'Now differentiating {v} {nvar}') endocur = findallvar(self.mmodel,v) diffendocur[v]={} t=self.mmodel.allvar[v]['frml'].upper() a,fr,n,udtryk=split_frml(t) udtryk=udtryk udtryk=re.sub(r'LOG\(','log(',udtryk) # sympy uses lover case for log and exp udtryk=re.sub(r'EXP\(','exp(',udtryk) lhs,rhs=udtryk.split('=',1) post = '_____XXYY' try: if not self.forcenum: # kat=sympify(rhs[0:-1], md._clash) # we take the the $ out _clash1 makes I is not taken as imiganary lookat = pastestring(rhs[0:-1], post,onlylags=True,funks= self.mmodel.funks) kat=sympify(lookat,clash) # we take the the $ out _clash1 makes I is not taken as imiganary except Exception as inst: # breakpoint() print(inst) e = sys.exc_info()[0] print(e) print(lookat) # print({c:type(s) for c,s in clash.items()}) print('* Problem sympify ',lhs,'=',rhs[0:-1],'\n') for rhv in endocur: try: # breakpoint() if not self.forcenum: # ud=str(kat.diff(sympify(rhv,md._clash))) try: ud=str(kat.diff(sympify(pastestring(rhv, post,funks=self.mmodel.funks,onlylags=True ),clash))) # print(v,rhv,ud) ud = stripstring(ud,post,self.mmodel.funks) ud = re.sub(pt.namepat+r'(?:(\()([0-9]*)(\)))',r'\g<1>\g<2>+\g<3>\g<4>',ud) except: ud = numdif(self.mmodel,v,rhv,silent=self.silent) if self.forcenum or 'DERIVATIVE(' in ud.upper() : ud = numdif(self.mmodel,v,rhv,silent=self.silent) if not self.silent and 0: print('numdif of {rhv}') diffendocur[v.upper()][rhv.upper()]=ud except: print('we have a serious problem deriving:',lhs,'|',rhv,'\n',lhs,'=',rhs) # breakpoint() i+=1 if not self.silent: print('Model :',self.mmodel.name) print('Number of endogeneus variables :',len(diffendocur)) print('Number of derivatives :',i) return diffendocur
[docs] def show_diff(self,pat='*'): ''' Displays espressions for differential koifficients for a variable if var ends with * all matchning variables are displayes''' l=self.mmodel.maxnavlen xx = self.get_diff_values_all() for v in [var for p in pat.split() for var in fnmatch.filter(self.declared_endo_set,p)]: # breakpoint() thisvar = v if v in self.mmodel.endogene else v+'___RES' print(self.mmodel.allvar[thisvar]['frml']) for e in self.diffendocur[thisvar]: print(f'd{v}/d( {e} ) = {self.diffendocur[thisvar][e]}') print(f'& = & {self.diffvalues[thisvar][e].iloc[:,:3]}') print(' ')
[docs] def show_stacked_diff(self,time=None, lhs='',rhs='',dec=2,show=True): ''' Parameters ---------- time : list, optional DESCRIPTION. The default is None. Time for which to retrieve stacked jacobi lhs : string, optional DESCRIPTION. The default is ''. Left hand side variables rhs : TYPE, optional DESCRIPTION. The default is ''. Right hand side variabnles dec : TYPE, optional DESCRIPTION. The default is 2. show : TYPE, optional DESCRIPTION. The default is True. Returns ------- selected rows and columns of stacked jacobi as dataframe . ''' idx = pd.IndexSlice stacked_df_all = self.get_diff_df_tot() if type(time)== type(None) : perslice = slice(None) # [stacked_df_all.index[0][0],stacked_df_all.index[0][-1]] else: perslice = time if lhs: lhsslice = lhs.upper().split() else: lhsslice =slice(None) # [stacked_df_all.index[0][1],stacked_df_all.index[0][-1]] if rhs: rhsslice = rhs.upper().split() else: rhsslice =slice(None) # [stacked_df_all.index[0][1],stacked_df_all.index[0][-1]] # breakpoint() # slices = idx[perslice,varslice] stacked_df = stacked_df_all.loc[(perslice,lhsslice), (perslice,rhsslice)] if show: sdec = str(dec) display( HTML(stacked_df.applymap(lambda x:f'{x:,.{sdec}f}' if x != 0.0 else ' ').to_html())) return stacked_df
[docs] def show_diff_latex(self,pat='*',show_expression=True,show_values=True,maxper=5): varpat = r'(?P<var>[a-zA-Z_]\w*)\((?P<lag>[+-][0-9]+)\)' # varlatex = '\g<var>_{t\g<lag>}' def partial_to_latex(v,k): udtryk=r'\frac{\partial '+ mj.an_expression_to_latex(v)+r'}{\partial '+mj.an_expression_to_latex(k)+'}' return udtryk if show_values: _ = self.get_diff_values_all() for v in [var for p in pat.split() for var in fnmatch.filter(self.declared_endo_set,p)]: thisvar = v if v in self.mmodel.endogene else v+'___RES' _ = f'{mj.frml_as_latex(self.mmodel.allvar[thisvar]["frml"],self.mmodel.funks,name=False)}' # display(Latex(r'$'+frmlud+r'$')) if show_expression: totud = [ f'{partial_to_latex(thisvar,i)} & = & {mj.an_expression_to_latex(expression)}' for i,expression in self.diffendocur[thisvar].items()] ud=r'\\'.join(totud) display(Latex(r'\begin{eqnarray*}'+ud+r'\end{eqnarray*} ')) #display(Latex(f'{ud}')) if show_values: # breakpoint() if len(self.diffvalues[thisvar].values()): resdf = pd.concat([row for row in self.diffvalues[thisvar].values()]).iloc[:,:maxper] resdf.index = ['$'+partial_to_latex(thisvar,k)+'$' for k in self.diffvalues[thisvar].keys()] markout = resdf.iloc[:,:].to_markdown() display(Markdown(markout))
# print( (r'\begin{eqnarray}'+ud+r'\end{eqnarray} '))
[docs] def get_diffmodel(self): ''' Returns a model which calculates the partial derivatives of a model''' def makelag(var): vterm = udtryk_parse(var)[0] if vterm.lag: if vterm.lag[0] == '-': return f'{vterm.var}___lag___{vterm.lag[1:]}' elif vterm.lag[0] == '+': return f'{vterm.var}___lead___{vterm.lag[1:]}' else: return f'{vterm.var}___per___{vterm.lag}' else: return f'{vterm.var}___lag___0' with ttimer('Generates a model which calculatews the derivatives for a model',self.timeit): out = '\n'.join([f'{lhsvar}__P__{makelag(rhsvar)} = {self.diffendocur[lhsvar][rhsvar]} ' for lhsvar in sorted(self.diffendocur) for rhsvar in sorted(self.diffendocur[lhsvar]) ] ) dmodel = self.newmodel(out,funks=self.mmodel.funks,straight=True, modelname=self.mmodel.name +' Derivatives '+ ' no lags and leads' if self.onlyendocur else ' all lags and leads') return dmodel
[docs] def get_diff_melted(self,periode=None,df=None): '''returns a tall matrix with all values to construct jacobimatrix(es) ''' def get_lagnr(l): ''' extract lag/lead from variable name and returns a signed lag (leads are positive''' # breakpoint() return int('-'*(l.split('___')[0]=='AG') + l.split('___')[1]) def get_elm(vartuples,i): ''' returns a list of lags list of tupels ''' return [v[i] for v in vartuples] _per_first = periode if type(periode) != type(None) else self.mmodel.current_per if hasattr(_per_first,'__iter__'): _per = _per_first else: _per = [_per_first] _df = self.df if type(df) != pd.DataFrame else df self.df = _df _df = _df.pipe(lambda df0: df0.rename(columns={c: c.upper() for c in df0.columns})) self.diff_model.current_per = _per # breakpoint() with ttimer('calculate derivatives',self.timeit): self.difres = self.diff_model.res(_df,silent=self.silent,stats=0,ljit=self.ljit, chunk=self.nchunk).loc[_per,sorted(self.diff_model.endogene)].fillna(0.0) with ttimer('Prepare wide input to sparse matrix',self.timeit): # breakpoint() cname = namedtuple('cname','var,pvar,lag') self.coltup = [cname(i.split('__P__',1)[0], i.split('__P__',1)[1].split('___L',1)[0], get_lagnr(i.split('__P__',1)[1].split('___L',1)[1])) for i in self.difres.columns] # breakpoint() self.coltupnum = [(self.placdic[var],self.placdic[pvar+'___RES' if (pvar+'___RES' in self.mmodel.endogene) else pvar],lag) for var,pvar,lag in self.coltup] self.difres.columns = self.coltupnum self.numbers = [i for i,n in enumerate(self.difres.index)] self.maxnumber = max(self.numbers) self.numbers_to_date = {i:n for i,n in enumerate(self.difres.index)} self.nvar = len(self.endovar) self.difres.loc[:,'number'] = self.numbers with ttimer('melt the wide input to sparse matrix',self.timeit): dmelt = self.difres.melt(id_vars='number') dmelt.loc[:,'value']=dmelt['value'].astype('float') with ttimer('assign tall input to sparse matrix',self.timeit): # breakpoint() dmelt = dmelt.assign(var = lambda x: get_elm(x.variable,0), pvar = lambda x: get_elm(x.variable,1), lag = lambda x: get_elm(x.variable,2)) return dmelt
[docs] def get_diff_mat_tot(self,df=None): ''' Fetch a stacked jacobimatrix for the whole model.current_per Returns a sparse matrix.''' dmelt = self.get_diff_melted(periode=None,df=df) dmelt = dmelt.eval('''\ keep = (@self.maxnumber >= lag+number) & (lag+number >=0) row = number * @self.nvar + var col = (number+lag) *@self.nvar +pvar ''') dmelt = dmelt.query('keep') #csc_matrix((data, (row_ind, col_ind)), [shape=(M, N)]) size = self.nvar*len(self.numbers) values = dmelt.value.values indicies = (dmelt.row,dmelt.col) raw = self.stacked = sp.sparse.csc_matrix((values,indicies ),shape=(size, size)) if self.mmodel.normalized: this = raw - sp.sparse.identity(size,format='csc') else: this = raw return this
[docs] def get_diff_df_tot(self,periode=None,df=None): #breakpoint() stacked_mat = self.get_diff_mat_tot(df=df).toarray() colindex = pd.MultiIndex.from_product([self.mmodel.current_per,self.declared_endo_list],names=['per','var']) rowindex = pd.MultiIndex.from_product([self.mmodel.current_per,self.declared_endo_list],names=['per','var']) out = pd.DataFrame(stacked_mat,index=rowindex,columns=colindex) return out
[docs] def get_diff_mat_1per(self,periode=None,df=None): ''' fetch a dict of one periode sparse jacobimatrices ''' # breakpoint() dmelt = self.get_diff_melted(periode=periode,df=df) dmelt = dmelt.eval('''\ keep = lag == 0 row = var col = pvar ''') outdic = {} dmelt = dmelt.query('keep') grouped = dmelt.groupby(by='number') for per,df in grouped: values = df.value.values indicies = (df.row.values,df.col.values) raw = sp.sparse.csc_matrix((values,indicies ), shape=(self.nvar, self.nvar)) # breakpoint() #csc_matrix((data, (row_ind, col_ind)), [shape=(M, N)]) if self.mmodel.normalized: this = raw -sp.sparse.identity(self.nvar,format='csc') else: this = raw outdic[self.numbers_to_date[per]] = this return outdic
[docs] def get_diff_df_1per(self,df=None,periode=None): self.jacsparsedic = self.get_diff_mat_1per(df=df,periode=periode) self.jacdfdic = {p: pd.DataFrame(jac.toarray(),columns=self.endovar,index=self.endovar) for p,jac in self.jacsparsedic.items()} return self.jacdfdic
[docs] def get_solve1perlu(self,df='',periode=''): # if update or not hasattr(self,'stacked'): self.jacsparsedic = self.get_diff_mat_1per(df=df,periode=periode) self.ludic = {p : sp.linalg.lu_factor(jac.toarray()) for p,jac in self.jacsparsedic.items()} self.solveludic = {p: lambda distance : sp.linalg.lu_solve(lu,distance) for p,lu in self.ludic.items()} return self.solveludic
[docs] def get_solve1per(self,df=None,periode=None): # if update or not hasattr(self,'stacked'): # breakpoint() self.jacsparsedic = self.get_diff_mat_1per(df=df,periode=periode) self.solvelusparsedic = {p: sp.sparse.linalg.factorized(jac) for p,jac in self.jacsparsedic.items()} return self.solvelusparsedic
[docs] def get_solvestacked(self,df=''): # if update or not hasattr(self,'stacked'): self.stacked = self.get_diff_mat_tot(df=df) self.solvestacked = sp.sparse.linalg.factorized(self.stacked) return self.solvestacked
[docs] def get_solvestacked_it(self,df='',solver = sp.sparse.linalg.bicg): # if update or not hasattr(self,'stacked'): self.stacked = self.get_diff_mat_tot(df=df) def solvestacked_it(b): return solver(self.stacked,b)[0] return solvestacked_it
[docs] def get_diff_melted_var(self,periode=None,df=None): '''makes dict with all derivative matrices for all lags ''' def get_lagnr(l): ''' extract lag/lead from variable name and returns a signed lag (leads are positive''' #breakpoint() return int('-'*(l.split('___')[0]=='AG') + l.split('___')[1]) def get_elm(vartuples,i): ''' returns a list of lags list of tupels ''' return [v[i] for v in vartuples] _per_first = periode if type(periode) != type(None) else self.mmodel.current_per if hasattr(_per_first,'__iter__'): _per = _per_first else: _per = [_per_first] _df = self.df if type(df) != pd.DataFrame else df _df = _df.pipe(lambda df0: df0.rename(columns={c: c.upper() for c in df0.columns})) self.diff_model.current_per = _per # breakpoint() difres = self.diff_model.res(_df,silent=self.silent,stats=0,ljit=self.ljit,chunk=self.nchunk ).loc[_per,sorted(self.diff_model.endogene)].fillna(0.0).astype('float') cname = namedtuple('cname','var,pvar,lag') col_vars = [cname(i.rsplit('__P__',1)[0], i.rsplit('__P__',1)[1].split('___L',1)[0], get_lagnr(i.rsplit('__P__',1)[1].split('___L',1)[1])) for i in difres.columns] col_ident = [diff_value_col(**i._asdict(), var_plac=self.placdic[i.var], pvar_plac=self.placdic.get(i.pvar+'___RES' if (i.pvar+'___RES' in self.mmodel.endogene) else i.pvar, 0), pvar_endo = i.pvar in self.mmodel.endogene or i.pvar+'___RES' in self.mmodel.endogene, pvar_exo_plac = self.exoplacdic.get(i.pvar, 0) ) for i in col_vars] difres.columns = col_ident difres = difres.copy() difres.loc[:,'dates'] = difres.index dmelt = difres.melt(id_vars='dates') unfolded = pd.DataFrame( [asdict(i) for i in dmelt.variable.values]) totalmelt = pd.concat([dmelt[['dates','value']],unfolded],axis=1) # breakpoint() return totalmelt
[docs] def get_diff_mat_all_1per(self,periode=None,df=None,asdf=False): dmelt = self.get_diff_melted_var(periode=periode,df=df) with ttimer('Prepare numpy input to sparse matrix',self.timeit): outdic = defaultdict(lambda: defaultdict(dict)) grouped = dmelt.groupby(by=['pvar_endo','dates','lag']) for (endo,date,lag),df in grouped: values = df.value.values # breakpoint() # #csc_matrix((data, (row_ind, col_ind)), [shape=(M, N)]) # print(f'endo:{endo} ,date:{date}, lag:{lag}, \n df') if endo: indicies = (df.var_plac.values,df.pvar_plac.values) this = sp.sparse.csc_matrix((values,indicies ), shape=(len(self.declared_endo_list), len(self.declared_endo_list))) if asdf: outdic[date]['endo'][f'lag={lag}'] = pd.DataFrame(this.toarray(), columns=self.declared_endo_list,index=self.declared_endo_list) else: outdic[date]['endo'][f'lag={lag}'] = this else: indicies = (df.var_plac.values,df.pvar_exo_plac.values) this = sp.sparse.csc_matrix((values,indicies ), shape=(len(self.endovar), len(self.exovar))) if asdf: outdic[date]['exo'][f'lag={lag}']= pd.DataFrame(this.toarray(), columns=self.exovar,index=self.declared_endo_list) else: outdic[date]['exo'][f'lag={lag}'] = this return outdic
[docs] def get_diff_values_all(self,periode=None,df=None,asdf=False): ''' stuff the values of derivatives into nested dic ''' dmelt = self.get_diff_melted_var(periode=periode,df=df) with ttimer('Prepare numpy input to sparse matrix',self.timeit): self.diffvalues = defaultdict(lambda: defaultdict(dict)) grouped = dmelt.groupby(by=['var','pvar','lag']) for (var,pvar,lag),df in grouped: res = df.pivot(index='pvar',columns='dates',values='value') pvar_name = tovarlag(pvar,int(lag)) #reakpoint() # #csc_matrix((data, (row_ind, col_ind)), [shape=(M, N)]) # print(f'endo:{endo} ,date:{date}, lag:{lag}, \n df') self.diffvalues[var][pvar_name]=res return self.diffvalues
[docs] def get_eigenvalues (self, periode=None, asdf=True, filnan=False, silent=False, dropvar=None, dropvar_nr=0,progressbar=False): """ Calculate and return the eigenvectors based on the companion matrix for a dynamic system. This method computes the eigenvectors of a dynamic system represented by a companion matrix derived from Jacobian matrices for different periods. The computation involves handling missing values, optionally filling NaN values with zero, and the ability to drop specific variables from the calculation. Parameters: - periode (optional): The period for which the eigenvectors are to be calculated. If None, defaults to the entire range. - asdf (bool, optional): Determines the format of the matrices (DataFrame or sparse matrix). Defaults to True (DataFrame). - filnan (bool, optional): If True, fills NaN values in the Jacobian matrices with zero. Defaults to False. - silent (bool, optional): If False, prints detailed information about NaN values and other relevant details during the computation. Defaults to False. - dropvar (optional): Specifies variables to be dropped from the calculation. If None, no variables are dropped. Defaults to None. - dropvar_nr (int, optional): The number of variables to drop. Defaults to 0. Returns: dict: A dictionary with keys as dates and values as eigenvectors for each period, derived from the companion matrix of the system. The function performs several steps: - Computes the Jacobian matrices for the given period. - Handles NaN values based on the 'filnan' parameter. - Optionally drops specified variables from the calculation. - Constructs the companion matrix from the modified Jacobian matrices. - Calculates the eigenvectors from the companion matrix. Note: The companion matrix is crucial in analyzing the stability and dynamics of the system. The eigenvectors provide insights into the system's behavior over time. """ ... first_element = lambda dic: dic[list(dic.keys())[0]] # first element in a dict # print('Calculate derivatives') jacobiall = self.get_diff_mat_all_1per(periode,asdf=asdf) # breakpoint() if not silent: for date,content in jacobiall.items(): for lag,df in content['endo'].items(): if not (this := df.loc[df.isna().any(axis=1),df.isna().any(axis=0)]).empty: if filnan: print(f'{date} {lag} These elements in the jacobi is set to zero',this,'\n') else: print(f'{date} {lag} These elements in the jacobi contains NaN, You can use filnan = True to set values equal to 0',this,'\n') pat = ' '.join(this.index) self.show_diff_latex(pat) A_dic_gross0 = {date : {lag : df.fillna(0) if filnan else df for lag,df in content['endo'].items()} for date,content in jacobiall.items()} # vnow to get lag=-1 leftmost and so om A_dic_gross = {date: {lag : content[lag] for lag in sorted(content.keys())} for date,content in A_dic_gross0.items()} if type(dropvar)== type(None): A_dic = {date: {lag: df for lag,df in a_year.items() } for date,a_year in A_dic_gross.items() } else: A_dic = {date: {lag: df.drop(index=dropvar,columns=dropvar) for lag,df in a_year.items() } for date,a_year in A_dic_gross.items() } # print(f'{dropvar_nr} {dropvar}') # breakpoint() xlags = sorted([lag for lag in first_element(A_dic).keys() if lag !='lag=0'],key=lambda lag:int(lag.split('=')[1]),reverse=True) number=len(xlags) # dim = len(self.endovar) first_A_dict = first_element(A_dic) # The first dict of A's first_A = first_A_dict['lag=0'] self.varnames = first_A.columns dim = len(first_A) if asdf: np_to_df = lambda nparray: pd.DataFrame(nparray, index = self.varnames,columns=self.varnames) lib = np values = lambda df: df.values calc_eig = lib.linalg.eig calc_eig_reserve = lambda sparse_matrix : sp.linalg.eig(sparse_matrix.toarray()) else: np_to_df = lambda sparse_matrix : sparse_matrix lib = sp.sparse values = lambda sparse_matrix : sparse_matrix calc_eig = lambda sparse_matrix : lib.linalg.eigs(sparse_matrix) calc_eig_reserve = lambda sparse_matrix : sp.linalg.eig(sparse_matrix.toarray()) I=lib.eye(dim)*1.0000 zeros = lib.zeros((dim,dim)) self.A_dic = A_dic # a idendity matrix AINV_dic = {date: np_to_df(lib.linalg.inv(I-A['lag=0'])) for date,A in (tqdm(A_dic.items(),'Invert (I-A)') if progressbar else A_dic.items()) } C_dic = {date: {lag : AINV_dic[date] @ A[lag] for lag,Alag in A.items() if lag!='lag=0'} for date,A in A_dic.items()} # calculate A**-1*A(lag) # top=lib.eye((number-1)*dim,(number)*dim,dim) newbottom = lib.eye((number-1)*dim,(number)*dim) # top=lib.eye((number)*dim,(number+1)*dim,dim) # breakpoint() top_dic = {date: lib.hstack([values(thisC) for thisC in C.values()]) for date,C in C_dic.items()} comp_dic = {} for date,top in top_dic.items(): comp_dic[date] = lib.vstack([top,newbottom]) try: eigen_values_and_vectors = {date : calc_eig(comp) for date,comp in ( tqdm(comp_dic.items(),'Calculate Eigenvalues') if progressbar else comp_dic.items()) } except: print(f'Using reserve calculatioon of eigenvalues {dropvar_nr=} {dropvar=}') eigen_values_and_vectors = {date : calc_eig_reserve(comp) for date,comp in tqdm(comp_dic.items())} eig_dic = {date : both[0] for date,both in eigen_values_and_vectors.items() } for i,date in enumerate(eig_dic.keys() ): ... if 0: if i == 0: print('here') print( sum(abs(eig_dic[date]))) self.A_dic = A_dic self.comp_dic = comp_dic self.eigen_values_and_vectors = eigen_values_and_vectors self.eig_dic = eig_dic return eig_dic
[docs] @lru_cache(maxsize=None) def get_eigen_jackknife(self,maxnames = 200_000,periode=None,progressbar=True): """ Compute and cache eigenvalues for matrices with each variable excluded one at a time, up to a maximum number. This function uses the jackknife technique to evaluate the impact of each variable on the system's stability by excluding each variable one by one from the eigenvector calculation. It caches the results for efficient repeated access. Parameters: - maxnames (int, optional): The maximum number of variables to consider for exclusion in the jackknife process. Defaults to 20. Returns: dict: A dictionary where keys are the names of variables excluded (or 'ALL' for no exclusion) and values are the corresponding eigenvectors. Note: The function is computationally intensive and can take significant time for larger systems. """ if not hasattr(self, 'eig_dic'): _ = self.get_eigenvalues (filnan = True,silent=False,asdf=1) name_to_loop =[n for i,n in enumerate(self.varnames) if i < maxnames and not n.endswith('_FITTED') ] print(f'Calculating eigenvalues of {len(name_to_loop)} different matrices takes time, so make cup of coffee and a take a short nap') jackknife_dict = {f'{name}': self.get_eigenvalues (dropvar=name,periode=periode) for name in (tqdm(name_to_loop) if progressbar else name_to_loop)} base_dict = {'NONE' : self.get_eigenvalues (dropvar=None,periode=periode )} # we læeave the properties clean return {**base_dict, **jackknife_dict}
[docs] @lru_cache(maxsize=None) def get_eigen_jackknife_df(self,maxnames = 200_000,progressbar=True,periode=None,filecache=True,refresh=False): """ Convert the eigenvalue data obtained from a jackknife analysis into a pandas DataFrame, including additional columns for the absolute length, real, and imaginary parts of the eigenvalues. The jackknife analysis is performed by computing and caching eigenvalues for matrices with each variable excluded one at a time, up to a specified maximum number. This method uses the jackknife technique to evaluate the impact of each variable on the system's stability by excluding each variable one by one from the eigenvector calculation. The results are then flattened and transformed into a DataFrame for further analysis. Parameters: - maxnames (int, optional): The maximum number of variables to consider for exclusion in the jackknife process. Defaults to 200,000. - progressbar (bool, optional): If True, displays a progress bar during the computation of eigenvalues. Defaults to True. Returns: pandas.DataFrame: A DataFrame containing the eigenvalues with additional columns for length, real, and imaginary parts. Each row represents an eigenvalue for a specific variable exclusion, year, and index. Note: - The function is computationally intensive and can take significant time for larger systems. - A progress bar can be displayed for monitoring the computation progress. - The function is especially useful for detailed analysis and visualization of the eigenvalues obtained from the jackknife analysis. """ jackfile = Path('jackdf.csv') if (not refresh) and jackfile.exists() and filecache: df = pd.read_csv('jackdf.csv',index_col=0) print('Jackdf read from file') return df jackdict = self.get_eigen_jackknife(maxnames = maxnames,progressbar=progressbar,periode=periode) flattened_data = [{'excluded': scenario, 'year': year, 'index': i, 'value': value} for scenario, years in jackdict.items() for year, values in years.items() for i, value in enumerate(values)] # Creating the DataFrame df = pd.DataFrame.from_dict(flattened_data) # Applying transformations df['length'] = df['value'].apply(lambda x: abs(x)) df['realvalue'] = df['value'].apply(lambda x: x.real) df['imagvalue'] = df['value'].apply(lambda x: x.imag) vardict = {**{'NONE':'whole model'}, **{k : f'{k} {v}' for k,v in self.mmodel.var_description.items() }} df = df.assign(excluded_description=df['excluded'].map(lambda x: vardict.get(x, x))) if filecache: df.to_csv('jackdf.csv') print('Jackdf written to file') return df
[docs] def get_eigen_jackknife_abs(self,largest=20,maxnames = 200_000): """ Compute the absolute values of the largest eigenvalues from the jackknife eigenvalue analysis. This function calculates the absolute values of the largest eigenvalues for each set of eigenvalues obtained from the `get_eigen_jackknife` method. It focuses on the largest eigenvalues to understand the most significant influences on the system's stability. Parameters: - largest (int, optional): The number of largest eigenvalues to consider. Defaults to 20. - maxnames (int, optional): The maximum number of variables to exclude in the jackknife process. Defaults to 20. Returns: dict: A dictionary with the absolute values of the largest eigenvalues for each variable exclusion scenario. Note: This method helps in identifying the most impactful variables on the system's stability by focusing on the largest eigenvalues. """ base = self.get_eigen_jackknife(maxnames=maxnames) res = {name: {date: np.sort(np.partition(np.abs(eigenvalues), -largest)[-largest:])[::-1] for date,eigenvalues in alldates.items()} for name,alldates in base.items()} return res
[docs] @staticmethod def jack_largest_reduction(jackdf, eigenvalue_row=0, periode=None,imag_only=False): """ Identifies the largest reduction in eigenvalue magnitude for a specified period and optionally focuses on eigenvalues with imaginary parts if imag_only is True. Parameters: - jackdf (DataFrame): A DataFrame containing jackknife analysis results, including eigenvalues, their real and imaginary parts, and descriptions of exclusions. - eigenvalue_row (int, optional): The row index of the eigenvalue to analyze. Defaults to 0, which typically represents the largest magnitude eigenvalue. - periode (int/str, optional): The specific period (year) to analyze. If None, the function processes the first year found in the DataFrame. Defaults to None. - imag_only (bool, optional): If True, only considers eigenvalues with non-zero imaginary parts for analysis. Defaults to False. Returns: DataFrame: A sorted DataFrame with the nth largest length (eigenvalue magnitude) for each excluded variable or condition, including the year, exclusion identifier, length (magnitude of the eigenvalue), description of the exclusion, and the real and imaginary parts of the eigenvalue. The row for 'excluded == "NONE"' is moved to the front. Raises: Exception: If the specified period is not found in the DataFrame's years. """ years = jackdf.year.unique() # Determine the year to analyze based on the input if years[0] == periode or type(periode) == type(None): year = years[0] elif periode in years: year = periode else: raise Exception('No such year') # Filter DataFrame based on the specified year and condition df = jackdf.query('year == @year & imagvalue != 0.0 ') if imag_only else jackdf.query('year == @year') df_sorted = df.sort_values(by=['year', 'excluded', 'length'], ascending=[True, True, False]) nth_largest_length = df_sorted.groupby(['year', 'excluded']).nth(eigenvalue_row).reset_index() # Further refine and sort the resulting DataFrame nth_largest_length = nth_largest_length[['year', 'excluded', 'length', 'excluded_description', 'realvalue', 'imagvalue']].sort_values('length') # Move the row where 'excluded == "NONE"' to the front none_row = nth_largest_length.query('excluded == "NONE" ') others = nth_largest_length.query('excluded != "NONE" ') new_nth_largest_length = pd.concat([none_row, others]).reset_index(drop=True) return new_nth_largest_length
[docs] def jack_largest_reduction_plot(self,jackdf, eigenvalue_row=0, periode=None,imag_only=False): """ Creates an interactive Plotly plot to visualize the reduction in eigenvalue magnitude across different exclusions, highlighting the 'NONE' exclusion category with a distinct color and displaying detailed information on hover. Optionally focuses on eigenvalues with imaginary parts if imag_only is True. Parameters: - jackdf (DataFrame): A DataFrame containing the results of a jackknife analysis, including eigenvalues and their descriptions. The DataFrame is expected to have at least the columns 'excluded', 'length', 'excluded_description', 'realvalue', and 'imagvalue'. - eigenvalue_row (int, optional): Specifies the row index of the eigenvalue to analyze. Defaults to 0, which typically corresponds to the largest magnitude eigenvalue. - periode (int/str, optional): The specific period (year) to analyze. If None, the function processes data without filtering by period. Defaults to None. - imag_only (bool, optional): If True, only considers eigenvalues with non-zero imaginary parts for analysis. Defaults to False. The function processes the input DataFrame to highlight the 'NONE' category in red and all other categories in blue. It then creates a Plotly FigureWidget to plot these data points as markers on a scatter plot. The y-axis tick labels are hidden to emphasize the data points rather than the categorical labels. An HTML widget is used to display detailed information about a data point (exclusion description, length, and imaginary part of the eigenvalue) when the user hovers over it. This interactive feature provides a deeper insight into the impact of each exclusion on the eigenvalue magnitude. Displays: - An interactive Plotly scatter plot within the Jupyter notebook. - A dynamic HTML widget that updates with detailed information about the hovered data point. """ import plotly.graph_objs as go from ipywidgets import VBox, HTML, Textarea, Layout result_df = __class__.jack_largest_reduction(jackdf, eigenvalue_row=eigenvalue_row, periode=periode,imag_only=imag_only) #print(result_df) # Assign highlight colors based on the 'excluded' category result_df['Highlight'] = result_df['excluded'].apply(lambda x: 'NONE' if x == 'NONE' else 'Other') # Create a combined highlight column based on 'excluded' and 'imagvalue' def get_highlight(row): if row['excluded'] == 'NONE' and row['imagvalue'] != 0.0: return 'NONE (Non-zero Imag)' elif row['excluded'] == 'NONE' and row['imagvalue'] == 0.0: return 'NONE (Zero Imag)' elif row['imagvalue'] != 0.0: return 'Other (Non-zero Imag)' else: return 'Other (Zero Imag)' result_df['CombinedHighlight'] = result_df.apply(get_highlight, axis=1) # Add a column for marker size based on the 'CombinedHighlight' or any condition result_df['marker_size'] = result_df['CombinedHighlight'].apply(lambda x: 20 if 'NONE' in x else 10) # Define color map for the combined highlight color_map = { 'NONE (Non-zero Imag)': 'red', 'NONE (Zero Imag)': 'red', 'Other (Non-zero Imag)': 'green', 'Other (Zero Imag)': 'blue' } # Create the Plotly FigureWidget using the combined highlight for color mapping fig = go.FigureWidget(data=[ go.Scatter( x=result_df['length'], y=result_df['excluded'], mode='markers', marker=dict( color=result_df['CombinedHighlight'].map(color_map), size=result_df['marker_size'] # Use the marker_size column here ) ) ]) # Create the Plotly FigureWidget with customized marker colors # fig = go.FigureWidget(data=[ # go.Scatter(x=result_df['length'], y=result_df['excluded'], mode='markers', # marker=dict(color=result_df['Highlight'].map({'NONE': 'red', 'Other': 'blue'}))) # ]) # Update layout to hide y-axis tick labels for a cleaner presentation fig.update_layout( title={ 'text': f"Eigenvalue Lengths by excluded equation. {eigenvalue_row}'th largest eigenvalues, {'Only eigenvalues with imaginary values' if imag_only else''}", 'y':0.9, 'x':0.5, 'xanchor': 'center', 'yanchor': 'top' }, yaxis=dict(showticklabels=False) ) # Initialize the HTML widget for displaying hover information textarea = Textarea( value="", placeholder='', description='Info:', disabled=False, layout = {'width': '95%', 'height': '100px'} , style = {'description_width': '5%'}, # layout={'width': '100%', 'height': '100px'} # Adjust the size as needed ) info2 = f"Length: {result_df.iloc[0]['length']:.2f} Imag: {result_df.iloc[0]['imagvalue']:.2f}"+'\n' textarea.value = 'The whole model, no equation excluded\n' + info2 # Define the function to update the info widget upon hovering over a data point def update_info(trace, points, state): if points.point_inds: ind = points.point_inds[0] # Index of the hovered point # Format the information to display info1 = f"Excluded equation: {result_df.iloc[ind]['excluded_description']}"+'\n' info2 = f"Length: {result_df.iloc[ind]['length']:.2f} Imag: {result_df.iloc[ind]['imagvalue']:.2f}"+'\n' if result_df.iloc[ind]['excluded'] == 'NONE': textarea.value = 'The whole model, no equation excluded\n' + info2 else: textarea.value = info1 + info2 + self.mmodel.allvar[result_df.iloc[ind]['excluded']]['frml'] # Bind the on_hover event to the update_info function for each trace for trace in fig.data: trace.on_hover(update_info) # Combine the plot and info widget in a vertical layout and display them display(VBox([fig,textarea]))
[docs] def get_eigen_jackknife_abs_select(self,year=2023,largest=20,maxnames = 200_000): """ Select and summarize the absolute largest eigenvalues for a specific year from the jackknife analysis. This function focuses on a specific year and extracts the sum of the absolute largest eigenvalues obtained from the `get_eigen_jackknife_abs` method. It helps in understanding the aggregate impact of variable exclusions on the system's stability for a particular year. Parameters: - year (int, optional): The specific year to focus on. Defaults to 2023. - largest (int, optional): The number of largest eigenvalues to consider. Defaults to 20. - maxnames (int, optional): The maximum number of variables to exclude in the jackknife process. Defaults to 20. Returns: pandas.Series: A series sorted by the sum of the absolute largest eigenvalues for each variable exclusion scenario in the specified year. Note: This method is useful for temporal analysis of the system's stability, focusing on the contributions of each variable in a specific year. """ xx = {v: sum(d[year]) for v,d in self.get_eigen_jackknife_abs(maxnames=maxnames,largest=largest).items() } return pd.Series(xx).sort_values()
[docs] def get_df_eigen_dict(self): rownames = [f'{c}{("("+str(l)+")") if l != 0 else ""}' for l in range(-1,self.mmodel.maxlag-1,-1) for c in self.varnames] # breakpoint() values_and_vectors = {per: [pd.DataFrame(vv[0],columns = ['Eigenvalues']).T, pd.DataFrame(vv[1],index = rownames) ] for per,vv in self.eigen_values_and_vectors.items()} combined = {per : pd.concat(vandv) for per,vandv in values_and_vectors.items()} return combined
[docs] def get_df_comp_dict(self): rownames = [f'{c}{("("+str(l)+")") if l != 0 else ""}' for l in range(-1,self.mmodel.maxlag-1,-1) for c in self.varnames] df_dict = {k : pd.DataFrame(v,columns=rownames,index=rownames) for k,v in self.comp_dic.items()} return df_dict
[docs] def eigplot(self, eig_dic=None,per=None,size=(4,3),top=0.9): import matplotlib.pyplot as plt plt.close('all') if type(eig_dic) == type(None): this_eig_dic = self.eig_dic else: this_eig_dic = eig_dic # breakpoint() if type(per) == type(None): first_key = list(this_eig_dic.keys())[0] else: first_key = per w = this_eig_dic[first_key] fig, ax = plt.subplots(figsize=size,subplot_kw={'projection': 'polar'}) #A4 fig.suptitle(f'Eigen values in {first_key}\n',fontsize=20) for x in w: ax.plot([0,np.angle(x)],[0,np.abs(x)],marker='o') ax.set_rticks([0.5, 1, 1.5]) fig.subplots_adjust(top=0.8) return fig
[docs] def eigenvector_plot(self,per=None,size=(4,3),top=0.9): import matplotlib.pyplot as plt this_eig_dic = self.eig_dic # breakpoint() if type(per) == type(None): first_key = list(this_eig_dic.keys())[0] else: first_key = per w = this_eig_dic[first_key] fig, ax = plt.subplots(figsize=size,subplot_kw={'projection': 'polar'}) #A4 fig.suptitle(f'Eigen values in {first_key}\n',fontsize=20) for x in w: ax.plot([0,np.angle(x)],[0,np.abs(x)],marker='o') ax.set_rticks([0.5, 1, 1.5]) fig.subplots_adjust(top=0.8) return fig
[docs] @staticmethod def get_feedback(eig_dic,per=None): '''Returns a dict of max abs eigenvector and the sign ''' return {per: max(abs(eigen_vector)) * (-1 if sum(abs(eigen_vector.imag)) else 1) for per,eigen_vector in eig_dic.items()}
[docs] def eigplot_all0(self,eig_dic,size=(4,3)): colrows = 4 ncols = min(colrows,len(eig_dic)) nrows=-((-len(eig_dic))//ncols) fig, axis = plt.subplots(nrows=nrows,ncols=ncols,figsize=(3*ncols,3*nrows), subplot_kw={'projection': 'polar'},constrained_layout=True) # breakpoint() laxis = axis.flatten() for i,(ax,(key,w)) in enumerate(zip(laxis,eig_dic.items())): for x in w: ax.plot([0,np.angle(x)],[0,np.abs(x)],marker='o') ax.set_rticks([0.5, 1, 1.5]) ax.set_title(f'{key}',loc='right') return fig
def eigplot_all(self,eig_dic,periode=None,size=(4,3),maxfig=6): """ Plots the eigenvalues for specified periods in polar coordinates. This method takes a dictionary of eigenvalues, optionally filters them by specified periods, and plots each eigenvalue on polar plots. The number of plots can be limited by `maxfig`. If `periode` is not specified, it defaults to the current period defined in the model object. The plots are arranged in a grid, with a maximum of two columns. Parameters ---------- eig_dic : dict A dictionary where keys are period identifiers and values are iterables of complex numbers representing eigenvalues. periode : iterable, optional An iterable of period identifiers to plot. If `None` (default), eigenvalues for the current period in the model are plotted. size : tuple of int, optional The size of each subplot in inches. Default is (4, 3). maxfig : int, optional The maximum number of figures to display. Default is 6. Returns ------- matplotlib.figure.Figure A matplotlib Figure object containing the generated plots. """ plt.close('all') plt.ioff() _per_first = periode if type(periode) != type(None) else self.mmodel.current_per if hasattr(_per_first,'__iter__'): _per = _per_first else: _per = [_per_first] plot_dic = {p : v for p,v in eig_dic.items() if p in _per } maxaxes = min(maxfig,len(plot_dic)) colrow = 2 ncols = min(colrow,maxaxes) nrows=-((-maxaxes)//ncols) fig = plt.figure(figsize=(9*ncols,10*nrows),constrained_layout=True) spec = mpl.gridspec.GridSpec(ncols=ncols,nrows=nrows,figure=fig) # breakpoint() fig.suptitle('Eigenvalues',fontsize=20) for i,(key,w) in enumerate(plot_dic.items()): if i >= maxaxes: break col = i%colrow row = i//colrow # print(i,row,col) ax = fig.add_subplot(spec[row, col],projection='polar') for x in w: ax.plot([0,np.angle(x)],[0,np.abs(x)],marker='o') ax.set_rticks([0.5, 1, 1.5]) ax.set_title(f'{key}',loc='right') plt.show() return fig
[docs] def eigplot_all(self, eig_dic, periode=None, size=(4, 3), maxfig=6): plt.close('all') plt.ioff() _per_first = periode if periode is not None else self.model.current_per if hasattr(_per_first, '__iter__'): _per = _per_first else: _per = [_per_first] plot_dic = {p: v for p, v in eig_dic.items() if p in _per} maxaxes = min(maxfig, len(plot_dic)) colrow = 2 ncols = min(colrow, maxaxes) nrows = -((-maxaxes) // ncols) # Dynamically calculate figure size based on subplot size and layout fig_width = size[0] * ncols # Adjusted to consider 'size' parameter for width fig_height = size[1] * nrows # Adjusted to consider 'size' parameter for height fig = plt.figure(figsize=(fig_width, fig_height), constrained_layout=True) spec = mpl.gridspec.GridSpec(ncols=ncols, nrows=nrows, figure=fig) fig.suptitle('Eigenvalues', fontsize=20) for i, (key, w) in enumerate(plot_dic.items()): if i >= maxaxes: break col = i % colrow row = i // colrow ax = fig.add_subplot(spec[row, col], projection='polar') for x in w: ax.plot([0, np.angle(x)], [0, np.abs(x)], marker='o') ax.set_rticks([0.5, 1, 1.5]) ax.set_title(f'{key}', loc='right') plt.show() return fig
[docs] def plot_eigenvalues_polar_old(self,eig_dic): import plotly.graph_objects as go import numpy as np from ipywidgets import Dropdown, Output, VBox, HTML from IPython.display import display year_dropdown = Dropdown(options=list(eig_dic.keys()), description='Time:') info_box = HTML(value="Hover over a point to see details.") plot_output = Output() def plot_eigenvalues_polar_vectors(year): eigenvalues = eig_dic[year] with plot_output: plot_output.clear_output(wait=True) r_values = [np.abs(ev) for ev in eigenvalues] # Magnitudes for the polar plot theta_values = [np.angle(ev, deg=True) for ev in eigenvalues] # Angles for the polar plot # Prepare data for the plot, repeating magnitude and angle for each eigenvalue, and adding breaks (None) r_plot_values = [val for pair in zip([0]*len(r_values), r_values, [None]*len(r_values)) for val in pair] theta_plot_values = [val for pair in zip(theta_values, theta_values, [None]*len(theta_values)) for val in pair] fig = go.FigureWidget(go.Scatterpolar( r=r_plot_values, theta=theta_plot_values, mode='lines+markers', marker=dict(color='blue', size=5), line=dict(color='blue') )) fig.update_layout( title=f'Polar Plot of Eigenvalue Vectors for Year {year}', polar=dict( radialaxis=dict(visible=True, range=[0, max(r_values) * 1.1]), angularaxis=dict(direction='clockwise', thetaunit='degrees', rotation=0) ), showlegend=False ) def update_info(trace, points, _): if points.point_inds: # Each eigenvalue is represented by 3 points in the plot data (start, end, None), calculate the index accordingly ind = points.point_inds[0] // 3 ev = eigenvalues[ind] # Get the eigenvalue for trace in fig.data: trace.on_hover(update_info) display(fig) def on_year_change(change): plot_eigenvalues_polar_vectors(change.new) year_dropdown.observe(on_year_change, names='value') display(VBox([year_dropdown, plot_output])) plot_eigenvalues_polar_vectors(year_dropdown.value)
[docs] def eigenvalues_show(self,eig_dic): """ Generates and displays a polar plot of eigenvalues and their corresponding eigenvectors for a selected year from a dictionary of DataFrames. Each DataFrame contains eigenvalues and eigenvectors of the companion matrix for that year. The first row of the DataFrame consists of eigenvalues, and the subsequent rows contain the corresponding eigenvectors. The user can interact with the plot via a dropdown for year selection, a slider for eigenvalue selection, and a button to toggle additional plot details. The polar plot dynamically updates to reflect the selected eigenvalue, displaying its magnitude and phase. Additional information about the selected eigenvector is also displayed, facilitating a detailed temporal analysis of the eigenvalues. Parameters: - eig_dic (dict): A dictionary where keys are years (or time periods) and values are DataFrames containing the first row as eigenvalues and the subsequent rows as the corresponding eigenvectors of the companion matrix for each year. Side Effects: - Displays interactive widgets including a dropdown for year selection, a plot output area, and a slider for selecting specific eigenvalues. Additionally, displays textual information about the selected eigenvalue and its eigenvectors. - Utilizes Plotly for generating the polar plot and ipywidgets for interactive controls. - The method defines and uses several inner functions to handle events like year change, eigenvalue selection, and other interactions. Returns: - None. The method's primary function is to display interactive widgets and plots within a Jupyter notebook environment. Note: - This method is designed for use within a Jupyter notebook as it relies on IPython.display for rendering and ipywidgets for interactivity. - The actual plotting and widget setup are accomplished through several nested functions within this method, making use of closures and nonlocal variables for state management. """ import plotly.graph_objects as go import numpy as np from ipywidgets import Dropdown, Output, VBox, HTML, HBox, IntSlider,Select,Textarea,Checkbox,Button from IPython.display import display year_dropdown = Dropdown(options=list(eig_dic.keys()), description='Time:') plot_output = Output() def get_a_eigenvalue_vector(eigenvalues_vectors,year): """ Extracts and processes eigenvalues and their corresponding eigenvectors for a given year. Filters and sorts eigenvalues by their magnitude, returning the significant eigenvalues and their associated eigenvectors. Parameters: - eigenvalues_vectors (dict): Dictionary containing DataFrames of eigenvalues and eigenvectors indexed by year. - year (str/int): The year for which to extract and process the eigenvalue vector. Returns: - tuple: A tuple containing the significant eigenvalue and a DataFrame of the corresponding eigenvectors after processing. """ compabs = lambda complex: [abs(value) for index,value in complex.items()] eig_gt = (eigenvalues_vectors[year]. T. # Transpose as we query and eval on columns eval('absolute_value=@compabs(Eigenvalues)'). # calculate the absolute value of the eigenvalue query('absolute_value > 0.01'). sort_values(by='absolute_value',ascending=False).reset_index(drop=True). # Transpose again. drop('absolute_value',axis=1) # We dont need the absolute value anymore, so the column is dropped. ) return eig_gt.iloc[:,0],eig_gt.iloc[:,1:].abs() def plot_eigenvalues_polar_vectors(year): eigenvalues,eigenvectors = get_a_eigenvalue_vector(eig_dic,year ) valueslider = IntSlider( value=1, min=0, max=len(eigenvalues)-1, step=1, description='Number:', disabled=False, continuous_update=False, orientation='vertical', readout=True, readout_format='d' ) var_info = Textarea( value="", placeholder='', description='Info:', disabled=False, layout = {'width': '95%', 'height': '100px'} , style = {'description_width': '5%'}, # layout={'width': '100%', 'height': '100px'} # Adjust the size as needed ) wopenplot = Button(value=True,description = 'Open plot widget',disabled=False,icon='check', layout={'width':'95%'} ,style={'description_width':'70%'}) with plot_output: plot_output.clear_output(wait=True) r_values = [np.abs(ev) for ev in eigenvalues] # Magnitudes for the polar plot theta_values = [np.angle(ev, deg=True) for ev in eigenvalues] # Angles for the polar plot # Prepare data for the plot, repeating magnitude and angle for each eigenvalue, and adding breaks (None) r_plot_values = [val for pair in zip([0]*len(r_values), r_values, [None]*len(r_values)) for val in pair] theta_plot_values = [val for pair in zip(theta_values, theta_values, [None]*len(theta_values)) for val in pair] fig = go.FigureWidget(go.Scatterpolar( r=r_plot_values, theta=theta_plot_values, mode='lines+markers', marker=dict(color='blue', size=5), line=dict(color='blue') )) fig.update_layout( title_text='', # Ensure no title is set margin=dict(l=20, r=20, t=20, b=20) # Adjust margins around the plot ) fig.update_layout( # title=f'Polar Plot of Eigenvalue Vectors for Year {year}', polar=dict( radialaxis=dict(visible=True, range=[0, max(r_values) * 1.1]), angularaxis=dict(direction='clockwise', thetaunit='degrees', rotation=0) ), showlegend=False ) v_dropdown_description = HTML(value="<strong>Eigenvalue :</strong> <br><strong>Eigenvectors:</strong>", placeholder='',description='',) v_dropdown = Select(description='',rows=14,layout = {'width': '95%'}) box = VBox([HBox([fig,valueslider,VBox([v_dropdown_description,v_dropdown,wopenplot])]),var_info]) lopenplot = False def vector_info(selected_index): """ Updates the displayed information for a selected eigenvector, including its magnitude, phase, and detailed component values. Dynamically updates dropdown options to reflect the components of the selected eigenvector. Parameters: - selected_index (int): Index of the selected eigenvalue/eigenvector. """ nonlocal lopenplot this_vector = eigenvectors.iloc[selected_index,:].sort_values(ascending=False) eigenvalue = eigenvalues[selected_index] select_options = [(f"{index} - {value:.2f}", f"{index}") for index, value in this_vector.items() if value >= 0.01] v_dropdown.options = select_options v_dropdown_description.value=f"<strong>Eigenvalue #: {selected_index}</strong> <br>Length: {abs(eigenvalue):.2f}<br>Real: {eigenvalue.real:.2f}<br>Imag: {eigenvalue.imag:.2f} <br><strong>Eigenvectors:</strong>" # gross_varnames = [v.split('(')[0] for t,v in select_options] # varnames = ' '.join(list(dict.fromkeys(gross_varnames))) # keepbox = self.mmodel.keep_show(use_var_groups=False,selectfrom = varnames,use_smpl= True) if lopenplot : plot_output.clear_output(wait=True) # box = VBox([HBox([fig,valueslider,VBox([v_dropdown_description,v_dropdown,wopenplot])]),var_info,keepbox.datawidget]) box = VBox([HBox([fig,valueslider,VBox([v_dropdown_description,v_dropdown,wopenplot])]),var_info]) display(box) lopenplot = False vector_info(0) def on_openplot(change): """ Handles the event triggered by clicking the 'Open plot widget' button. It refreshes the plot and optionally includes additional widgets for further data exploration. Parameters: - change (dict): Contains details of the button click event. Not used in the function body but necessary for event handler signature. """ nonlocal lopenplot lopenplot = True gross_varnames = [v.split('(')[0] for t,v in v_dropdown.options] varnames = ' '.join(list(dict.fromkeys(gross_varnames))) keepbox = self.mmodel.keep_show(use_var_groups=False,selectfrom = varnames,use_smpl= True,init_dif=True) box = VBox([HBox([fig,valueslider,VBox([v_dropdown_description,v_dropdown,wopenplot])]),var_info,keepbox.datawidget]) plot_output.clear_output(wait=True) display(box) def info_show(ind): """ Displays information about the eigenvalue and its corresponding eigenvectors for a given index. This function is designed to update the displayed information based on user selection or interaction. Parameters: - ind (int): The index of the selected eigenvalue to display information for. """ ev = eigenvalues[ind] # Get the eigenvalue vector_info(ind) def update_info(trace, points, _): """ Callback function to handle hover events on the plot. It identifies the eigenvalue corresponding to the hovered point and updates the information displayed to the user. Parameters: - trace: The trace object associated with the hover event. Not used in the function body. - points: The points object containing information about the hovered point. - _: Placeholder for additional arguments. Not used in the function body. """ nonlocal lopenplot if points.point_inds: if lopenplot : plot_output.clear_output(wait=True) # box = VBox([HBox([fig,valueslider,VBox([v_dropdown_description,v_dropdown,wopenplot])]),var_info,keepbox.datawidget]) box = VBox([HBox([fig,valueslider,VBox([v_dropdown_description,v_dropdown,wopenplot])]),var_info]) display(box) lopenplot = False # Each eigenvalue is represented by 3 points in the plot data (start, end, None), calculate the index accordingly ind = points.point_inds[0] // 3 valueslider.value = ind info_show(ind) def on_v_dropdown_change(change): """ Handles changes in the eigenvector selection dropdown. Updates the displayed information related to the selected eigenvector, including its description and formula. Parameters: - change (dict): Contains details of the selection change in the dropdown widget. """ # print(f'{change=}') # print(f'{change.owner=}') # print(f'{change.owner.options=}') # print(f'{change.new=}' + '\n') if type(change.new) == dict and len(change.new ): index=change.new['index'] name = change.owner.options[index][1] varname = name.split('(')[0] info1 = f'{varname}: {self.mmodel.var_description[varname]}' +'\n' info2 = self.mmodel.allvar[varname]['frml'] var_info.value= info1 + info2 for trace in fig.data: trace.on_hover(update_info) def on_slide_change(change): """ Responds to changes in the eigenvalue selection slider. Updates the plot to highlight the selected eigenvalue and its corresponding eigenvectors, and updates displayed information. Parameters: - change (dict): Contains details of the slider value change. """ # print(f'{change.new=} ') if type(change.new) == int: selected_index = change.new else: if len(change.new): selected_index = change.new['value'] else: return # print(f'{change.new=} {selected_index=}') colors = ['red' if i == selected_index else 'green' for i in range(len(eigenvalues))] fig.data[0].marker.color = [color for pair in zip(colors, colors, colors) for color in pair] sizes = [15 if i == selected_index else 5 for i in range(len(eigenvalues))] fig.data[0].marker.size = [size for pair in zip(sizes,sizes,sizes) for size in pair] info_show(selected_index) # update_info(None,SimulatedPoints(0),None) wopenplot.on_click(on_openplot) valueslider.observe(on_slide_change) v_dropdown.observe(on_v_dropdown_change) valueslider.value= 0 display(box) def on_year_change(change): """ Callback function for handling changes in the year selection dropdown. Updates the polar plot to display the eigenvalues and eigenvectors for the newly selected year. Parameters: - change (dict): Contains details about the change event in the year dropdown. """ plot_eigenvalues_polar_vectors(change.new) year_dropdown.observe(on_year_change, names='value') display(VBox([year_dropdown, plot_output])) plot_eigenvalues_polar_vectors(year_dropdown.value)
#%% if __name__ == '__main__': #%% testing os.environ['PYTHONBREAKPOINT'] = '1' from modelclass import model fsolow = '''\ Y = a * k**alfa * l **(1-alfa) C = (1-SAVING_RATIO) * Y(-1) I = Y - C diff(K) = I-depreciates_rate * K(-1) diff(l) = labor_growth * (L(-1)+l(-2))/2 K_intense = K/L ''' msolow = model.from_eq(fsolow) #print(msolow.equations) N = 32 df = pd.DataFrame({'L':[100]*N,'K':[100]*N},index =[i+2000 for i in range(N)]) df.loc[:,'ALFA'] = 0.5 df.loc[:,'DEPRECIATES_RATE'] = 0.01 df.loc[:,'LABOR_GROWTH'] = 0.01 df.loc[:,'SAVING_RATIO'] = 0.10 msolow(df,silent=1,ljit=0,transpile_reset=1) msolow.normalized = True newton_all = newton_diff(msolow,forcenum=0,per=2002) dif__model = newton_all.diff_model.equations melt = newton_all.get_diff_melted_var() tt = newton_all.get_diff_mat_all_1per(2002,asdf=True) #newton_all.show_diff() cc = newton_all.get_eigenvalues (asdf=True,periode=2010) fig= newton_all.eigplot_all(cc,maxfig=3) #%% more testing if 1: newton = newton_diff(msolow) pdic = newton.get_diff_df_1per() longdf = newton.get_diff_melted()