Source code for pyoof.core

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Author: Tomas Cassanelli
import numpy as np
import matplotlib.pyplot as plt
from astropy.table import Table
from astropy import units as apu
from astropy.utils.data import get_pkg_data_filename
from scipy import interpolate, optimize
import os
import time
import yaml
from .aperture import radiation_pattern, phase
from .math_functions import co_matrices, norm, snr
from .plot_routines import plot_fit_path
from .aux_functions import store_data_csv, store_data_ascii

__all__ = [
    'residual_true', 'residual', 'params_complete', 'fit_zpoly',
    ]


[docs]def residual_true( params, beam_data, u_data, v_data, d_z, wavel, illum_func, telgeo, resolution, box_factor, interp ): """ Computes the true residual ready to use for the `~pyoof.fit_zpoly` function. True means that some of the parameters used will **not** be fitted. Their selection is done by default or by adding ``config_params.yml`` file to the `~pyoof.fit_zpoly` function. Parameters ---------- params : `~numpy.ndarray` Two stacked arrays, the illumination and Zernike circle polynomials coefficients. ``params = np.hstack([I_coeff, K_coeff])``. beam_data : `~numpy.ndarray` The ``beam_data`` is an array with the three observed beam maps, :math:`P^\\mathrm{obs}(u, v)`, minus, zero and plus out-of-focus. u_data : `~astropy.units.quantity.Quantity` :math:`x` axis value for the 3 beam maps in radians. The values have to be flatten, in one dimension, and stacked in the same order as the ``d_z = [d_z-, 0., d_z+]`` values from each beam map. v_data : `~astropy.units.quantity.Quantity` :math:`y` axis value for the 3 beam maps in radians. The values have to be flatten, one dimensional, and stacked in the same order as the ``d_z = [d_z-, 0., d_z+]`` values from each beam map. d_z : `~astropy.units.quantity.Quantity` Radial offset :math:`d_z`, added to the sub-reflector in length units. This characteristic measurement adds the classical interference pattern to the beam maps, normalized squared (field) radiation pattern, which is an out-of-focus property. The radial offset list must be as follows, ``d_z = [d_z-, 0., d_z+]`` all of them in length units. wavel : `~astropy.units.quantity.Quantity` Wavelength, :math:`\\lambda`, of the observation in meters. illum_func : `function` Illumination function, :math:`E_\\mathrm{a}(x, y)`, to be evaluated with the key ``I_coeff``. The illumination functions available are `~pyoof.aperture.illum_parabolic` and `~pyoof.aperture.illum_gauss`. telgeo : `list` List that contains the blockage distribution, optical path difference (OPD) function, and the primary radius (`float`) in meters. The list must have the following order, ``telego = [block_dist, opd_func, pr]``. resolution : `int` Fast Fourier Transform resolution for a rectangular grid. The input value has to be greater or equal to the telescope resolution and with power of 2 for faster FFT processing. It is recommended a value higher than ``resolution = 2 ** 8``. box_factor : `int` Related to the FFT resolution (**resolution** key), defines the image pixel size level. It depends on the primary radius, ``pr``, of the telescope, e.g. a ``box_factor = 5`` returns ``x = np.linspace(-5 * pr, 5 * pr, resolution)``, an array to be used in the FFT2 (`~numpy.fft.fft2`). interp : `bool` If `True`, it will process the correspondent interpolation between the observed grid (:math:`P^\\mathrm{obs}(u, v)`) and the computed grid (:math:`P(u, v)`) for the FFT2 aperture distribution model (:math:`\\underline{E_\\mathrm{a}}(x, y)`). Returns ------- _residual_true : `~numpy.ndarray` One dimensional array of the residual between the observed data and the FFT aperture distribution model. It has been concatenated as minus, zero and plus radial offset (to do a multiple fit). It is required to have the residual in one dimension in order to use a least squares minimization `~scipy.optimize.least_squares` package. """ I_coeff, K_coeff = params[:5], params[5:] beam_model = np.zeros_like(beam_data) for i in range(3): u, v, F = radiation_pattern( I_coeff=I_coeff, K_coeff=K_coeff, d_z=d_z[i], wavel=wavel, illum_func=illum_func, telgeo=telgeo, resolution=resolution, box_factor=box_factor ) power_pattern = np.abs(F) ** 2 if interp: # The calculated beam needs to be transformed! intrp = interpolate.RegularGridInterpolator( points=(u.to_value(apu.rad), v.to_value(apu.rad)), values=power_pattern.T, # data in grid method='linear' # linear or nearest ) # input interpolation function is the real beam grid beam_model[i, ...] = ( intrp(np.array([ u_data[i, ...].to_value(apu.rad), v_data[i, ...].to_value(apu.rad) ]).T) ) else: beam_model[i, ...] = power_pattern _residual_true = norm(beam_data, axis=1) - norm(beam_model, axis=1) return _residual_true.flatten()
[docs]def residual( params, N_K_coeff, beam_data, u_data, v_data, d_z, wavel, illum_func, telgeo, resolution, box_factor, interp, config_params ): """ Wrapper for the `~pyoof.residual_true` function. The objective of this function is to fool the `~scipy.optimize.least_squares` package by changing the number of parameters that will be used in the fit. The parameter array must be organized as follows, ``params = np.hstack([ I_coeff, K_coeff])``. The parameter selection is done by default or by adding a ``config_params.yml`` file to the `~pyoof.fit_zpoly` function. Parameters ---------- params : `~numpy.ndarray` Two stacked arrays, the illumination and Zernike circle polynomials coefficients. ``params = np.hstack([I_coeff, K_coeff])``. N_K_coeff : `int` Total number of Zernike circle polynomials coefficients to fit. It is obtained from the order to be fitted with the formula ``N_K_coeff = (n + 1) * (n + 2) // 2.``. beam_data : `~numpy.ndarray` The ``beam_data`` is an array with the three observed beam maps, :math:`P^\\mathrm{obs}(u, v)`, minus, zero and plus out-of-focus. u_data : `~astropy.units.quantity.Quantity` :math:`x` axis value for the 3 beam maps in radians. The values have to be flatten, in one dimension, and stacked in the same order as the ``d_z = [d_z-, 0., d_z+]`` values from each beam map. v_data : `~astropy.units.quantity.Quantity` :math:`y` axis value for the 3 beam maps in radians. The values have to be flatten, one dimensional, and stacked in the same order as the ``d_z = [d_z-, 0., d_z+]`` values from each beam map. d_z : `~astropy.units.quantity.Quantity` Radial offset :math:`d_z`, added to the sub-reflector in length units. This characteristic measurement adds the classical interference pattern to the beam maps, normalized squared (field) radiation pattern, which is an out-of-focus property. The radial offset list must be as follows, ``d_z = [d_z-, 0., d_z+]`` all of them in length units. wavel : `~astropy.units.quantity.Quantity` Wavelength, :math:`\\lambda`, of the observation in meters. illum_func : `function` Illumination function, :math:`E_\\mathrm{a}(x, y)`, to be evaluated with the key ``I_coeff``. The illumination functions available are `~pyoof.aperture.illum_parabolic` and `~pyoof.aperture.illum_gauss`. telgeo : `list` List that contains the blockage distribution, optical path difference (OPD) function, and the primary radius (`float`) in meters. The list must have the following order, ``telego = [block_dist, opd_func, pr]``. resolution : `int` Fast Fourier Transform resolution for a rectangular grid. The input value has to be greater or equal to the telescope resolution and with power of 2 for faster FFT processing. It is recommended a value higher than ``resolution = 2 ** 8``. box_factor : `int` Related to the FFT resolution (**resolution** key), defines the image pixel size level. It depends on the primary radius, ``pr``, of the telescope, e.g. a ``box_factor = 5`` returns ``x = np.linspace(-5 * pr, 5 * pr, resolution)``, an array to be used in the FFT2 (`~numpy.fft.fft2`). interp : `bool` If `True`, it will process the correspondent interpolation between the observed grid (:math:`P^\\mathrm{obs}(u, v)`) and the computed grid (:math:`P(u, v)`) for the FFT2 aperture distribution model (:math:`\\underline{E_\\mathrm{a}}(x, y)`). config_params : `dict` Contains the values for the fixed parameters (excluded from the least squares minimization), by default four parameters are kept fixed, ``i_amp``, ``x0``, ``y0`` and ``K(0, 0)``. See the ``config_params.yml`` file. Returns ------- _residual_true : `~numpy.ndarray` Same output from `~pyoof.residual_true`. One dimensional array of the residual between the observed data and the FFT aperture distribution model. It has been concatenated as minus, zero and plus radial offset (to do a multiple fit). It is required to have the residual in one dimension in order to use a least squares minimization `~scipy.optimize.least_squares` package. Notes ----- The **idx_exclude** key (``config_params['excluded']``) needs an indices list of the parameters to be removed. The structure of the parameters always follows, ``params = np.hstack([I_coeff, K_coeff])``, a list with ``idx_exclude = [0, 1, 2, 4, 5, 6, 7]`` will remove from the least squares minimization, ``[i_amp, c_dB, q, x0, y0, K(0, 0), K(1, 1), K(1, -1)]``. """ # Parameters list for the true fit params_res = params_complete(params, N_K_coeff, config_params) _residual_true = residual_true( params=params_res, beam_data=beam_data, u_data=u_data, v_data=v_data, d_z=d_z, wavel=wavel, resolution=resolution, box_factor=box_factor, illum_func=illum_func, telgeo=telgeo, interp=interp, ) return _residual_true
[docs]def params_complete(params, N_K_coeff, config_params): """ This function fills the missing parameters not used in the lease squares minimization, they are required to compute the correct aperture distribution, :math:`\\underline{E_\\mathrm{a}}(x, y)`. The parameter selection is done by default or by adding a ``config_params.yml`` file to the `~pyoof.fit_zpoly` function. Parameters ---------- params : `~numpy.ndarray` Contains the incomplete array of parameters, the ``params`` array will be updated for the correct number of parameters to be used in the `~pyoof.residual_true` function. The array should be of the shape, ``params = np.hstack([I_coeff, K_coeff])``, missing or not some of the ``idx_exclude = [0, 1, 2, 3, 4, 5, 6, 7]`` parameters. N_K_coeff : `int` Total number of Zernike circle polynomials coefficients to fit. It is obtained from the order to be fitted with the formula ``N_K_coeff = (n + 1) * (n + 2) // 2.``. config_params : `dict` Contains the values for the fixed parameters (excluded from the least squares minimization), for default parameters, see the ``config_params.yml`` file. Returns ------- params_updated : `~numpy.ndarray` Complete set of parameters to be used in the `~pyoof.residual_true` function. """ [ i_amp_f, c_dB_f, q_f, x0_f, y0_f, Knl0_f, Knl1_f, Knl2_f ] = config_params['fixed'] # N_K_coeff number of Zernike circle polynomials coefficients if params.size != (5 + N_K_coeff): params_updated = params.copy() for i in config_params['excluded']: if i == 0: params_updated = np.insert(params_updated, i, i_amp_f) elif i == 1: params_updated = np.insert(params_updated, i, c_dB_f) elif i == 2: params_updated = np.insert(params_updated, i, q_f) elif i == 3: params_updated = np.insert(params_updated, i, x0_f) elif i == 4: params_updated = np.insert(params_updated, i, y0_f) elif i == 5: params_updated = np.insert(params_updated, i, Knl0_f) elif i == 6: params_updated = np.insert(params_updated, i, Knl1_f) elif i == 7: params_updated = np.insert(params_updated, i, Knl2_f) else: params_updated = params return params_updated
[docs]def fit_zpoly( data_info, data_obs, order_max, illum_func, telescope, resolution, box_factor, fit_previous=True, config_params_file=None, make_plots=False, verbose=2, work_dir=None ): """ Computes the Zernike circle polynomial coefficients, ``K_coeff``, and the illumination function coefficients, ``I_coeff``, stores and plots data ( optional) by using a least squares minimization. The stored data belongs to the best fitted power pattern (or beam map). `~pyoof.fit_zpoly` is the core function from the `~pyoof` package. Parameters ---------- data_info : `list` It contains all extra data besides the beam map. The output corresponds to a list, ``[name, pthto, obs_object, obs_date, freq, wavel, d_z, meanel]``. These are, name of the FITS file, paht of the FITS file, observed object, observation date, frequency, wavelength, radial offset and mean elevation, respectively. data_obs : `list` It contains beam maps and :math:`x`-, and :math:`y`-axis (:math:`uv`-plane in Fourier space) data for the least squares minimization (see `~pyoof.fit_zpoly`). The list has the following order ``[beam_data, u_data, v_data]``. ``beam_data`` is the three beam observations, minus, zero and plus out-of-focus, in a flat array. ``u_data`` and ``v_data`` are the beam axes in a flat array. order_max : `int` Maximum order used for the Zernike circle polynomials, :math:`n`, least squares minimization. If ``order_max = 3``, it will do the optimization for orders 1, 2 and 3. illum_func : `function` Illumination function, :math:`E_\\mathrm{a}(x, y)`, to be evaluated with the key ``I_coeff``. The illumination functions available are `~pyoof.aperture.illum_parabolic` and `~pyoof.aperture.illum_gauss`. telescope : `list` List that contains the blockage distribution, optical path difference (OPD) function, primary radius (`float`) in meters, and telescope name (`str`). The list must have the following order, ``telescope = [block_dist, opd_func, pr, tel_name]``. resolution : `int` Fast Fourier Transform resolution for a rectangular grid. The input value has to be greater or equal to the telescope resolution and with power of 2 for faster FFT processing. It is recommended a value higher than ``resolution = 2 ** 8``. box_factor : `int` Related to the FFT resolution (**resolution** key), defines the image pixel size level. It depends on the primary radius, ``pr``, of the telescope, e.g. a ``box_factor = 5`` returns ``x = np.linspace(-5 * pr, 5 * pr, resolution)``, an array to be used in the FFT2 (`~numpy.fft.fft2`). fit_previous : `bool` If set to `True`, it will fit the coefficients from the previous optimization this feature is strongly suggested. If `False`, it will find the new coefficients by using the standard initial coefficients. config_params_file : `str` Path for the configuration file, this includes, the maximum and minimum bounds, excluded, fixed and initial parameters for the optimization. See ``config_params.yml`` in the pyoof package directory. make_plots : `bool` If `True` will generate a sub-directory with all the important plots for the OOF holography, including the phase-error, :math:`\\varphi(x, y)` and fitted beam, :math:`P_\\mathrm{norm}(u, v)`. verbose : `int` {0, 1, 2} Level of algorithm verbosity. 0 work silent, 1 display termination report, 2, display progress during iteration (default). work_dir : `str` Default is `None`, it will store the ``pyoof_out/`` folder in the FITS file current directory, for other provide the desired path. """ start_time = time.time() print('\n ***** PYOOF FIT POLYNOMIALS ***** \n') print(' ... Reading data ...\n') # All observed data needed to fit the beam [name, pthto, obs_object, obs_date, freq, wavel, d_z, meanel] = data_info [beam_data, u_data, v_data] = data_obs if work_dir is None: work_dir = pthto telgeo, tel_name = telescope[:3], telescope[3] # Calling default configuration file from the pyoof package if config_params_file is None: config_params_pyoof = get_pkg_data_filename('data/config_params.yml') with open(config_params_pyoof, 'r') as yaml_config: config_params = yaml.load(yaml_config, Loader=yaml.Loader) else: with open(config_params_file, 'r') as yaml_config: config_params = yaml.load(yaml_config, Loader=yaml.Loader) # Storing files in pyoof_out directory if not os.path.exists(os.path.join(work_dir, 'pyoof_out')): os.makedirs(os.path.join(work_dir, 'pyoof_out'), exist_ok=True) for j in ["%03d" % i for i in range(101)]: name_dir = os.path.join(work_dir, 'pyoof_out', name + '-' + j) if not os.path.exists(name_dir): os.makedirs(name_dir, exist_ok=True) break _snr = [] for i in range(3): _snr.append( np.round(snr(beam_data[i, ...], u_data[i, ...], v_data[i, ...]), 2) ) print( f'Maximum order to be fitted: {order_max}', f'Telescope name: {tel_name}', f'File name: {name}', f'Obs frequency: {freq.to(apu.GHz)}', f'Obs Wavelength: {wavel.to(apu.cm)}', f'Mean elevation {meanel.to(apu.deg)}', f'd_z (out-of-focus): {d_z.to(apu.cm)}', f'Illumination to be fitted: {illum_func.__qualname__}', f'SNR out-, in-, and out-focus beam: {_snr}', f'Beam data shape: {beam_data.shape}', sep='\n', end='\n' ) for order in range(1, order_max + 1): if not verbose == 0: print('\n ... Fit order {} ... \n'.format(order)) # Setting limits for plotting fitted beam plim = np.array([ u_data[0, ...].min().to_value(apu.rad), u_data[0, ...].max().to_value(apu.rad), v_data[0, ...].min().to_value(apu.rad), v_data[0, ...].max().to_value(apu.rad) ]) * u_data.unit n = order # order polynomial to fit N_K_coeff = (n + 1) * (n + 2) // 2 # number of K(n, l) to fit # Looking for result parameters lower order if fit_previous and n != 1: N_K_coeff_previous = n * (n + 1) // 2 path_params_previous = os.path.join( name_dir, f'fitpar_n{n - 1}.csv' ) params_to_add = np.ones(N_K_coeff - N_K_coeff_previous) * 0.1 params_previous = Table.read(path_params_previous, format='ascii') params_init = np.hstack((params_previous['parfit'], params_to_add)) if not verbose == 0: print('Initial params: n={} fit'.format(n - 1)) else: params_init = config_params['init'] + [0.1] * (N_K_coeff - 3) print('Initial parameters: default') # i_amp, c_dB, q, x0, y0, K(n, l) # Giving an initial value of 0.1 for each K_coeff idx_exclude = config_params['excluded'] # exclude params from fit # [0, 1, 2, 3, 4, 5, 6, 7] = # [i_amp, c_dB, q, x0, y0, K(0, 0), K(1, 1), K(1, -1)] # or 'None' to include all params_init_true = np.delete(params_init, idx_exclude) bound_min = config_params['bound_min'] + [-5] * (N_K_coeff - 3) bound_max = config_params['bound_max'] + [5] * (N_K_coeff - 3) bound_min_true = np.delete(bound_min, idx_exclude) bound_max_true = np.delete(bound_max, idx_exclude) if not verbose == 0: print('Parameters to fit: {}\n'.format(len(params_init_true))) # Running nonlinear least squares minimization res_lsq = optimize.least_squares( fun=residual, x0=params_init_true, args=( # Conserve this order in arguments! N_K_coeff, # Total Zernike circle polynomial coeff beam_data, # Power pattern maps u_data, # x coordinate beam map v_data, # y coordinate beam map d_z, # Radial offset wavel, # Wavelength of observation illum_func, # Illumination function telgeo, # telgeo = [block_dist, opd_func, pr] resolution, # FFT2 resolution for a rectangular grid box_factor, # Image pixel size level True, # Grid interpolation config_params # Params configuration for minimization (dict) ), bounds=tuple([bound_min_true, bound_max_true]), method='trf', tr_solver='exact', verbose=verbose, max_nfev=None ) # Solutions from least squared optimization params_solution = params_complete( params=res_lsq.x, N_K_coeff=N_K_coeff, config_params=config_params ) res_optim = res_lsq.fun.reshape(3, -1) # Optimum residual solution jac_optim = res_lsq.jac # Last Jacobian matrix grad_optim = res_lsq.grad # Last Gradient # covariance and correlation cov, cor = co_matrices( res=res_lsq.fun, jac=res_lsq.jac, n_pars=params_init_true.size # number of parameters fitted ) cov_ptrue = np.vstack( (np.delete(np.arange(N_K_coeff + 5), idx_exclude), cov)) cor_ptrue = np.vstack( (np.delete(np.arange(N_K_coeff + 5), idx_exclude), cor)) # Final phase from fit in the telescope's primary reflector _phase = phase( K_coeff=params_solution[5:], pr=telgeo[2], piston=False, tilt=False )[2].to_value(apu.rad) # Storing files in directory if not verbose == 0: print('\n ... Saving data ...\n') store_data_ascii( name=name, name_dir=name_dir, order=n, params_solution=params_solution, params_init=params_init, ) # Printing the results from saved ascii file if not verbose == 0: Table.read( os.path.join(name_dir, f'fitpar_n{n}.csv'), format='ascii' ).pprint_all() if n == 1: # TODO: yaml doesn't like astropy :( pyoof_info = dict( tel_name=tel_name, # tel_blockage=telgeo[0].__qualname__, tel_opd=telgeo[1].__qualname__, pr=float(telgeo[2].to_value(apu.m)), name=name, obs_object=obs_object, obs_date=obs_date, d_z=d_z.to_value(apu.m).tolist(), wavel=float(wavel.to_value(apu.m)), frequency=float(freq.to_value(apu.Hz)), illumination=illum_func.__qualname__, meanel=float(meanel.to_value(apu.deg)), fft_resolution=resolution, box_factor=box_factor, snr=list(float(_snr[i]) for i in range(3)) ) with open(os.path.join(name_dir, 'pyoof_info.yml'), 'w') as outf: outf.write('# pyoof relevant information\n') yaml.dump( pyoof_info, outf, default_flow_style=False, Dumper=yaml.Dumper ) # To store large files in csv format save_to_csv = [ beam_data, u_data.to_value(apu.rad), v_data.to_value(apu.rad), res_optim, jac_optim, grad_optim, _phase, cov_ptrue, cor_ptrue ] store_data_csv( name=name, name_dir=name_dir, order=n, save_to_csv=save_to_csv ) if make_plots: if not verbose == 0: print('\n ... Making plots ...') # Making all relevant plots plot_fit_path( path_pyoof_out=name_dir, order=n, telgeo=telgeo, illum_func=illum_func, plim=plim, save=True, angle=apu.deg ) plt.close('all') final_time = np.round((time.time() - start_time) / 60, 2) print(f'\n ***** PYOOF FIT COMPLETED AT {final_time} mins *****\n')