Source code for pulsar_spectra.spectral_fit

"""
Function used to fit different spectral models to the fluxs_mJy densities of pulsars
"""

import numpy as np
from iminuit import Minuit
from iminuit.cost import LeastSquares
from iminuit.util import propagate

import matplotlib.pyplot as plt
from matplotlib.ticker import FormatStrFormatter
from cycler import cycler

from pulsar_spectra.models import model_settings
from pulsar_spectra.catalogue import convert_cat_list_to_dict

import logging
logger = logging.getLogger(__name__)

[docs]def robust_cost_function(f_y, y, sigma_y, k=1.345): """Robust cost function. The negative log-likelihood of a Gaussian likelihood with Huber loss. Parameters ---------- f_y : `list` A list of predicted values according to the model. y : `list` A list of measured values at the same frequency as the model values. sigma_y : `list` A list of uncertainties corresponding to the measured values y. k : `float`, optional A constant that defines at which distance the loss function starts to penalize outliers. |br| Default: 1.345. Returns ------- beta : `float` The cost of the model fit. """ beta_array = [] for fi, yi, sigma_i in zip(f_y, y, sigma_y): relative_error = (fi - yi)/sigma_i if abs(relative_error) < k: beta_array.append( 1./2. * relative_error**2 ) else: beta_array.append( k * abs(relative_error) - 1./2. * k**2 ) return sum(beta_array)
[docs]def huber_loss_function(sq_resi, k=1.345): """Robust loss function which penalises outliers, as detailed in Jankowski et al (2018). Parameters ---------- sq_resi : `float` or `list` A single or list of the squared residuals. k : `float`, optional A constant that defines at which distance the loss function starts to penalize outliers. |br| Default: 1.345. Returns ------- rho : `float` or `list` The modified squared residuals. """ single_value = False if isinstance(sq_resi, float) or isinstance(sq_resi, int): sq_resi = np.array([sq_resi]) single_value = True elif isinstance(sq_resi, list): sq_resi = np.array(sq_resi) rho = [] residual = np.sqrt(abs(sq_resi)) for j in range(len(residual)): if residual[j] < k: rho.append( sq_resi[j]/2 ) else: rho.append( k * residual[j] - 1./2. * k**2 ) if single_value: return rho[0] else: return rho
[docs]def plot_fit(freqs_MHz, fluxs_mJy, flux_errs_mJy, ref, model, iminuit_result, fit_info, plot_error=True, save_name="fit.png", alternate_style=False, axis=None, secondary_fit=False, fit_range=None): """Create a plot of the pulsar spectral fit. Parameters ---------- freqs_MHz : `list` A list of the frequencies in MHz. fluxs_mJy : `list` A list of the flux density in mJy. flux_errs_mJy : `list` A list of the uncertainty of the flux density in mJy. ref : `list` A list of the reference label (in the format 'Author_year'). model : `function` One of the model functions from :py:meth:`pulsar_spectra.models`. iminuit_result : `iminuit.Minuit` The Minuit class after being fit in :py:meth:`pulsar_spectra.spectral_fit.iminuit_fit_spectral_model`. fit_info : `str` The string to label the fit with from :py:meth:`pulsar_spectra.spectral_fit.iminuit_fit_spectral_model`. plot_error : `boolean`, optional If you want to include the fit error in the plot. |br| Default: True. save_name : `str`, optional The name of the saved plot. |br| Default: "fit.png". alternate_style : `boolean`, optional Plot with the alternate plot style based on Jankowski 2018. |br| Default: False. axis : `Axes`, optional The axes with which the spectrum will be plotted. |br| None. secondary_fit : `boolean`, optional Plot model with an alternate style and without markers. |br| Default: False. fit_range : `tuple`, (`float`, `float`) optional Frequency range to plot the second model over in MHz, eg. (100, 3000). |br| Default: None, will use input frequency range. """ # Set up plot plotsize = 3.2 if axis is None: fig, ax = plt.subplots(figsize=(plotsize*4/3, plotsize)) else: ax = axis capsize = 1.5 errorbar_linewidth = 0.7 marker_border_thickness = 0.5 marker_scale = 0.7 marker_types = [("#006ddb", "o", 6), # blue circle ("#24ff24", "^", 7), # green triangle ("r", "D", 5), # red diamond ("#920000", "s", 5.5), # maroon square ("#6db6ff", "p", 6.5), # sky blue pentagon ("#ff6db6", "*", 9), # pink star ("m", "v", 7), # purple upside-down triangle ("#b6dbff", "d", 7), # light blue thin diamond ("#009292", "P", 7.5), # turqoise thick plus ("#b66dff", "h", 7), # lavender hexagon ("#db6d00", ">", 7), # orange right-pointing triangle ("c", "H", 7), # cyan sideways hexagon ("#ffb6db", "X", 7.5), # light pink thick cross ("#004949", "<", 7), # dark green right-pointing triangle ("k", "x", 7), # black thin cross ("y", "s", 5.5), # yellow square ("#009292", "^", 7), # turquoise triangle ("k", "d", 7), # black thin diamond ("#b6dbff", "*", 9), # light blue star ("y", "P", 7.5)] # yellow thick plus custom_cycler = (cycler(color = [p[0] for p in marker_types]) + cycler(marker = [p[1] for p in marker_types]) + cycler(markersize = np.array([p[2] for p in marker_types])*marker_scale)) ax.set_prop_cycle(custom_cycler) # Add data data_dict = convert_cat_list_to_dict({"dummy_pulsar":[freqs_MHz, fluxs_mJy, flux_errs_mJy, ref]})["dummy_pulsar"] if not secondary_fit: for ref in data_dict.keys(): freqs_ref = np.array(data_dict[ref]['Frequency MHz']) fluxs_ref = np.array(data_dict[ref]['Flux Density mJy']) flux_errs_ref = np.array(data_dict[ref]['Flux Density error mJy']) (_, caps, _) = ax.errorbar( freqs_ref, fluxs_ref, yerr=flux_errs_ref, linestyle='None', mec='k', markeredgewidth=marker_border_thickness, elinewidth=errorbar_linewidth, capsize=capsize, label=ref.replace('_',' ') ) for cap in caps: cap.set_markeredgewidth(errorbar_linewidth) # Create fit line if fit_range is None: # No fit range given so use full range fitted_freq = np.logspace(np.log10(min(freqs_MHz)), np.log10(max(freqs_MHz)), 100) else: # Use input fit range min_freq, max_freq = fit_range fitted_freq = np.logspace(np.log10(min_freq), np.log10(max_freq), 100) if iminuit_result.valid: fitted_flux, fitted_flux_cov = propagate(lambda p: model(fitted_freq * 1e6, *p) * 1e3, iminuit_result.values, iminuit_result.covariance) else: # No convariance values so use old method fitted_flux = model(fitted_freq * 1e6, *iminuit_result.values) * 1e3 # Plot fit line if alternate_style: # Just use a simple label model_dict = model_settings() fit_info = model_dict[fit_info.split()[0]][1] if secondary_fit: ax.plot(fitted_freq, fitted_flux, 'k', marker="None", ls=(0, (0.7, 1)), lw=2, alpha=0.5, label=fit_info) else: ax.plot(fitted_freq, fitted_flux, 'k--', label=fit_info) if plot_error and iminuit_result.valid: # draw 1 sigma error band fitted_flux_prop = np.diag(fitted_flux_cov) ** 0.5 if secondary_fit: facecolor = "r" alpha = 0 else: facecolor = "C1" alpha = 0.5 ax.fill_between(fitted_freq, fitted_flux - fitted_flux_prop, fitted_flux + fitted_flux_prop, facecolor=facecolor, alpha=alpha) # Format plot and save ax.set_xscale('log') ax.set_yscale('log') ax.get_xaxis().set_major_formatter(FormatStrFormatter('%g')) ax.get_yaxis().set_major_formatter(FormatStrFormatter('%g')) ax.tick_params(which='both', direction='in', top=1, right=1) ax.set_xlabel('Frequency (MHz)') ax.set_ylabel('Flux Density (mJy)') if alternate_style: ax.legend(loc='lower left', ncol=2, fontsize=6) else: ax.legend(loc='center left', bbox_to_anchor=(1.1, 0.5)) ax.grid(visible=True, ls=':', lw=0.6) if axis is None: # Not using axis mode so save figure plt.savefig(save_name, bbox_inches='tight', dpi=300) plt.clf()
[docs]def iminuit_fit_spectral_model( freqs_MHz, fluxs_mJy, flux_errs_mJy, ref, model_name="simple_power_law", start_params=None, mod_limits=None, plot=False, plot_error=True, save_name="fit.png", alternate_style=False, axis=None, secondary_fit=False, fit_range=None ): """Fit pulsar spectra with iminuit. Parameters ---------- freqs_MHz : `list` A list of the frequencies in MHz. fluxs_mJy : `list` A list of the flux density in mJy. flux_errs_mJy : `list` A list of the uncertainty of the flux density in mJy. ref : `list` A list of the reference label (in the format 'Author_year'). model_name : `function`, optional One of the model names from :py:meth:`pulsar_spectra.models.model_settings`. Default: :py:meth:`pulsar_spectra.models.simple_power_law`. start_params : `tuple`, optional A tuple of the starting paramaters for each input to the model that iminuit will use as an initial estimate. If none provided, will use the defaults from :py:meth:`pulsar_spectra.models.model_settings`. mod_limits : `list` of `tuple`s, optional A list of tuples where each tuples is the minimum and maximum limits that will be applied to the model by iminuit. If none provided, will use the defaults from :py:meth:`pulsar_spectra.models.model_settings`. plot : `boolean`, optional If you want to plot the result of the fit. |br| Default: False. plot_error : `boolean`, optional If you want to include the fit error in the plot. |br| Default: True. save_name : `str`, optional The name of the saved plot. |br| Default: "fit.png". alternate_style : `boolean`, optional If you want to use the alternate plot style. |br| Default: False. axis : `Axes`, optional The axes with which the spectrum will be plotted. |br| None. secondary_fit : `boolean`, optional Plot model with an alternate style and without markers. |br| Default: False. fit_range : `tuple`, optional Frequency range to plot the second model over. |br| Default: None. Returns ------- aic : `float` The Akaike information criterion of the fit. m : `iminuit.Minuit` The Minuit class after being fit in :py:meth:`pulsar_spectra.spectral_fit.iminuit_fit_spectral_model`. fit_info : `str` The string to label the fit with from :py:meth:`pulsar_spectra.spectral_fit.iminuit_fit_spectral_model`. """ # Covert to SI (Hz and Jy) v0_Hz = 10**((np.log10(min(freqs_MHz))+np.log10(max(freqs_MHz)))/2) * 1e6 # reference frequency is the logarithmic centre frequency freqs_Hz = np.array(freqs_MHz, dtype=np.float128) * 1e6 fluxs_Jy = np.array(fluxs_mJy, dtype=np.float128) / 1e3 flux_errs_Jy = np.array(flux_errs_mJy, dtype=np.float128) / 1e3 # Load model settings model_dict = model_settings() model_function = model_dict[model_name][0] # Check for model dependent defaults if start_params is None: start_params = model_dict[model_name][2] if mod_limits is None: mod_limits = model_dict[model_name][3] # Add the reference frequency start_params += (v0_Hz,) mod_limits += [None] # Check if enough inputs k = len(start_params)-1 # number of free model parameters if len(freqs_MHz) <= k + 1: logger.warn(f"Only {len(freqs_MHz)} supplied for {model_name} model fit. This is not enough so skipping") return 1e9, None, None # Fit model least_squares = LeastSquares(freqs_Hz, fluxs_Jy, flux_errs_Jy, model_function) least_squares.loss = huber_loss_function m = Minuit(least_squares, *start_params) m.fixed["v0"] = True # fix the reference frequency """Find the minimum of least_squares function using the in-built minimisation algorithms in iminuit. If migrad by itself fails, then run the simplex minimiser before migrad. If simplex fails, run a grid scan over parameter space before migrad. Systematically increase the number of calls until a valid minimum is found. """ m.tol=0.000005 # low tolerace improves likelihood of a sensible fit m.limits = mod_limits # limits are primarily to assist the scan minimiser migrad_calls = 20000 # more calls, better fit ncall = 20000 # for simplex and scan m.migrad(ncall=migrad_calls) if m.valid: logger.debug(f"Found for fit with {model_name} using migrad and {migrad_calls} calls.") else: m.simplex(ncall=ncall) m.migrad(ncall=migrad_calls) if m.valid: logger.debug(f"Found for fit with {model_name} using simplex and {ncall} calls.") else: m.scan(ncall=ncall) m.migrad(ncall=migrad_calls) if m.valid: logger.debug(f"Found for fit with {model_name} using scan and {ncall} calls.") if not m.valid: logger.warning(f"No valid minimum found for model {model_name}.") m.hesse() # accurately computes uncertainties logger.debug(m) # display legend with some fit info fit_info = [model_name] for p, v, e in zip(m.parameters, m.values, m.errors): if p.startswith("v"): fit_info.append(f"{p} = ${v/1e6:8.1f} \\pm {e/1e6:8.1}$ MHz") else: fit_info.append(f"{p} = ${v:.5f} \\pm {e:.5}$") # Calculate AIC beta = robust_cost_function(model_function(freqs_Hz, *m.values), fluxs_Jy, flux_errs_Jy) aic = 2*beta + 2*k + (2*k*(k+1)) / (len(freqs_Hz) - k -1) fit_info.append(f"AIC: {aic:.1f}") fit_info = "\n".join(fit_info) if plot: plot_fit(freqs_MHz, fluxs_mJy, flux_errs_mJy, ref, model_function, m, fit_info, save_name=save_name, plot_error=plot_error, alternate_style=alternate_style, axis=axis, secondary_fit=secondary_fit, fit_range=fit_range) return aic, m, fit_info
[docs]def find_best_spectral_fit(pulsar, freqs_MHz, fluxs_mJy, flux_errs_mJy, ref_all, plot_all=False, plot_best=False, plot_compare=False, plot_error=True, alternate_style=False, axis=None, secondary_fit=False, fit_range=None): """Fit pulsar spectra with iminuit. Parameters ---------- pulsar : `str` The Jname of the pulsar to be fit. freqs_MHz : `list` A list of the frequencies in MHz. fluxs_mJy : `list` A list of the flux density in mJy. flux_errs_mJy : `list` A list of the uncertainty of the flux density in mJy. ref_all : `list` A list of the reference label (in the format 'Author_year'). plot_all : `boolean`, optional If you want to plot the result of all fits. |br| Default: False. plot_best : `boolean`, optional If you want to only plot the best fit. |br| Default: False. plot_compare : `boolean`, optional If you want to make a single plot with the result of all fits. |br| Default: False. plot_error : `boolean`, optional If you want to include the fit error in the plot. |br| Default: True. alternate_style : `boolean`, optional Plot with the alternate plot style based on Jankowski 2018. |br| Default: False. axis : `Axes`, optional The axes with which the spectrum will be plotted. |br| Default: None. secondary_fit : `boolean`, optional Plot model with an alternate style and without markers. Does not work for comparison plots. |br| Default: False. fit_range : `tuple`, optional Frequency range to plot the second model over. |br| Default: None. Returns ------- model_name : `str` The best fit model name from :py:meth:`pulsar_spectra.models`. m : `iminuit.Minuit` The Minuit class after being fit in :py:meth:`pulsar_spectra.spectral_fit.iminuit_fit_spectral_model`. fit_info : `str` The string to label the fit with from :py:meth:`pulsar_spectra.spectral_fit.iminuit_fit_spectral_model`. p_best : `float` The probability that the best-fit model is actually the best-fit model. p_category : `str` Category based on the quality of spectral fit, as defined in Jankowski et al. (2018). """ # Prepare plots and fitting frequencies if plot_compare: # Set up plots nrows = 5 plot_size = 4 fitted_freqs_MHz = np.logspace(np.log10(min(freqs_MHz)), np.log10(max(freqs_MHz)), 100) fig, axs = plt.subplots(nrows, 1, figsize=(plot_size, plot_size * nrows)) # Load model settings model_dict = model_settings() aics = [] iminuit_results = [] fit_infos = [] model_i = [] # loop over models and fit for i, model_name in enumerate(model_dict.keys()): model_function, short_name, start_params, mod_limits = model_dict[model_name] aic, iminuit_result, fit_info = iminuit_fit_spectral_model(freqs_MHz, fluxs_mJy, flux_errs_mJy, ref_all, model_name=model_name, plot=plot_all, plot_error=plot_error, save_name=f"{pulsar}_{model_name}_fit.png", alternate_style=alternate_style, axis=axis, secondary_fit=secondary_fit) logger.debug(f"{model_name} model fit gave AIC {aic}.") if iminuit_result is not None: aics.append(aic) iminuit_results.append(iminuit_result) fit_infos.append(fit_info) model_i.append(i) # Add to comparison plot if plot_compare: # plot data plot_fit(freqs_MHz, fluxs_mJy, flux_errs_mJy, ref_all, model_function, iminuit_result, fit_info, plot_error=plot_error, alternate_style=alternate_style, axis=axs[i], secondary_fit=secondary_fit, fit_range=fit_range) # Return best result if len(aics) == 0: logger.info(f"No model found for {pulsar}") #models[aici], iminuit_results[aici], fit_infos[aici], p_best, p_category return None, None, None, None, None else: aici = aics.index(min(aics)) best_model_name = list(model_dict.keys())[model_i[aici]] logger.info(f"Best model for {pulsar} is {best_model_name}") # Calc probability of best fit li = [] for i, _ in enumerate(aics): li.append(np.exp(-1/2 * np.abs(aics[i] - aics[aici]))) p_best = 1 / np.sum(li) # Work out the catagory #TODO work out what the curvature paramter is and implimented it if p_best > 0.8: p_category = 'clear' elif p_best > 0.7: p_category = 'strong' elif p_best > 0.5: p_category = 'candidate' else: p_category = 'weak' # Perform plots if plot_compare: # highlight best fit rect = plt.Rectangle( # (lower-left corner), width, height (-0.4, -0.13), 2.4, 1.2, fill=False, color="k", lw=2, zorder=1000, transform=axs[model_i[aici]].transAxes, figure=fig ) fig.patches.extend([rect]) plt.savefig(f"{pulsar}_comparison_fit.png", bbox_inches='tight', dpi=300) plt.clf() if plot_best: plot_fit(freqs_MHz, fluxs_mJy, flux_errs_mJy, ref_all, model_dict[best_model_name][0], iminuit_results[aici], fit_infos[aici], save_name=f"{pulsar}_{best_model_name}_fit.png", plot_error=plot_error, alternate_style=alternate_style, axis=axis, secondary_fit=secondary_fit, fit_range=fit_range) return best_model_name, iminuit_results[aici], fit_infos[aici], p_best, p_category
[docs]def estimate_flux_density( est_freq, model, iminuit_result, ): """Estimate a pulsar's flux density using a previous spectra fit. Parameters ---------- est_freq : `float` or `list` A single or list of frequencies to estimate flux at (in MHz). model : `function` The pulsar spectra model function from :py:meth:`pulsar_spectra.models`. m : `iminuit.Minuit` The Minuit class after being fit in :py:meth:`pulsar_spectra.spectral_fit.iminuit_fit_spectral_model`. Returns ------- fitted_flux : `float` or `list` The estimated flux density of the pulsar at the input frequencies. fitted_flux_err : `float` or `list` The estimated flux density errors of the pulsar at the input frequencies. """ # make sure est_freq is a numpy array single_value = False if isinstance(est_freq, float) or isinstance(est_freq, int): est_freq = np.array([est_freq]) single_value = True elif isinstance(est_freq, list): est_freq = np.array(est_freq) if iminuit_result.valid: fitted_flux, fitted_flux_cov = propagate(lambda p: model(est_freq * 1e6, *p) * 1e3, iminuit_result.values, iminuit_result.covariance) fitted_flux_err = np.diag(fitted_flux_cov) ** 0.5 else: # No convariance values so use old method fitted_flux = model(est_freq * 1e6, *iminuit_result.values) * 1e3 fitted_flux_err = [None]*len(fitted_flux) if single_value: return fitted_flux[0], fitted_flux_err[0] else: return fitted_flux, fitted_flux_err