'''
_static_thy:
submodule for static spectrum with the
sum of voigt broadend theoretical spectrum, edge function and baseline function
:copyright: 2021-2022 by pistack (Junho Lee).
:license: LGPL3.
'''
from typing import Optional, Tuple, Sequence
import numpy as np
from numpy.polynomial.legendre import legval
from .static_result import StaticResult
from ._ampgo import ampgo
from scipy.optimize import basinhopping
from scipy.optimize import least_squares
from ..mathfun.peak_shape import edge_gaussian, edge_lorenzian, voigt_thy
from ..mathfun.A_matrix import fact_anal_A
from ..res.parm_bound import set_bound_e0, set_bound_t0
from ..res.res_thy import residual_thy, res_grad_thy
GLBSOLVER = {'basinhopping': basinhopping, 'ampgo': ampgo}
[docs]def fit_static_thy(thy_peak: Sequence[np.ndarray],
fwhm_G_init: float, fwhm_L_init: float,
policy: str, peak_shift: Optional[np.ndarray] = None,
peak_scale: Optional[np.ndarray] = None,
edge: Optional[str] = None,
edge_pos_init: Optional[np.ndarray] = None,
edge_fwhm_init: Optional[np.ndarray] = None,
base_order: Optional[int] = None,
method_glb: Optional[str] = None,
method_lsq: Optional[str] = 'trf',
kwargs_glb: Optional[dict] = None,
kwargs_lsq: Optional[dict] = None,
bound_fwhm_G: Optional[Tuple[float, float]] = None,
bound_fwhm_L: Optional[Tuple[float, float]] = None,
bound_peak_shift: Optional[Sequence[Tuple[float, float]]] = None,
bound_peak_scale: Optional[Sequence[Tuple[float, float]]] = None,
bound_edge_pos: Optional[Sequence[Tuple[float, float]]] = None,
bound_edge_fwhm: Optional[Sequence[Tuple[float, float]]] = None,
e: Optional[np.ndarray] = None,
intensity: Optional[np.ndarray] = None,
eps: Optional[np.ndarray] = None) -> StaticResult:
'''
driver routine for fitting static spectrum with sum of voigt broadend thoretical spectrum,
edge and polynomial base line. To solve least square optimization problem efficiently, it implements
the seperation scheme.
Moreover this driver uses two step algorithm to search best parameter, its covariance and
estimated parameter error.
Step 1. (method_glb)
Use global optimization to find rough global minimum of our objective function.
In this stage, it use analytic gradient for scalar residual function.
Step 2. (method_lsq)
Use least squares optimization algorithm to refine global minimum of objective function and approximate covariance matrix.
Because of linear and non-linear seperation scheme, the analytic jacobian for vector residual function is hard to optain.
Thus, in this stage, it uses numerical jacobian.
Args:
thy_peak (sequence of np.ndarray): peak position and intensity for theoretically calculated spectrum
fwhm_G_init (float): initial gaussian part of fwhm parameter
fwhm_L_init (float): initial lorenzian part of fwhm parameter
policy ({'shift', 'scale', 'both'}): policy to match discrepancy between thoretical spectrum and experimental one
peak_shift (np.ndarray): peak shift parameter for each species
peak_scale (np.ndarray): peak scale parameter for each species
edge ({'g', 'l'}): type of edge function. If edge is not set, edge feature is not included.
edge_pos_init (np.ndarray): initial edge position
edge_fwhm_init (np.ndarray): initial fwhm parameter of edge
method_glb ({None, 'basinhopping', 'ampgo'}): Method for global optimization
method_lsq ({'trf', 'dogbox', 'lm'}): method of local optimization for least_squares
minimization (refinement of global optimization solution)
kwargs_glb: keyward arguments for global optimization solver
kwargs_lsq: keyward arguments for least square optimization solver
bound_fwhm_G (tuple): boundary for fwhm_G parameter.
If `bound_fwhm_G` is `None`, the upper and lower bound are given as `(fwhm_G/2, 2*fwhm_G)`.
bound_fwhm_L (tuple): boundary for fwhm_L parameter.
If `bound_fwhm_L` is `None`, the upper and lower bound are given as `(fwhm_L/2, 2*fwhm_L)`.
bound_peak_shift (sequence of tuple): boundary for peak shift parameter.
If `bound_peak_shift` is `None`, the upper and lower bound are given by `set_bound_e0`.
bound_peak_scale (sequence of tuple): boundary for peak scale parameter. If `bound_peak_scale` is `None`, the upper and lower bound are
given as `(0.9*peak_scale, 1.1*peak_scale)`.
bound_edge_pos (sequence of tuple): boundary for edge position,
if `bound_edge_pos` is `None` and `edge` is set, the upper and lower bound are given by `set_bound_t0`.
bound_edge_fwhm (sequence of tuple): boundary for fwhm parameter of edge feature.
If `bound_edge_fwhm` is `None`, the upper and lower bound are given as `(edge_fwhm/2, 2*edge_fwhm)`.
e (np.narray): energy range for data
intensity (np.ndarray): intensity of static spectrum data
eps (np.ndarray): estimated errors of static spectrum data
Returns:
StaticResult class object
Note:
* if initial fwhm_G is zero then such voigt component is treated as lorenzian component
* if initial fwhm_L is zero then such voigt component is treated as gaussian component
* Every theoretical spectrum is normalize.
'''
if method_glb is not None and method_glb not in ['basinhopping', 'ampgo']:
raise Exception('Unsupported global optimization Method, Supported global optimization Methods are ampgo and basinhopping')
if method_lsq not in ['trf', 'lm', 'dogbox']:
raise Exception('Invalid local least square minimizer solver. It should be one of [trf, lm, dogbox]')
if edge is not None and edge not in ['g', 'l']:
raise Exception('Invalid Edge type.')
num_voigt = len(thy_peak)
num_param = 2 + num_voigt*(1+(policy == 'both'))
num_comp = num_voigt
num_edge = 0
if edge is not None:
num_edge = edge_pos_init.size
num_comp = num_comp+num_edge
num_param = num_param+2*num_edge
if base_order is not None:
num_comp = num_comp + base_order + 1
param = np.empty(num_param, dtype=float)
fix_param_idx = np.empty(num_param, dtype=bool)
param[0] = fwhm_G_init
param[1] = fwhm_L_init
if policy == 'shift':
param[2:2+num_voigt] = peak_shift
elif policy == 'scale':
param[2:2+num_voigt] = peak_scale
elif policy == 'both':
param[2:2+num_voigt] = peak_shift
param[2+num_voigt:2+2*num_voigt] = peak_scale
if edge is not None:
if policy in ['shift', 'scale']:
edge_param_start = 2+num_voigt
else:
edge_param_start = 2+2*num_voigt
param[edge_param_start:edge_param_start+num_edge] = edge_pos_init
param[edge_param_start+num_edge:] = edge_fwhm_init
bound = num_param*[None]
if bound_fwhm_G is None:
bound[0] = (fwhm_G_init/2, 2*fwhm_G_init)
else:
bound[0] = bound_fwhm_G
if bound_fwhm_L is None:
bound[1] = (fwhm_L_init/2, 2*fwhm_L_init)
else:
bound[1] = bound_fwhm_G
if policy in ['shift', 'both']:
if bound_peak_shift is None:
for i in range(num_voigt):
bound[2+i] = set_bound_e0(peak_shift[i],
fwhm_G_init, fwhm_L_init)
else:
bound[2:2+num_voigt] = bound_peak_shift
elif policy == 'scale':
if bound_peak_scale is None:
for i in range(num_voigt):
bound[2+i] = (0.9*peak_scale[i], 1.1*peak_scale[i])
else:
bound[2:2+num_voigt] = bound_peak_scale
if policy == 'both':
if bound_peak_scale is None:
for i in range(num_voigt):
bound[2+num_voigt+i] = (0.9*peak_scale[i], 1.1*peak_scale[i])
else:
bound[2+num_voigt:2+2*num_voigt] = bound_peak_scale
if edge is not None:
if bound_edge_pos is None:
for i in range(num_edge):
bound[edge_param_start+i] = \
set_bound_t0(edge_pos_init[i], edge_fwhm_init[i])
else:
bound[edge_param_start:edge_param_start+num_edge] = bound_edge_pos
if bound_edge_fwhm is None:
for i in range(num_edge):
bound[edge_param_start+num_edge+i] = \
(edge_fwhm_init[i]/2, 2*edge_fwhm_init[i])
else:
bound[edge_param_start+num_edge:] = bound_edge_fwhm
for i in range(num_param):
fix_param_idx[i] = (bound[i][0] == bound[i][1])
if method_glb is not None:
go_args = (policy, thy_peak, edge, num_edge, base_order, fix_param_idx,
e, intensity, eps)
min_go_kwargs = {'args': go_args, 'jac': True, 'bounds': bound}
if kwargs_glb is not None:
minimizer_kwargs = kwargs_glb.pop('minimizer_kwargs', None)
if minimizer_kwargs is None:
kwargs_glb['minimizer_kwargs'] = min_go_kwargs
else:
minimizer_kwargs['args'] = go_args
minimizer_kwargs['jac'] = True
minimizer_kwargs['bounds'] = bound
kwargs_glb['minimizer_kwargs'] = minimizer_kwargs
else:
kwargs_glb = {'minimizer_kwargs': min_go_kwargs}
res_go = GLBSOLVER[method_glb](res_grad_thy, param, **kwargs_glb)
else:
res_go = {}
res_go['x'] = param
res_go['message'] = None
res_go['nfev'] = 0
param_gopt = res_go['x']
lsq_args = (policy, thy_peak, edge, num_edge,
base_order, e, intensity, eps)
if kwargs_lsq is not None:
_ = kwargs_lsq.pop('args', None)
_ = kwargs_lsq.pop('kwargs', None)
kwargs_lsq['args'] = lsq_args
else:
kwargs_lsq = {'args': lsq_args}
bound_tuple = (num_param*[None], num_param*[None])
for i in range(num_param):
bound_tuple[0][i] = bound[i][0]
bound_tuple[1][i] = bound[i][1]
if bound[i][0] == bound[i][1]:
if bound[i][0] > 0:
bound_tuple[1][i] = bound[i][1]*(1+1e-8)+1e-16
else:
bound_tuple[1][i] = bound[i][1]*(1-1e-8)+1e-16
# jacobian for vector residual function is inaccurate
res_lsq = least_squares(residual_thy, param_gopt,
method=method_lsq, bounds=bound_tuple, **kwargs_lsq)
param_opt = res_lsq['x']
param_name = np.empty(param_opt.size, dtype=object)
param_name[0] = 'fwhm_G'
param_name[1] = 'fwhm_L'
fwhm_G_opt = param_opt[0]
fwhm_L_opt = param_opt[1]
if policy in ['scale', 'shift']:
peak_factor_opt = param_opt[2:2+num_voigt]
if policy == 'scale':
for i in range(num_voigt):
param_name[2+i] = f'peak_scale {i+1}'
else:
for i in range(num_voigt):
param_name[2+i] = f'peak_shift {i+1}'
elif policy == 'both':
peak_factor_opt = np.array(num_voigt, dtype=object)
for i in range(num_voigt):
peak_factor_opt[i] = np.ndarray([param_opt[2+i],
param_opt[2+num_voigt+i]])
param_name[2+i] = f'peak_shift {i+1}'
param_name[2+num_voigt+i] = f'peak_scale {i+1}'
# Calc individual chi2
chi = res_lsq['fun']
num_param_tot = num_comp+num_param-np.sum(fix_param_idx)
chi2 = 2*res_lsq['cost']
red_chi2 = chi2/(chi.size-num_param_tot)
if edge is not None:
for i in range(num_edge):
param_name[edge_param_start+i] = f'E0_{edge} {i+1}'
param_name[edge_param_start+num_edge+i] = \
f'fwhm_({edge}, edge {i+1})'
A = np.empty((num_comp, e.size))
area = np.empty(num_voigt)
for i in range(num_voigt):
area[i] = np.sum(thy_peak[i][:, 1])
A[i, :] = \
voigt_thy(e, thy_peak[i], fwhm_G_opt, fwhm_L_opt,
peak_factor_opt[i], policy)/area[i]
base_start = num_voigt
if edge is not None:
base_start = base_start+num_edge
if edge == 'g':
for i in range(num_edge):
A[num_voigt+i, :] = edge_gaussian(e-param_opt[edge_param_start+i],
param_opt[edge_param_start+num_edge+i])
elif edge == 'l':
for i in range(num_edge):
A[num_voigt+i, :] = edge_lorenzian(e-param_opt[edge_param_start+i],
param_opt[edge_param_start+num_edge+i])
if base_order is not None:
e_max = np.max(e)
e_min = np.min(e)
e_norm = 2*(e-(e_max+e_min)/2)/(e_max-e_min)
tmp = np.eye(base_order+1)
A[base_start:, :] = legval(e_norm, tmp, tensor=True)
c = fact_anal_A(A, intensity, eps)
fit = c@A
fit_comp = np.einsum('i,ij->ij', c[:base_start], A[:base_start, :])
base = None
if base_order is not None:
base = c[base_start:]@A[base_start:, :]
res = intensity - fit
c[:num_voigt] = c[:num_voigt]/area
jac = res_lsq['jac']
hes = jac.T @ jac
cov = np.zeros_like(hes)
n_free_param = np.sum(~fix_param_idx)
mask_2d = np.einsum('i,j->ij', ~fix_param_idx, ~fix_param_idx)
cov[mask_2d] = np.linalg.inv(hes[mask_2d].reshape(
(n_free_param, n_free_param))).flatten()
cov_scaled = red_chi2*cov
param_eps = np.sqrt(np.diag(cov_scaled))
corr = cov_scaled.copy()
weight = np.einsum('i,j->ij', param_eps, param_eps)
corr[mask_2d] = corr[mask_2d]/weight[mask_2d]
result = StaticResult()
result['model'] = 'thy'
result['policy'] = policy
result['thy_peak'] = thy_peak
result['e'] = e
result['intensity'] = intensity
result['eps'] = eps
result['fit'] = fit
result['fit_comp'] = fit_comp
result['res'] = res
result['base_order'] = base_order
result['edge'] = edge
result['n_voigt'] = num_voigt
result['n_edge'] = num_edge
result['param_name'] = param_name
result['x'] = param_opt
result['bounds'] = bound
result['base'] = base
result['c'] = c
result['chi2'] = chi2
result['aic'] = chi.size*np.log(chi2/chi.size)+2*num_param_tot
result['bic'] = chi.size * \
np.log(chi2/chi.size)+num_param_tot*np.log(chi.size)
result['red_chi2'] = red_chi2
result['nfev'] = res_go['nfev'] + res_lsq['nfev']
result['n_param'] = num_param_tot
result['num_pts'] = chi.size
result['jac'] = jac
result['cov'] = cov
result['corr'] = corr
result['cov_scaled'] = cov_scaled
result['x_eps'] = param_eps
result['method_lsq'] = method_lsq
result['message_lsq'] = res_lsq['message']
result['success_lsq'] = res_lsq['success']
if result['success_lsq']:
result['status'] = 0
else:
result['status'] = -1
if method_glb is not None:
result['method_glb'] = method_glb
result['message_glb'] = res_go['message'][0]
else:
result['method_glb'] = None
result['message_glb'] = None
return result