'''
static_result:
submodule for reporting fitting process of static spectrum result
:copyright: 2021-2022 by pistack (Junho Lee).
:license: LGPL3.
'''
from typing import Callable
import numpy as np
from scipy.special import wofz
import h5py as h5
from ..mathfun.peak_shape import voigt, \
edge_gaussian, edge_lorenzian
[docs]class StaticResult(dict):
'''
Represent results for fitting static driver routine
Attributes:
model ({'thy', 'voigt'}): model used for fitting
`voigt`: sum of voigt function, edge function and base funtion
`thy`: sum of voigt broadened theoretical lineshape spectrum, edge function and base function
thy_peak (sequence of np.ndarray): theoretical calculated peak position and intensity [model: `thy`]
policy ({'shift', 'scale', 'both'}): policy to match discrepancy between theoretical spectrum and
experimental static spectrum.
e (np.ndarray): energy range
intensity (np.ndarray): intensity of static spectrum
eps (np.ndarray): estimated error of static spectrum
fit (np.ndarray): fitting curve for data (n,)
fit_comp (np.ndarray): curve for each voigt component and edge
base (np.ndaray): fitting curve for baseline
res (np.ndarray): residual curve (data-fit) for static spectrum (n,)
edge ({'g', 'l'}): type of edge function, if edge is None then edge function is not
included in the fitting model
'g': gaussian type edge function
'l': lorenzian type edge function
base_order (int): order of baseline function
if base_order is None then baseline is not included in the fitting model
param_name (np.ndarray): name of parameter
n_voigt (int): number of voigt component
n_edge (int): number of edge component
x (np.ndarray): best parameter
bounds (sequence of tuple): boundary of each parameter
c (np.ndarray): best weight of each voigt component and edge of data
chi2 (float): chi squared value of fitting
aic (float): Akaike Information Criterion statistic: :math:`N\\log(\\chi^2/N)+2N_{parm}`
bic (float): Bayesian Information Criterion statistic: :math:`N\\log(\\chi^2/N)+N_{parm}\log(N)`
red_chi2 (float): total reduced chi squared value of fitting
nfev (int): total number of function evaluation
n_param (int): total number of effective parameter
num_pts (int): total number of data points
jac (np.ndarray): jacobian of objective function at optimal point
cov (np.ndarray): covariance matrix (i.e. inverse of (jac.T @ jac))
cov_scaled (np.ndarray): scaled covariance matrix (i.e. `red_chi2` * `cov`)
corr (np.ndarray): parameter correlation matrix
x_eps (np.ndarray): estimated error of parameter
(i.e. square root of diagonal element of `conv_scaled`)
method_glb ({'basinhopping'}):
method of global optimization used in fitting process
message_glb (str): messages from global optimization process
method_lsq ({'trf', 'dogbox', 'lm'}): method of local optimization for least_squares
minimization (refinement of global optimization solution)
success_lsq (bool): whether or not local least square optimization is successed
message_lsq (str): messages from local least square optimization process
status ({0, -1}): status of optimization process
`0` : normal termination
`-1` : least square optimization process is failed
'''
def __getattr__(self, name):
try:
return self[name]
except KeyError as e:
raise AttributeError(name) from e
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
def __dir__(self):
return list(self.keys())
def __str__(self, corr_tol: float = 1e-1):
'''
Report StaticResult
Args:
corr_tol: parameter correlation greather than `corr_tol` would be reported
'''
doc_lst = []
doc_lst.append("[Model information]")
doc_lst.append(f" model : {self['model']}")
if self['model'] == 'thy':
doc_lst.append(f" policy: {self['policy']}")
if self['edge'] is not None:
doc_lst.append(f" edge: {self['edge']}")
if self['base_order'] is not None:
doc_lst.append(f" base_order: {self['base_order']}")
doc_lst.append(' ')
doc_lst.append("[Optimization Method]")
if self['method_glb'] is not None:
doc_lst.append(f" global: {self['method_glb']}")
doc_lst.append(f" leastsq: {self['method_lsq']}")
doc_lst.append(' ')
doc_lst.append("[Optimization Status]")
doc_lst.append(f" nfev: {self['nfev']}")
doc_lst.append(f" status: {self['status']}")
if self['method_glb'] is not None:
doc_lst.append(f" global_opt msg: {self['message_glb']}")
doc_lst.append(f" leastsq_opt msg: {self['message_lsq']}")
doc_lst.append(' ')
doc_lst.append("[Optimization Results]")
doc_lst.append(f" Data points: {self['num_pts']}")
doc_lst.append(f" Number of effective parameters: {self['n_param']}")
doc_lst.append(f" Degree of Freedom: {self['num_pts']-self['n_param']}")
doc_lst.append(f" Chi squared: {self['chi2']: .4f}".rstrip('0').rstrip('.'))
doc_lst.append(f" Reduced chi squared: {self['red_chi2']: .4f}".rstrip('0').rstrip('.'))
doc_lst.append(f" AIC (Akaike Information Criterion statistic): {self['aic']: .4f}".rstrip('0').rstrip('.'))
doc_lst.append(f" BIC (Bayesian Information Criterion statistic): {self['bic']: .4f}".rstrip('0').rstrip('.'))
doc_lst.append(' ')
doc_lst.append("[Parameters]")
for pn, pv, p_err in zip(self['param_name'], self['x'], self['x_eps']):
doc_lst.append(f" {pn}: {pv: .8f} +/- {p_err: .8f} ({100*np.abs(p_err/pv): .2f}%)".rstrip('0').rstrip('.'))
doc_lst.append(' ')
doc_lst.append("[Parameter Bound]")
for pn, pv, bd in zip(self['param_name'], self['x'], self['bounds']):
doc_lst.append(f" {pn}: {bd[0]: .8f}".rstrip('0').rstrip('.')+f" <= {pv: .8f} <= {bd[1]: .8f}".rstrip('0').rstrip('.'))
doc_lst.append(' ')
doc_lst.append("[Component Contribution]")
doc_lst.append(f" Static spectrum")
if self['base_order'] is None:
coeff_abs = np.abs(self['c'])
coeff_contrib = 100*self['c']
else:
coeff_abs = np.abs(self['c'][:-self['base_order']-1])
coeff_contrib = 100*self['c'][:-self['base_order']-1]
coeff_sum = np.sum(coeff_abs)
coeff_contrib = coeff_contrib/coeff_sum
for v in range(self['n_voigt']):
row = [f" {self['model']} {v+1}:"]
row.append(f'{coeff_contrib[v]: .2f}%')
doc_lst.append(' '.join(row))
if self['edge'] is not None:
for e in range(self['n_edge']):
row = [f" {self['edge']} type edge {e+1}:"]
row.append(f"{coeff_contrib[self['n_voigt']+e]: .2f}%")
doc_lst.append(' '.join(row))
doc_lst.append(' ')
doc_lst.append("[Parameter Correlation]")
doc_lst.append(f" Parameter Correlations > {corr_tol: .3f}".rstrip('0').rstrip('.') + " are reported.")
A = np.empty((len(self['x']), len(self['x'])), dtype=object)
for i in range(len(self['x'])):
for j in range(len(self['x'])):
A[i,j] = (i,j)
mask = (np.abs(self['corr']) > corr_tol)
for pair in A[mask]:
if pair[0] > pair[1]:
tmp_str_lst = [f" ({self['param_name'][pair[0]]},"]
tmp_str_lst.append(f"{self['param_name'][pair[1]]})")
tmp_str_lst.append('=')
tmp_str_lst.append(f"{self['corr'][pair]: .3f}".rstrip('0').rstrip('.'))
doc_lst.append(' '.join(tmp_str_lst))
return '\n'.join(doc_lst)
[docs]def save_StaticResult(result: StaticResult, filename: str):
'''
save static fitting result to the h5 file
Args:
result: static fitting result
filename: filename to store result. It will store result to filename.h5
Returns:
h5 file which stores result
'''
model_key_lst = ['chi2', 'n_param', 'num_pts', 'red_chi2',
'aic', 'bic', 'nfev', 'method_lsq', 'success_lsq', 'message_lsq', 'status']
with h5.File(f'{filename}.h5', 'w') as f:
expt = f.create_group("experiment")
fit_res = f.create_group("fitting_result")
if result['model'] == 'thy':
thy_stick = f.create_group("theoretical_peaks")
for i in range(result['n_voigt']):
thy_stick.create_dataset(f'species {i+1}', data=result['thy_peak'][i])
fit_res.attrs['policy'] = result['policy']
expt.create_dataset("energy", data=result['e'])
expt.create_dataset("intensity", data=result['intensity'])
expt.create_dataset("error", data=result['eps'])
fit_res.attrs['model'] = result['model']
fit_res.attrs['n_voigt'] = result['n_voigt']
fit_res.attrs['n_edge'] = result['n_edge']
if result['edge'] is not None:
fit_res.attrs['edge'] = result['edge']
if result['base_order'] is not None:
fit_res.attrs['base_order'] = result['base_order']
else:
fit_res.attrs['base_order'] = 'no'
if result['method_glb'] is not None:
fit_res.attrs['method_glb'] = result['method_glb']
fit_res.attrs['message_glb'] = result['message_glb']
else:
fit_res.attrs['method_glb'] = 'no'
for k in model_key_lst:
fit_res.attrs[k] = result[k]
fit_res_curve = fit_res.create_group("fit_curve")
fit_res_curve.create_dataset("fit", data=result['fit'])
fit_res_curve.create_dataset("fit_comp", data=result['fit_comp'])
fit_res_curve.create_dataset("weight", data=result['c'])
if result['base_order'] is not None:
fit_res_curve.create_dataset("base", data=result['base'])
fit_res_curve.create_dataset("res", data=result['res'])
fit_res_param = fit_res.create_group("parameter")
fit_res_param.create_dataset("param_opt", data=result['x'])
fit_res_param.create_dataset("param_eps", data=result['x_eps'])
fit_res_param.create_dataset("param_name", data=result['param_name'].astype("S100"), dtype='S100')
fit_res_param.create_dataset("param_bounds", data=result['bounds'])
fit_res_param.create_dataset("correlation", data=result['corr'])
fit_res_mis = fit_res.create_group("miscellaneous")
fit_res_mis.create_dataset("jac", data=result['jac'])
fit_res_mis.create_dataset('cov', data=result['cov'])
fit_res_mis.create_dataset('cov_scaled', data=result['cov_scaled'])
return
[docs]def load_StaticResult(filename: str) -> StaticResult:
'''
load static fitting result from h5 file
Args:
filename: filename to load result. It will load result to filename.h5
Returns:
loaded StaticResult instance
'''
model_key_lst = ['chi2', 'n_param', 'num_pts', 'red_chi2',
'aic', 'bic', 'nfev',
'method_lsq', 'success_lsq', 'message_lsq', 'status']
result = StaticResult()
with h5.File(f'{filename}.h5', 'r') as f:
expt = f['experiment']
fit_res = f['fitting_result']
result['e'] = np.atleast_1d(expt['energy'])
result['intensity'] = np.atleast_1d(expt['intensity'])
result['eps'] = np.atleast_1d(expt['error'])
result['model'] = fit_res.attrs['model']
result['n_voigt'] = fit_res.attrs['n_voigt']
result['n_edge'] = fit_res.attrs['n_edge']
if result['model'] == 'thy':
thy_stick = f['theoretical_peaks']
result['thy_peak'] = np.empty(result['n_voigt'], dtype=object)
for i in range(result['n_voigt']):
result['thy_peak'][i] = \
np.atleast_2d(thy_stick[f'species {i+1}'])
result['policy'] = fit_res.attrs['policy']
if fit_res.attrs['n_edge'] != 0:
result['edge'] = fit_res.attrs['edge']
else:
result['edge'] = None
if fit_res.attrs['base_order'] != 'no':
result['base_order'] = fit_res.attrs['base_order']
else:
result['base_order'] = None
if fit_res.attrs['method_glb'] == 'no':
result['method_glb'] = None
result['message_glb'] = None
else:
result['method_glb'] = fit_res.attrs['method_glb']
result['message_glb'] = fit_res.attrs['message_glb']
for k in model_key_lst:
result[k] = fit_res.attrs[k]
fit_res_curve = fit_res['fit_curve']
result['fit'] = np.atleast_1d(fit_res_curve['fit'])
result['fit_comp'] = np.atleast_1d(fit_res_curve['fit_comp'])
result['c'] = np.atleast_1d(fit_res_curve['weight'])
if result['base_order'] is not None:
result['base'] = np.atleast_1d(fit_res_curve['base'])
else:
result['base'] = None
result['res'] = np.atleast_1d(fit_res_curve['res'])
fit_res_param = fit_res['parameter']
result['x'] = np.atleast_1d(fit_res_param['param_opt'])
result['x_eps'] = np.atleast_1d(fit_res_param['param_eps'])
result['param_name'] = np.atleast_1d(fit_res_param['param_name'])
tmp = np.atleast_2d(fit_res_param['param_bounds'])
lst = tmp.shape[0]*[None]
for i in range(tmp.shape[0]):
lst[i] = (tmp[i,0], tmp[i, 1])
result['bounds'] = lst
result['corr'] = np.atleast_2d(fit_res_param['correlation'])
fit_res_mis = fit_res['miscellaneous']
result['jac'] = np.atleast_2d(fit_res_mis['jac'])
result['cov'] = np.atleast_2d(fit_res_mis['cov'])
result['cov_scaled'] = np.atleast_2d(fit_res_mis['cov_scaled'])
return result
def deriv_edge_g_aux(e: np.ndarray, fwhm_G: float) -> np.ndarray:
'''
derivative of gaussian type edge
Args:
e: energy
fwhm_G: full width at half maximum
Returns:
first derivative of gaussian edge function
'''
tmp = np.exp(-4*np.log(2)*(e/fwhm_G)**2)/np.sqrt(np.pi)
return 2*np.sqrt(np.log(2))/fwhm_G*tmp
def dderiv_edge_g_aux(e: np.ndarray, fwhm_G: float) -> np.ndarray:
'''
2nd derivative of gaussian type edge
Args:
e: energy
fwhm_G: full width at half maximum
Returns:
2nd derivative of gaussian edge function
'''
tmp = -8*np.log(2)*e*np.exp(-4*np.log(2)*(e/fwhm_G)**2)/(np.sqrt(np.pi)*fwhm_G**2)
return 2*np.sqrt(np.log(2))/fwhm_G*tmp
def deriv_edge_l_aux(e: np.ndarray, fwhm_L: float) -> np.ndarray:
'''
1st derivative of lorenzian type edge
Args:
e: energy
Returns:
first derivative of lorenzian type function
'''
tmp = 1/np.pi/(e**2+fwhm_L**2/4)
return fwhm_L*tmp/2
def dderiv_edge_l_aux(e: np.ndarray, fwhm_L: float) -> np.ndarray:
'''
2nd derivative of lorenzian type edge
Args:
e: energy
Returns:
second derivative of lorenzian type function
'''
tmp = -e/np.pi/(e**2+fwhm_L**2/4)**2
return fwhm_L*tmp
def deriv_voigt_aux(e: np.ndarray, fwhm_G: float, fwhm_L: float) -> np.ndarray:
'''
1st derivative of voigt profile
Args:
e: energy
fwhm_G: full width at half maximum of gaussian part :math:(2\\sqrt{2\\log(2)}\\sigma)
fwhm_L: full width at half maximum of lorenzian part :math:(2\\gamma)
Returns:
first derivative of voigt profile
'''
if fwhm_G < 1e-8:
tmp = fwhm_L/np.pi/(e**2+fwhm_L**2/4)**2
return -e*tmp
sigma = fwhm_G/(2*np.sqrt(2*np.log(2)))
if fwhm_L < 1e-8:
tmp = np.exp(-(e/sigma)**2/2)/(sigma*np.sqrt(2*np.pi))
return -e/sigma**2*tmp
z = (e+complex(0,fwhm_L/2))/(sigma*np.sqrt(2))
return -(z*wofz(z)).real/(sigma**2*np.sqrt(np.pi))
def dderiv_voigt_aux(e: np.ndarray, fwhm_G: float, fwhm_L: float) -> np.ndarray:
'''
2nd derivative of voigt profile
Args:
e: energy
fwhm_G: full width at half maximum of gaussian part :math:(2\\sqrt{2\\log(2)}\\sigma)
fwhm_L: full width at half maximum of lorenzian part :math:(2\\gamma)
Returns:
second derivative of voigt profile
'''
if fwhm_G < 1e-8:
tmp = fwhm_L/np.pi/(e**2+fwhm_L**2/4)**2
return (4*e**2/(e**2+fwhm_L**2/4)-1)*tmp
sigma = fwhm_G/(2*np.sqrt(2*np.log(2)))
if fwhm_L < 1e-8:
tmp = np.exp(-(e/sigma)**2/2)/(sigma*np.sqrt(2*np.pi))
return 1/sigma**2*((e/sigma)**2-1)*tmp
z = (e+complex(0,fwhm_L/2))/(sigma*np.sqrt(2))
f = wofz(z)/(sigma*np.sqrt(2*np.pi))
return 1/sigma**4*\
((e**2-fwhm_L**2/4-sigma**2)*f.real - e*fwhm_L*f.imag + fwhm_L/(2*np.pi))
def voigt_thy_aux(e: np.ndarray, thy_peak: np.ndarray,
fwhm_G: float, fwhm_L: float,
deriv_order: int = 0) -> np.ndarray:
'''
Calculates derivative of normalized
voigt broadened theoretically calculated lineshape spectrum
Args:
e: energy
thy_peak: theoretical calculated peak position and intensity
fwhm_G: full width at half maximum of gaussian shape
fwhm_L: full width at half maximum of lorenzian shape
deriv_order({0, 1, 2}): order of derivative
Returns:
derivative of normalized voigt broadened theoritical lineshape spectrum
'''
v_matrix = np.empty((e.size, thy_peak.shape[0]))
if deriv_order == 0:
for i in range(thy_peak.shape[0]):
v_matrix[:, i] = voigt(e-thy_peak[i, 0], fwhm_G, fwhm_L)
elif deriv_order == 1:
for i in range(thy_peak.shape[0]):
v_matrix[:, i] = deriv_voigt_aux(e-thy_peak[i, 0], fwhm_G, fwhm_L)
else:
for i in range(thy_peak.shape[0]):
v_matrix[:, i] = dderiv_voigt_aux(e-thy_peak[i, 0], fwhm_G, fwhm_L)
broadened_theory = v_matrix @ thy_peak[:, 1].reshape((thy_peak.shape[0], 1))
return broadened_theory.flatten()
[docs]def static_spectrum(e: np.ndarray,
result: StaticResult, deriv_order: int = 0) -> Callable:
'''
Evaluates static spectrum from static spectrum
fitting result.
Args:
e: energy at which evaulate static spectrum
result: static spectrum fitting result
deriv_order ({0, 1, 2}): order of derivative. [default: 0]
Returns:
Evaluated static spectrum
Note:
Baseline feature is removed.
'''
tmp = np.zeros(e.size)
if result['model'] == 'voigt' and result['n_voigt'] > 0:
e0_voigt = result['x'][:result['n_voigt']]
fwhm_G_voigt = result['x'][result['n_voigt']:2*result['n_voigt']]
fwhm_L_voigt = result['x'][2*result['n_voigt']:3*result['n_voigt']]
V = np.empty((result['n_voigt'], e.size))
if deriv_order == 0:
for i in range(result['n_voigt']):
V[i, :] = voigt(e-e0_voigt[i], fwhm_G_voigt[i], fwhm_L_voigt[i])
elif deriv_order == 1:
for i in range(result['n_voigt']):
V[i, :] = deriv_voigt_aux(e-e0_voigt[i], fwhm_G_voigt[i],
fwhm_L_voigt[i])
else:
for i in range(result['n_voigt']):
V[i, :] = dderiv_voigt_aux(e-e0_voigt[i], fwhm_G_voigt[i],
fwhm_L_voigt[i])
tmp = (result['c'][:result['n_voigt']]@V).flatten()
else:
if result['policy'] == 'shift':
for i in range(result['n_voigt']):
peak_tmp = result['thy_peak'][i].copy()
peak_tmp[:, 0] = peak_tmp[:, 0]+result['x'][2+i]
tmp = tmp + result['c'][i]*voigt_thy_aux(e, peak_tmp,
result['x'][0], result['x'][1], deriv_order)
elif result['policy'] == 'scale':
for i in range(result['n_voigt']):
peak_tmp = result['thy_peak'][i].copy()
peak_tmp[:, 0] = peak_tmp[:, 0]*result['x'][2+i]
tmp = tmp + result['c'][i]*voigt_thy_aux(e, peak_tmp,
result['x'][0], result['x'][1], deriv_order)
else:
for i in range(result['n_voigt']):
peak_tmp = result['thy_peak'][i].copy()
peak_tmp[:, 0] = result['x'][2+result['n_voigt']+i]*peak_tmp[:, 0]+\
result['x'][2+i]
tmp = tmp + result['c'][i]*voigt_thy_aux(e, peak_tmp,
result['x'][0], result['x'][1], deriv_order)
if result['model'] == 'voigt':
param_edge_start = 3*result['n_voigt']
else:
if result['policy'] in ['scale', 'shift']:
param_edge_start = 2+result['n_voigt']
else:
param_edge_start = 2+2*result['n_voigt']
if result['edge'] == 'g':
if deriv_order == 0:
for i in range(result['n_edge']):
tmp = tmp + result['c'][result['n_voigt']+i]*\
edge_gaussian(e-result['x'][param_edge_start+i],
result['x'][param_edge_start+result['n_edge']+i])
elif deriv_order == 1:
for i in range(result['n_edge']):
tmp = tmp + result['c'][result['n_voigt']+i]*\
deriv_edge_g_aux(e-result['x'][param_edge_start+i],
result['x'][param_edge_start+result['n_edge']+i])
else:
for i in range(result['n_edge']):
tmp = tmp + result['c'][result['n_voigt']+i]*\
dderiv_edge_g_aux(e-result['x'][param_edge_start+i],
result['x'][param_edge_start+result['n_edge']+i])
elif result['edge'] == 'l':
if deriv_order == 0:
for i in range(result['n_edge']):
tmp = tmp + result['c'][result['n_voigt']+i]*\
edge_lorenzian(e-result['x'][param_edge_start+i],
result['x'][param_edge_start+result['n_edge']+i])
elif deriv_order == 1:
for i in range(result['n_edge']):
tmp = tmp + result['c'][result['n_voigt']+i]*\
deriv_edge_l_aux(e-result['x'][param_edge_start+i],
result['x'][param_edge_start+result['n_edge']+i])
else:
for i in range(result['n_edge']):
tmp = tmp + result['c'][result['n_voigt']+i]*\
dderiv_edge_l_aux(e-result['x'][param_edge_start+i],
result['x'][param_edge_start+result['n_edge']+i])
return tmp