'''
transient_result:
submodule for reporting fitting process of time scan result
:copyright: 2021-2022 by pistack (Junho Lee).
:license: LGPL3.
'''
import h5py as h5
import numpy as np
[docs]
class TransientResult(dict):
'''
Represent results for fitting driver routine
Attributes:
model ({'decay', 'dmp_osc', 'both'}): model used for fitting
`decay`: sum of the convolution of exponential decay and instrumental response function
`dmp_osc`: sum of the convolution of damped oscillation and instrumental response function
`both`: sum of `decay` and `dmp_osc` model
same_t0 (bool): whether or not time zero set to same for every time delay scan in same dataset
name_of_dset (sequence of str): name of each dataset
t (sequence of np.ndarray): time range for each dataset
intensity (sequence of np.ndarray): sequence of datasets of intensity of time delay scan
eps (sequence of np.ndarray): sequence of datasets for estimated error of time delay scan
fit (sequence of np.ndarray): fitting curve for each data set
fit_decay (sequence of np.ndarray): decay part of fitting curve for each data set [model = 'both']
fit_osc (sequence of np.ndarray): oscillation part of fitting curve for each data set [model = 'both']
res (sequence of np.ndarray): residual curve (data-fit) for each data set
irf ({'g', 'c', 'pv'}): shape of instrument response function
'g': gaussian instrumental response function
'c': cauchy (lorenzian) instrumental response function
'pv': pseudo voigt instrumental response function (linear combination of gaussian and lorenzian function)
fwhm (float): unifrom fwhm parameter for pseudo voigt function :math:`((1-\\eta)*g(t, {fwhm})+\\eta*c{t, {fwhm}})`
eta (float): mixing parameter for pseudo voigt function :math:`((1-\\eta)*g(t, {fwhm})+\\eta*c{t, {fwhm}})`
param_name (np.ndarray): name of parameter
n_decay (int): number of decay components (except baseline feature)
n_osc (int): number of damped oscillation components
x (np.ndarray): best parameter
bounds (sequence of tuple): boundary of each parameter
base (bool): whether or not use baseline feature in fitting process
c (sequence of np.ndarray): best weight of each component of each datasets
phase (sequence of np.ndarray): phase factor of each oscillation component of each datasets [mode = 'dmp_osc', 'both']
chi2 (float): total chi squared value of fitting
aic (float): Akaike Information Criterion statistic: :math:`N\\log(\\chi^2/N)+2N_{parm}`
bic (float): Bayesian Information Criterion statistic: :math:`N\\log(\\chi^2/N)+N_{parm}\\log(N)`
chi2_ind (np.ndarray): chi squared value of individual time delay scan
red_chi2 (float): total reduced chi squared value of fitting
red_chi2_ind (np.ndarray): reduced chi squared value of individul time delay scan
nfev (int): total number of function evaluation
n_param (int): total number of effective parameter
n_param_ind (int): number of parameter which affects fitting quality of indiviual time delay scan
num_pts (int): total number of data points
jac (np.ndarray): jacobian of objective function at optimal point
cov (np.ndarray): covariance matrix (i.e. inverse of :math:`J^T J`)
cov_scaled (np.ndarray): scaled covariance matrix (i.e. :math:`\\chi^2_{red} \\cdot {cov}`)
corr (np.ndarray): parameter correlation matrix
x_eps (np.ndarray): estimated error of parameter
(i.e. square root of diagonal element of `conv_scaled`)
method_glb ({'ampgo', 'basinhopping'}):
method of global optimization used in fitting process
message_glb (str): messages from global optimization process
method_lsq ({'trf', 'dogbox', 'lm'}): method of local optimization for least_squares
minimization (refinement of global optimization solution)
success_lsq (bool): whether or not local least square optimization is successed
message_lsq (str): messages from local least square optimization process
status ({0, -1}): status of optimization process
`0` : normal termination
`-1` : least square optimization process is failed
'''
def __getattr__(self, name):
try:
return self[name]
except KeyError as e:
raise AttributeError(name) from e
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
def __dir__(self):
return list(self.keys())
def __str__(self, corr_tol: float = 1e-1):
'''
Report TransientResult class
Args:
result: TransientResult class instance which has fitting result
corr_tol: parameter correlation greather `corr_tol` would be reported
Returns:
string which reports fitting results
'''
doc_lst = []
doc_lst.append('[Model information]')
doc_lst.append(f" model : {self['model']}")
doc_lst.append(f" irf: {self['irf']}")
doc_lst.append(f" fwhm: {self['fwhm']: .4f}")
doc_lst.append(f" eta: {self['eta']: .4f}")
doc_lst.append(f" base: {self['base']}")
doc_lst.append(' ')
doc_lst.append('[Optimization Method]')
if self['method_glb'] is not None:
doc_lst.append(f" global: {self['method_glb']}")
doc_lst.append(f" leastsq: {self['method_lsq']}")
doc_lst.append(' ')
doc_lst.append('[Optimization Status]')
doc_lst.append(f" nfev: {self['nfev']}")
doc_lst.append(f" status: {self['status']}")
if self['method_glb'] is not None:
doc_lst.append(f" global_opt msg: {self['message_glb']}")
doc_lst.append(f" leastsq_opt msg: {self['message_lsq']}")
doc_lst.append(' ')
doc_lst.append('[Optimization Results]')
doc_lst.append(f" Total Data points: {self['num_pts']}")
doc_lst.append(
f" Number of effective parameters: {self['n_param']}")
doc_lst.append(
f" Degree of Freedom: {self['num_pts']-self['n_param']}")
doc_lst.append(
f" Chi squared: {self['chi2']: .4f}".rstrip('0').rstrip('.'))
doc_lst.append(
f" Reduced chi squared: {self['red_chi2']: .4f}".rstrip('0').rstrip('.'))
doc_lst.append(
f" AIC (Akaike Information Criterion statistic): {self['aic']: .4f}".rstrip('0').rstrip('.'))
doc_lst.append(
f" BIC (Bayesian Information Criterion statistic): {self['bic']: .4f}".rstrip('0').rstrip('.'))
doc_lst.append(' ')
doc_lst.append('[Parameters]')
for pn, pv, p_err in zip(self['param_name'], self['x'], self['x_eps']):
doc_lst.append(
f' {pn}: {pv: .8f} +/- {p_err: .8f} ({100*np.abs(p_err/pv): .2f}%)'.rstrip('0').rstrip('.'))
doc_lst.append(' ')
doc_lst.append('[Parameter Bound]')
for pn, pv, bd in zip(self['param_name'], self['x'], self['bounds']):
doc_lst.append(f' {pn}: {bd[0]: .8f}'.rstrip('0').rstrip(
'.')+f' <= {pv: .8f} <= {bd[1]: .8f}'.rstrip('0').rstrip('.'))
doc_lst.append(' ')
if self['model'] in ['dmp_osc', 'both']:
doc_lst.append('[Phase Factor]')
for i in range(len(self['phase'])):
doc_lst.append(f" DataSet {self['name_of_dset'][i]}:")
row = [' #tscan']
for j in range(self['c'][i].shape[1]):
row.append(f'tscan_{j+1}')
doc_lst.append('\t'.join(row))
for o in range(self['n_osc']):
row = [f' dmp_osc {o+1}']
for l in range(self['phase'][i].shape[1]):
row.append(f"{self['phase'][i][o, l]/np.pi: .4f} π")
doc_lst.append('\t'.join(row))
doc_lst.append(' ')
doc_lst.append('[Component Contribution]')
if self['model'] == 'raise':
tot_decay = self['n_decay']-1
else:
tot_decay = self['n_decay']
tot_decay_tmp = tot_decay
for i in range(len(self['c'])):
doc_lst.append(f" DataSet {self['name_of_dset'][i]}:")
row = [' #tscan']
coeff_abs = np.abs(self['c'][i])
coeff_sum = np.sum(coeff_abs, axis=0)
coeff_contrib = np.einsum('j,ij->ij', 100/coeff_sum, self['c'][i])
for j in range(coeff_contrib.shape[1]):
row.append(f'tscan_{j+1}')
doc_lst.append('\t'.join(row))
for d in range(tot_decay):
row = [f' decay {d+1}']
for l in range(coeff_contrib.shape[1]):
row.append(f'{coeff_contrib[d, l]: .2f}%')
doc_lst.append('\t'.join(row))
if self['base']:
tot_decay_tmp = tot_decay+1
row = [' base']
for l in range(coeff_contrib.shape[1]):
row.append(f'{coeff_contrib[tot_decay_tmp-1,l]: .2f}%')
doc_lst.append('\t'.join(row))
for o in range(tot_decay_tmp, tot_decay_tmp+self['n_osc']):
row = [f' dmp_osc {o+1-tot_decay_tmp}']
for l in range(coeff_contrib.shape[1]):
row.append(f'{coeff_contrib[o, l]: .2f}%')
doc_lst.append('\t'.join(row))
doc_lst.append(' ')
doc_lst.append('[Parameter Correlation]')
doc_lst.append(f' Parameter Correlations > {corr_tol: .3f}'.rstrip(
'0').rstrip('.') + ' are reported.')
A = np.empty((len(self['x']), len(self['x'])), dtype=object)
for i in range(len(self['x'])):
for j in range(len(self['x'])):
A[i, j] = (i, j)
mask = (np.abs(self['corr']) > corr_tol)
for pair in A[mask]:
if pair[0] > pair[1]:
tmp_str_lst = [f" ({self['param_name'][pair[0]]},"]
tmp_str_lst.append(f"{self['param_name'][pair[1]]})")
tmp_str_lst.append('=')
tmp_str_lst.append(
f"{self['corr'][pair]: .3f}".rstrip('0').rstrip('.'))
doc_lst.append(' '.join(tmp_str_lst))
return '\n'.join(doc_lst)
[docs]
def save_TransientResult(result: TransientResult, filename: str):
'''
save transient fitting result to the h5 file
Args:
result: transient fitting result
filename: filename to store result. It will store result to filename.h5
Returns:
h5 file which stores result
'''
model_key_lst = ['model', 'irf', 'fwhm', 'eta', 'n_decay', 'n_osc',
'base', 'chi2', 'n_param', 'n_param_ind', 'num_pts', 'red_chi2',
'aic', 'bic', 'nfev',
'method_lsq', 'success_lsq', 'message_lsq', 'status']
exp_key_lst = ['t', 'intensity', 'eps']
fit_key_lst = ['fit', 'res', 'c', 'chi2_ind', 'red_chi2_ind']
exp_dir_lst = ['time_delay', 'intensity', 'error']
fit_dir_lst = ['fit', 'res', 'weight', 'chi2_ind', 'red_chi2_ind']
name_of_dset = result['name_of_dset']
with h5.File(f'{filename}.h5', 'w') as f:
f.create_dataset('name_of_time_delay_scan_datasets',
data=np.char.encode(name_of_dset.astype('str'),
encoding='utf-8').astype('S100'),
dtype='S100')
expt = f.create_group('experiment')
fit_res = f.create_group('fitting_result')
for i in range(len(name_of_dset)):
expt_dset = expt.create_group(name_of_dset[i])
fit_res_dset = fit_res.create_group(name_of_dset[i])
for k, d in zip(exp_key_lst, exp_dir_lst):
expt_dset.create_dataset(d, data=result[k][i])
for k, d in zip(fit_key_lst, fit_dir_lst):
fit_res_dset.create_dataset(d, data=result[k][i])
if result['model'] == 'decay':
fit_res_dset.create_dataset(
'tau_mask', data=result['tau_mask'][i])
if result['model'] == 'both':
fit_res_dset.create_dataset(
'fit_osc', data=result['fit_osc'][i])
fit_res_dset.create_dataset(
'fit_decay', data=result['fit_decay'][i])
if result['model'] in ['dmp_osc', 'both']:
fit_res_dset.create_dataset('phase', data=result['phase'][i])
for k in model_key_lst:
fit_res.attrs[k] = result[k]
fit_res.attrs['same_t0'] = result['same_t0']
if result['method_glb'] is None:
fit_res.attrs['method_glb'] = 'no'
else:
fit_res.attrs['method_glb'] = result['method_glb']
fit_res.attrs['message_glb'] = result['message_glb']
fit_res_param = fit_res.create_group('parameter')
fit_res_param.create_dataset('param_opt', data=result['x'])
fit_res_param.create_dataset('param_eps', data=result['x_eps'])
fit_res_param.create_dataset(
'param_name',
data=np.char.encode(result['param_name'].astype('str'),
encoding='utf-8').astype('S100'), dtype='S100')
fit_res_param.create_dataset('param_bounds', data=result['bounds'])
fit_res_param.create_dataset('correlation', data=result['corr'])
fit_res_mis = fit_res.create_group('miscellaneous')
fit_res_mis.create_dataset('jac', data=result['jac'])
fit_res_mis.create_dataset('cov', data=result['cov'])
fit_res_mis.create_dataset('cov_scaled', data=result['cov_scaled'])
[docs]
def load_TransientResult(filename: str) -> TransientResult:
'''
load transient fitting result from h5 file
Args:
filename: filename to load result. It will load filename.h5
Returns:
transient fitting result loaded from h5 file
'''
model_key_lst = ['model', 'irf', 'fwhm', 'eta', 'n_decay', 'n_osc',
'base', 'chi2', 'n_param', 'n_param_ind', 'num_pts', 'red_chi2',
'aic', 'bic', 'nfev', 'method_lsq', 'success_lsq', 'message_lsq', 'status']
exp_key_lst = ['intensity', 'eps']
fit_key_lst = ['fit', 'res', 'c', 'chi2_ind', 'red_chi2_ind']
exp_dir_lst = ['intensity', 'error']
fit_dir_lst = ['fit', 'res', 'weight', 'chi2_ind', 'red_chi2_ind']
result = TransientResult()
with h5.File(f'{filename}.h5', 'r') as f:
expt = f['experiment']
fit_res = f['fitting_result']
for k in model_key_lst:
result[k] = fit_res.attrs[k]
try:
result['same_t0'] = fit_res.attrs['same_t0']
except KeyError:
result['same_t0'] = False
if fit_res.attrs['method_glb'] == 'no':
result['method_glb'] = None
result['message_glb'] = None
else:
result['method_glb'] = fit_res.attrs['method_glb']
result['message_glb'] = fit_res.attrs['message_glb']
result['name_of_dset'] = np.char.decode(np.atleast_1d(
f['name_of_time_delay_scan_datasets']), encoding='utf-8')
result['t'] = np.empty(len(result['name_of_dset']), dtype=object)
for k in exp_key_lst:
result[k] = np.empty(len(result['name_of_dset']), dtype=object)
for k in fit_key_lst:
result[k] = np.empty(len(result['name_of_dset']), dtype=object)
if result['model'] == 'decay':
result['tau_mask'] = np.empty(
len(result['name_of_dset']), dtype=object)
if result['model'] in ['dmp_osc', 'both']:
result['phase'] = np.empty(
len(result['name_of_dset']), dtype=object)
if result['model'] == 'both':
result['fit_osc'] = np.empty(
len(result['name_of_dset']), dtype=object)
result['fit_decay'] = np.empty(
len(result['name_of_dset']), dtype=object)
for i in range(len(result['name_of_dset'])):
expt_dset = expt[result['name_of_dset'][i]]
fit_res_dset = fit_res[result['name_of_dset'][i]]
result['t'][i] = np.atleast_1d(expt_dset['time_delay'])
for k, d in zip(exp_key_lst, exp_dir_lst):
result[k][i] = np.atleast_2d(expt_dset[d])
for k, d in zip(fit_key_lst, fit_dir_lst):
result[k][i] = np.atleast_2d(fit_res_dset[d])
if result['model'] == 'decay':
result['tau_mask'][i] = \
np.array(fit_res_dset['tau_mask'], dtype=bool)
if result['model'] in ['both', 'dmp_osc']:
result['phase'][i] = np.atleast_2d(fit_res_dset['phase'])
if result['model'] == 'both':
result['fit_osc'][i] = fit_res_dset['fit_osc']
result['fit_decay'][i] = fit_res_dset['fit_decay']
fit_res_param = fit_res['parameter']
result['x'] = np.atleast_1d(fit_res_param['param_opt'])
result['x_eps'] = np.atleast_1d(fit_res_param['param_eps'])
result['param_name'] = np.char.decode(np.atleast_1d(fit_res_param['param_name']),
encoding='utf-8')
tmp = np.atleast_2d(fit_res_param['param_bounds'])
lst = tmp.shape[0]*[None]
for i in range(tmp.shape[0]):
lst[i] = (tmp[i, 0], tmp[i, 1])
result['bounds'] = lst
result['corr'] = np.atleast_2d(fit_res_param['correlation'])
fit_res_mis = fit_res['miscellaneous']
result['jac'] = np.atleast_2d(fit_res_mis['jac'])
result['cov'] = np.atleast_2d(fit_res_mis['cov'])
result['cov_scaled'] = np.atleast_2d(fit_res_mis['cov_scaled'])
return result