Source code for pyfstat.utils.sft

import logging
import os
from typing import Dict, Optional, Tuple

import lal
import lalpulsar
import matplotlib
import matplotlib.pyplot as plt
import numpy as np

logger = logging.getLogger(__name__)


[docs] def get_sft_as_arrays( sftfilepattern: str, fMin: Optional[float] = None, fMax: Optional[float] = None, constraints: Optional[lalpulsar.SFTConstraints] = None, ) -> Tuple[np.ndarray, Dict, Dict]: """ Read binary SFT files into NumPy arrays. Parameters ---------- sftfilepattern: Pattern to match SFTs using wildcards (`*?`) and ranges [0-9]; multiple patterns can be given separated by colons. fMin, fMax: Restrict frequency range to `[fMin, fMax]`. If None, retrieve the full frequency range. constraints: Constrains to be fed into XLALSFTdataFind to specify detector, GPS time range or timestamps to be retrieved. Returns ---------- freqs: np.ndarray The frequency bins in each SFT. These will be the same for each SFT, so only a single 1D array is returned. times: Dict The SFT start times as a dictionary of 1D arrays, one for each detector. Keys correspond to the official detector names as returned by lalpulsar.ListIFOsInCatalog. data: Dict A dictionary of 2D arrays of the complex Fourier amplitudes of the SFT data for each detector in each frequency bin at each timestamp. Keys correspond to the official detector names as returned by lalpulsar.ListIFOsInCatalog. """ constraints = constraints or lalpulsar.SFTConstraints() if fMin is None and fMax is None: fMin = fMax = -1 elif fMin is None or fMax is None: raise ValueError("Need either none or both of fMin, fMax.") sft_catalog = lalpulsar.SFTdataFind(sftfilepattern, constraints) ifo_labels = lalpulsar.ListIFOsInCatalog(sft_catalog) logger.info( f"Loading {sft_catalog.length} SFTs from {', '.join(ifo_labels.data)}..." ) multi_sfts = lalpulsar.LoadMultiSFTs(sft_catalog, fMin, fMax) logger.debug("done!") times = {} amplitudes = {} old_frequencies = None for ind, ifo in enumerate(ifo_labels.data): sfts = multi_sfts.data[ind] times[ifo] = np.array([sft.epoch.gpsSeconds for sft in sfts.data]) amplitudes[ifo] = np.array([sft.data.data for sft in sfts.data]).T nbins, nsfts = amplitudes[ifo].shape logger.debug(f"{nsfts} retrieved from {ifo}.") f0 = sfts.data[0].f0 df = sfts.data[0].deltaF frequencies = np.linspace(f0, f0 + (nbins - 1) * df, nbins) if (old_frequencies is not None) and not np.allclose( frequencies, old_frequencies ): raise ValueError( f"Frequencies don't match between {ifo_labels.data[ind - 1]} and {ifo}" ) old_frequencies = frequencies return frequencies, times, amplitudes
[docs] def get_commandline_from_SFTDescriptor(descriptor): """Extract a commandline from the 'comment' entry of a SFT descriptor. Most LALSuite SFT creation tools save their commandline into that entry, so we can extract it and reuse it to reproduce that data. Since lalapps 9.0.0 / lalpulsar 5.0.0 the relevant executables have been moved to lalpulsar, but we allow for lalapps backwards compatibility here, Parameters ---------- descriptor: SFTDescriptor Element of a `lalpulsar.SFTCatalog` structure. Returns ------- cmd: str A lalapps/lalpulsar commandline string, or an empty string if no match in comment. """ comment = getattr(descriptor, "comment", None) if comment is None: return "" comment_lines = comment.split("\n") # get the first line with the right substring # (iterate until it's found) return next( (line for line in comment_lines if "lalpulsar" in line or "lalapps" in line), "" )
[docs] def get_official_sft_filename( IFO, numSFTs, Tsft, tstart, duration, label=None, window_type=None, window_param=None, ): """Wrapper to predict the canonical lalpulsar names for SFT files. Parameters ---------- IFO: str Two-char detector name, e.g. `H1`. numSFTs: int numSFTs number of SFTs in SFT-file Tsft: int time-baseline in (integer) seconds tstart: int GPS seconds of first SFT start time duration: int total time-spanned by all SFTs in seconds label: str or None optional 'Misc' entry in the SFT 'D' field window_type: str or None window function applied to SFTs window_param: float or None additional parameter for some window functions Returns ------- filename: str The canonical SFT file name for the input parameters. """ spec = lalpulsar.SFTFilenameSpec() lalpulsar.FillSFTFilenameSpecStrings( spec=spec, path=None, extn=None, detector=IFO, window_type=window_type, privMisc=label, pubObsKind=None, pubChannel=None, ) spec.window_param = window_param or 0 spec.numSFTs = numSFTs spec.SFTtimebase = Tsft spec.gpsStart = tstart # possible gotcha: duration may be different if nanoseconds of sft-epochs are non-zero # (see SFTfileIO.c in lalpulsar) spec.SFTspan = duration return lalpulsar.BuildSFTFilenameFromSpec(spec)
[docs] def get_sft_constraints_from_tstart_duration( tstart: Optional[int] = None, duration: Optional[int] = None, timestamps: Optional[dict] = None, ) -> lalpulsar.SFTConstraints: """ Use start and duration to set up a lalpulsar.SFTConstraints object. Parameters ---------- tstart: GPS seconds of first SFT start time duration: Total time-spanned by all SFTs in seconds. timestamps: The SFT start times as a dictionary of 1D arrays, one for each detector. Keys correspond to the official detector names as returned by lalpulsar.ListIFOsInCatalog Returns ------- SFTConstraint: lalpulsar.SFTConstraints Constraints to be fed into XLALSFTdataFind to specify detector, GPS time range or timestamps to be retrieved. """ SFTConstraint = lalpulsar.SFTConstraints() if tstart is None: SFTConstraint.minStartTime = None SFTConstraint.maxStartTime = None elif duration is None: SFTConstraint.maxStartTime = None else: SFTConstraint.minStartTime = lal.LIGOTimeGPS(tstart) SFTConstraint.maxStartTime = SFTConstraint.minStartTime + duration SFTConstraint.timestamps = None # FIXME: not currently supported if timestamps is not None: # pragma: no cover raise NotImplementedError("Timestamps not yet supported in this function.") logger.info( "SFT Constraints: [minStartTime:{}, maxStartTime:{}]".format( SFTConstraint.minStartTime, SFTConstraint.maxStartTime, ) ) return SFTConstraint
[docs] def plot_spectrogram( sftfilepattern: str, detector: str, savefig: Optional[bool] = False, outdir: Optional[str] = ".", label: Optional[str] = None, quantity: Optional[str] = "power", sqrtSX: Optional[float] = None, fMin: Optional[float] = None, fMax: Optional[float] = None, constraints: Optional[lalpulsar.SFTConstraints] = None, figsize: Optional[Tuple] = (16, 9), **kwargs, ) -> matplotlib.axes.Axes: """ Compute spectrograms of a set of SFTs. In case the signal contains gaps, these are replaced by "nans", so in the plot they appear in white. This is useful to produce visualizations of the Doppler modulation of a CW signal. Parameters ---------- sftfilepattern: Pattern to match SFTs using wildcards (`*?`) and ranges [0-9]; multiple patterns can be given separated by colons. detector: Name of the detector of the data that will be plot. savefig: If True, save the figure in `outdir`. If False, return an axis object without saving to disk. outdir: Output folder. label: Output filename. quantity: Magnitude to be plotted. It can be "power" for SFT power, "normpower" for normalized power (`2*power/(Tsft*sqrtSX**2)`), "real" for the real part of the SFTs, and "imag" for the imaginary part of the SFTs. Set to "power" by default. sqrtSX: Amplitude spectral density of the data. Only needed if `quantity = "normpower". fMin, fMax: Restrict frequency range to `[fMin, fMax]`. If None, retrieve the full frequency range. constraints: Constraints to be fed into XLALSFTdataFind to specify detector, GPS time range or timestamps to be retrieved. figsize: Size of the figure, as a tuple of (horizontal,vertical) measurements (standard matplotlib convention). kwarg: dict Other kwargs, only used to be passed to `matplotlib.pcolormesh`. Returns ------- ax: matplotlib.axes.Axes The axes object containing the plot. """ logger.info("Loading SFT data") frequency, timestamps, fourier_data = get_sft_as_arrays( sftfilepattern, fMin, fMax, constraints ) if savefig: if label is None: # pragma: no cover raise ValueError("Label needed to save the figure") else: plotfile = os.path.join(outdir, label + ".png") if detector not in timestamps: # pragma: no cover raise ValueError( f"Detector {detector} not found in timestamps, available detectors are {timestamps.keys()}" ) # Compute Tsft constraints = constraints or lalpulsar.SFTConstraints() multi_sft_catalog = lalpulsar.GetMultiSFTCatalogView( lalpulsar.SFTdataFind(sftfilepattern, constraints) ) Tsft = int(round(1.0 / multi_sft_catalog.data[0].data[0].header.deltaF)) logger.info(f"Extracted Tsft={Tsft}") # Fill up gaps with Nans gap_length = np.diff(timestamps[detector]) - Tsft gap_data = [fourier_data[detector][:, 0]] gap_timestamps = [timestamps[detector][0]] gaps = False for ind, gap in enumerate(gap_length): if gap > 0: gaps = True num_nans = gap // Tsft remainder = gap % Tsft for i in range(num_nans): gap_data.append( np.full_like(fourier_data[detector][:, ind], np.nan + 1j * np.nan) ) gap_timestamps.append(timestamps[detector][ind] + (i + 1) * Tsft) if remainder > 0: gap_data.append( np.full_like(fourier_data[detector][:, ind], np.nan + 1j * np.nan) ) gap_timestamps.append( timestamps[detector][ind] + (num_nans * Tsft) + remainder ) gap_data.append(fourier_data[detector][:, ind + 1]) gap_timestamps.append(timestamps[detector][ind + 1]) if gaps is True: timestamps = {detector: np.hstack(gap_timestamps)} fourier_data = {detector: np.vstack(gap_data).T} # Initialize plot plt.rcParams["axes.grid"] = False # turn off the gridlines fig, ax = plt.subplots(figsize=figsize) ax.set(xlabel="Time [days]", ylabel="Frequency [Hz]") time_in_days = (timestamps[detector] - timestamps[detector][0]) / 86400 if "power" in quantity: logger.info("Computing SFT power") q = fourier_data[detector].real ** 2 + fourier_data[detector].imag ** 2 if quantity == "normpower": if sqrtSX is None: # pragma: no cover raise ValueError( "Value of sqrtSX needed to compute the normalized power." ) q *= 2 / (Tsft * sqrtSX**2) label = "Normalized power" else: label = "Power" elif quantity == "real": q = fourier_data[detector].real ax.set_title("SFT real part") label = "Fourier amplitude" elif quantity == "imag": q = fourier_data[detector].imag ax.set_title("SFT imaginary part") label = "Fourier amplitude" else: # pragma: no cover raise ValueError( f"Quantity '{quantity}' not accepted. Please, introduce a supported quantity." ) if savefig: logger.info(f"Plotting to file: {plotfile}") else: # pragma: no cover logger.info("Plotting, will return axes object") c = ax.pcolormesh( time_in_days, frequency, q, cmap=kwargs["cmap"] if "cmap" in kwargs else "inferno_r", shading="nearest", ) fig.colorbar(c, label=label) plt.tight_layout() if savefig: fig.savefig(plotfile) return ax