import os, warnings
import astropy.io.fits as fits
from astropy.convolution import Gaussian2DKernel, convolve_fft, convolve
import matplotlib.colors as mcolors
from matplotlib.patches import Ellipse
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
import numpy as np
from scipy import interpolate
from scipy.ndimage import convolve1d
import scipy.constants as sc
try:
import progressbar
except ImportError:
warnings.warn("progressbar is not present", UserWarning)
from .parameters import Params, find_parameter_file
from .utils import FWHM_to_sigma, default_cmap, Wm2_to_Tb, Jy_to_Tb, Wm2_to_Jy, add_colorbar
from .wake import plot_wake
DEFAULT_LINE_FILE = "lines.fits.gz"
[docs]
class Line:
"""
A class to handle MCFOST spectral line data.
This class reads and processes MCFOST line radiative transfer outputs,
providing methods to plot channel maps, spectra, and calculate moment maps.
Attributes:
dir (str): Directory containing MCFOST output files
lines (numpy.ndarray): The line data cube
P (Params): MCFOST parameter object
pixelscale (float): Image pixel scale in arcsec
unit (str): Unit of the line data
nx (int): Number of pixels in x direction
ny (int): Number of pixels in y direction
nv (int): Number of velocity channels
velocity (numpy.ndarray): Velocity array in km/s
freq (numpy.ndarray): Frequency array in Hz
is_casa (bool): Whether the data is in CASA format
star_positions (numpy.ndarray): Array of stellar positions
extent (list): Image extent for plotting [xmin, xmax, ymin, ymax]
Example:
>>> line = Line(dir="path/to/mcfost/data")
>>> line.plot_map(i=0, iaz=0, iTrans=0, iv=10)
"""
[docs]
def __init__(self, dir=None, line_file=None, **kwargs):
"""
Initialize a Line object.
Args:
dir (str): Path to directory containing MCFOST output
line_file (str, optional): Name of line data file
**kwargs: Additional arguments passed to _read()
"""
# Correct path if needed
dir = os.path.normpath(os.path.expanduser(dir))
self.dir = dir
# Search for parameter file
para_file = find_parameter_file(dir)
# Read parameter file
self.P = Params(para_file)
# If user specified line_file
if line_file is not None:
self._line_file = line_file
# otherwise
else:
self._line_file = DEFAULT_LINE_FILE
# Read model results
self._read(**kwargs)
def _read(self):
# Read ray-traced image
try:
hdu = fits.open(self.dir + "/" + self._line_file)
self.lines = hdu[0].data
# Read a few keywords in header
self.pixelscale = hdu[0].header['CDELT2'] * 3600.0 # arcsec
self.unit = hdu[0].header['BUNIT']
self.cx = hdu[0].header['CRPIX1']
self.cy = hdu[0].header['CRPIX2']
self.nx = hdu[0].header['NAXIS1']
self.ny = hdu[0].header['NAXIS2']
self.nv = hdu[0].header['NAXIS3']
if self.unit == "JY/PIXEL":
self.is_casa = True
self.restfreq = hdu[0].header['RESTFREQ']
self.freq = np.array([self.restfreq])
self.velocity_type = hdu[0].header['CTYPE3']
if self.velocity_type == "VELO-LSR":
self.CRPIX3 = hdu[0].header['CRPIX3']
self.CRVAL3 = hdu[0].header['CRVAL3']
self.CDELT3 = hdu[0].header['CDELT3']
# velocity in km/s
self.velocity = self.CRVAL3 + self.CDELT3 * (np.arange(1, self.nv + 1) - self.CRPIX3)
else:
raise ValueError("Velocity type is not recognised")
self.nu = self.restfreq * (1 - self.velocity * 1000 / sc.c)
try:
self.star_positions = hdu[1].data
except:
self.star_positions = []
try:
self.star_vr = hdu[2].data
except:
self.star_vr = []
try:
self.star_properties = hdu[3].data
except:
self.star_properties = []
else:
self.is_casa = False
self.cont = hdu[1].data
self.ifreq = hdu[2].data
self.freq = hdu[3].data # frequencies of the transition
self.velocity = hdu[4].data / 1000 # km/s
try:
self.star_positions = hdu[5].data
except:
self.star_positions = []
try:
self.star_vr = hdu[6].data
except:
self.star_vr = []
self.dv = self.velocity[1] - self.velocity[0]
hdu.close()
except OSError:
print('cannot open', self._line_file)
[docs]
def plot_map(
self,
i=0,
iaz=0,
iTrans=0,
v=None,
iv=None,
insert=False,
subtract_cont=False,
moment=None,
psf_FWHM=None,
bmaj=None,
bmin=None,
bpa=None,
plot_beam=None,
beam_position=(0.125, 0.125), # fraction of plot width and height
axes_unit="arcsec",
conv_method=None,
fmax=None,
fmin=None,
fpeak=None,
dynamic_range=1e3,
color_scale=None,
colorbar=True,
colorbar_size=10,
colorbar_label=True,
cmap=None,
ax=None,
no_xlabel=False,
no_ylabel=False,
no_vlabel=False,
no_xticks=False,
no_yticks=False,
vlabel_position=(0.5, 0.1), # fraction of plot width and height
vlabel_size=8, # size in points
title=None,
title_size=14,
limit=None,
limits=None,
Tb=False,
Jy=False,
mJy=False,
per_arcsec2=False,
per_beam=False,
Delta_v=None,
shift_dx=0,
shift_dy=0,
plot_stars=False,
sink_particle_size=6,
sink_particle_color="cyan",
M0_threshold=None,
iv_support=None,
v_minmax = None,
rms=0,
interpolation=None,
# from casa - contours
plot_type="imshow",
alpha=1.0,
linewidths=None,
zorder=None,
origin='lower',
levels=None,
colors=None,
colorbar_side="right"
):
"""
Plot a channel map or moment map.
Args:
i (int): Inclination index
iaz (int): Azimuth angle index
iTrans (int): Transition index
v (float, optional): Velocity in km/s (alternative to iv)
iv (int, optional): Velocity channel index
moment (int, optional): Moment to plot (0=integrated intensity, 1=velocity, 2=dispersion)
**kwargs: Additional arguments passed to plotting function
Returns:
matplotlib.image.AxesImage: The plotted image
"""
# Todo:
# - print molecular info (eg CO J=3-2)
if ax is None:
ax = plt.gca()
# -- Selecting channel corresponding to a given velocity
if v is not None:
iv = np.abs(self.velocity - v).argmin()
print("Selecting channel #", iv)
# --- Compute pixel scale and extent of image
if axes_unit.lower() == 'arcsec':
pix_scale = self.pixelscale
xlabel = r'$\Delta$ RA (")'
ylabel = r'$\Delta$ Dec (")'
xaxis_factor = -1
elif axes_unit.lower() == 'au':
pix_scale = self.pixelscale * self.P.map.distance
xlabel = 'Distance from star (au)'
ylabel = 'Distance from star (au)'
xaxis_factor = 1
elif axes_unit.lower() == 'pixels' or axes_unit.lower() == 'pixel':
pix_scale = 1
xlabel = r'$\Delta$ x (pix)'
ylabel = r'$\Delta$ y (pix)'
xaxis_factor = 1
else:
raise ValueError("Unknown unit for axes_units: " + axes_unit)
halfsize = np.asarray(self.lines.shape[-2:]) / 2 * pix_scale
extent = [-halfsize[0]*xaxis_factor-shift_dx, halfsize[0]*xaxis_factor-shift_dx, -halfsize[1]-shift_dy, halfsize[1]-shift_dy]
self.extent = extent
# -- set color map
if cmap is None:
if moment in [1,9]:
cmap = "RdBu_r"
else:
cmap = default_cmap
unit = self.unit
# -- beam or psf : psf_FWHM and bmaj and bmin are in arcsec, bpa in deg
i_convolve = False
beam = None
if psf_FWHM is not None:
# in pixels
sigma = psf_FWHM / self.pixelscale * FWHM_to_sigma
beam = Gaussian2DKernel(sigma)
i_convolve = True
bmin = psf_FWHM
bmaj = psf_FWHM
bpa = 0
if plot_beam is None:
plot_beam = True
if bmaj is not None:
sigma_x = bmin / self.pixelscale * FWHM_to_sigma # in pixels
sigma_y = bmaj / self.pixelscale * FWHM_to_sigma # in pixels
beam = Gaussian2DKernel(sigma_x, sigma_y, bpa * np.pi / 180)
i_convolve = True
if plot_beam is None:
plot_beam = True
if plot_beam is None:
plot_beam = False
# -- Selecting convolution function
if conv_method is None:
conv_method = convolve_fft
# -- Selection of image to plot
if moment is not None:
im = self.get_moment_map(
i=i,
iaz=iaz,
iTrans=iTrans,
moment=moment,
i_convolve = i_convolve,
beam=beam,
conv_method=conv_method,
subtract_cont=subtract_cont,
M0_threshold=M0_threshold,
iv_support=iv_support,
v_minmax=v_minmax)
if moment in [1,2,9]:
Tb = False
else:
# individual channel
if self.is_casa:
cube = self.lines[:, :, :]
# im = self.lines[iv+1,:,:])
else:
cube = self.lines[iaz, i, iTrans, :, :, :]
# im = self.lines[i,iaz,iTrans,iv,:,:]
# -- continuum subtraction
if subtract_cont:
cube = np.maximum(cube - self.cont[iaz, i, iTrans, np.newaxis, :, :], 0.0)
# Convolve spectrally
if Delta_v is not None:
cube = self._spectral_convolve(cube, Delta_v)
if iv is None:
raise ValueError("plot_map needs either v or iv")
im = cube[iv, :, :]
# -- Convolve image
if i_convolve:
im = conv_method(im, beam)
if plot_beam is None:
plot_beam = True
# -- Conversion to brightness temperature
if Tb:
if self.is_casa:
im = Jy_to_Tb(im, self.freq[iTrans], self.pixelscale)
else:
im = Wm2_to_Tb(im, self.freq[iTrans], self.pixelscale)
im = np.nan_to_num(im)
print("Max Tb=", np.max(im), "K")
# turning off some flags that may interfere
per_arcsec2 = False
per_beam = False
# -- Conversion to Jy
if Jy:
if not self.is_casa:
im = Wm2_to_Jy(im, self.freq[iTrans])
unit = unit.replace("W.m-2", "Jy")
# -- Conversion to mJy
if mJy:
if not self.is_casa:
im = Wm2_to_Jy(im, self.freq[iTrans]) * 1e3
unit = unit.replace("W.m-2", "mJy")
# -- Conversion to flux per arcsec2 or per beam
if per_arcsec2:
im = im / self.pixelscale**2
unit = unit.replace("/pixel", "/arcsec2")
unit = unit.replace(".pixel-1", ".arcsec-2")
if per_beam:
beam_area = bmin * bmaj * np.pi / (4.0 * np.log(2.0))
pix_area = self.pixelscale**2
im *= beam_area/pix_area
unit = unit.replace("/pixel", "/beam")
unit = unit.replace(".pixel-1", "/beam")
# -- Adding noise
if rms > 0.0:
noise = np.random.randn(im.size).reshape(im.shape)
if i_convolve:
noise = convolve_fft(noise, beam)
noise *= rms / np.std(noise)
im += noise
# --- Plot range and color map`
_color_scale = 'lin'
if fmax is None:
fmax = im.max()
if fpeak is not None:
fmax = im.max() * fpeak
if fmin is None:
fmin = im.min()
if color_scale is None:
color_scale = _color_scale
if color_scale == 'log':
if fmin <= 0.0:
fmin = fmax / dynamic_range
norm = mcolors.LogNorm(vmin=fmin, vmax=fmax, clip=True)
elif color_scale == 'lin':
norm = mcolors.Normalize(vmin=fmin, vmax=fmax, clip=True)
elif color_scale == 'sqrt':
norm = mcolors.PowerNorm(0.5, vmin=fmin, vmax=fmax, clip=True)
else:
raise ValueError("Unknown color scale: " + color_scale)
# -- Make the plot
ax.cla()
image = ax.imshow(im, norm=norm, extent=extent, origin='lower', cmap=cmap,interpolation=interpolation)
if limit is not None:
limits = [limit, -limit, -limit, limit]
if limits is not None:
ax.set_xlim(limits[0], limits[1])
ax.set_ylim(limits[2], limits[3])
if not no_xlabel:
ax.set_xlabel(xlabel)
if not no_ylabel:
ax.set_ylabel(ylabel)
if no_xticks:
ax.get_xaxis().set_visible(False)
if no_yticks:
ax.get_yaxis().set_visible(False)
if title is not None:
ax.set_title(title,size=title_size)
# start from casa - contours
if plot_type=="imshow":
image = ax.imshow(
im,
norm=norm,
extent=extent,
origin='lower',
cmap=cmap,
alpha=alpha,
interpolation=interpolation,
zorder=zorder
)
elif plot_type=="contourf":
image = ax.contourf(
im,
extent=extent,
origin='lower',
levels=levels,
cmap=cmap,
linewidths=linewidths,
alpha=alpha,
zorder=zorder
)
elif plot_type=="contour":
if colors is not None:
cmap=None
image = ax.contour(
im,
extent=extent,
origin='lower',
levels=levels,
cmap=cmap,
linewidths=linewidths,
alpha=alpha,
zorder=zorder,
colors=colors
)
# end from casa - contours
# -- Color bar
if colorbar:
cb = add_colorbar(image,side=colorbar_side)
formatted_unit = unit.replace("-1", "$^{-1}$").replace("-2", "$^{-2}$")
if colorbar_label:
if moment == 0:
if Tb:
cb.set_label("\int T$_\mathrm{b}\,\mathrm{d}v$ (K.km.s$^{-1}$)",size=colorbar_size)
else:
cb.set_label("Flux (" + formatted_unit + ".km.s$^{-1}$)",size=colorbar_size)
elif moment == 1 or moment == 9:
cb.set_label("Velocity (km.s$^{-1})$",size=colorbar_size)
elif moment == 2:
cb.set_label("Velocity dispersion (km.s$^{-1}$)",size=colorbar_size)
else:
if Tb:
cb.set_label("T$_\mathrm{b}$ (K)",size=colorbar_size)
else:
cb.set_label("Flux (" + formatted_unit + ")",size=colorbar_size)
# -- Adding velocity
if moment is None:
if not no_vlabel:
vlabx, vlaby = vlabel_position
ax.text(
vlabx, vlaby,
f"$\Delta$v={self.velocity[iv]:<4.2f}$\,$km/s",
horizontalalignment='center',
size=vlabel_size,
color="white",
transform=ax.transAxes,
)
# --- Adding beam
if plot_beam:
dx, dy = beam_position
beam = Ellipse(
ax.transLimits.inverted().transform((dx, dy)),
width=bmin,
height=bmaj,
angle=-bpa,
fill=True,
color="grey",
)
ax.add_patch(beam)
#-- Add stars
if plot_stars:
factor = pix_scale / self.pixelscale
if isinstance(plot_stars,bool):
x_stars = self.star_positions[0,iaz,i,:] * factor * (-xaxis_factor)
y_stars = self.star_positions[1,iaz,i,:] * factor
else: # int or list of int
x_stars = self.star_positions[0,iaz,i,plot_stars] * factor * (-xaxis_factor)
y_stars = self.star_positions[1,iaz,i,plot_stars] * factor
ax.scatter(x_stars-shift_dx, y_stars-shift_dy,
color=sink_particle_color,s=sink_particle_size)
#-- Saving the last plotted quantity
self.last_image = im
plt.sca(ax)
return image
[docs]
def plot_line(
self,
i=0,
iaz=0,
iTrans=0,
psf_FWHM=None,
bmaj=None,
bmin=None,
bpa=None,
plot_beam=False,
plot_cont=True,
subtract_cont=False
):
"""
Plot the line spectrum.
Args:
i (int): Inclination index
iaz (int): Azimuth angle index
iTrans (int): Transition index
plot_cont (bool): Whether to plot the continuum level
**kwargs: Additional arguments passed to plotting function
"""
if self.is_casa:
line = np.sum(self.lines[:, :, :], axis=(1, 2))
ylabel = "Flux (Jy)"
else:
if subtract_cont:
lines = np.maximum(self.lines[iaz, i, iTrans, :, :, :] - self.cont[iaz, i, iTrans, np.newaxis, :, :], 0.0)
else:
lines = self.lines[iaz, i, iTrans, :, :, :]
line = np.sum(lines, axis=(1, 2))
ylabel = "Flux (W.m$^{-2}$)"
print(line)
plt.semilogy(self.velocity, line)
if plot_cont:
if self.is_casa:
Fcont = 0.5 * (line[0] + line[-1]) # approx the continuum
else:
Fcont = np.sum(self.cont[iaz, i, iTrans, :, :])
plt.plot([self.velocity[0], self.velocity[-1]], [Fcont, Fcont])
xlabel = "v (m.s$^{-1}$)"
plt.xlabel(xlabel)
plt.ylabel(ylabel)
[docs]
def get_moment_map(self, i=0, iaz=0, iTrans=0, moment=0, i_convolve=False,
beam=None, conv_method=None,subtract_cont=False,M0_threshold=None,
iv_support=None, v_minmax = None):
"""
Calculate a moment map from the line cube.
Args:
moment (int): Moment to compute:
- 0: Integrated intensity
- 1: Intensity-weighted velocity
- 2: Velocity dispersion
- 8: Peak intensity
- 9: Velocity at peak intensity
iTrans (int): Transition index
iv_support (array-like, optional): Velocity channels to include
**kwargs: Additional arguments for convolution
Returns:
numpy.ndarray: The computed moment map
"""
if self.is_casa:
cube = np.copy(self.lines[:, :, :])
else:
cube = np.copy(self.lines[iaz, i, iTrans, :, :, :])
if subtract_cont:
cube = np.maximum(cube - self.cont[iaz, i, iTrans, np.newaxis, :, :], 0.0)
dv = self.velocity[1] - self.velocity[0]
v = self.velocity
if v_minmax is not None:
vmin = np.min(v_minmax)
vmax = np.max(v_minmax)
iv_support = np.array(np.where(np.logical_and((self.velocity > vmin),(self.velocity < vmax)))).ravel()
print("Selecting channels:", iv_support)
if iv_support is None:
nv = self.nv
else:
nv = len(iv_support)
v = v[iv_support]
cube = cube[iv_support,:,:]
# Moment 0, 1 and 2
if moment == 0: # We do not need to convolve the whole cube
M0 = np.sum(cube, axis=0) * dv
if i_convolve:
M0 = conv_method(M0, beam)
else: # We need to convolve each channel indidually
if i_convolve:
print("Convolving individual channel maps, this may take a bit of time ....")
try:
bar = progressbar.ProgressBar(
maxval=self.nv,
widgets=[
progressbar.Bar('=', '[', ']'),
' ',
progressbar.Percentage(),
]
)
bar.start()
except:
pass
for iv in range(nv):
try:
bar.update(iv + 1)
except:
pass
channel = np.copy(cube[iv, :, :])
cube[iv, :, :] = conv_method(channel, beam)
try:
bar.finish()
except:
pass
if moment <=2:
M0 = np.sum(cube, axis=0) * dv
if moment >= 1 and moment <=2:
M1 = np.sum(cube[:, :, :] * v[:, np.newaxis, np.newaxis], axis=0) * dv / M0
if moment == 2:
M2 = np.sqrt(np.sum(cube[:, :, :]
* (v[:, np.newaxis, np.newaxis] - M1[np.newaxis, :, :])**2,
axis=0,) * dv / M0)
# Peak flux
if moment == 8:
M8 = np.max(cube, axis=0)
# Velocity of the peak
if moment == 9:
vmax_index = np.argmax(cube, axis=0)
M9 = v[(vmax_index)]
#print(vmax_index.shape)
#
## Extract the maximum and neighboring pixels
#print("test1")
#f_max = cube[(vmax_index)]
#print(f_max.shape)
#print("test2")
#f_minus = cube[(vmax_index-1)]
#print("test3")
#f_plus = cube[(vmax_index+1)]
#
## Work out the polynomial coefficients
#print("test4")
#a0 = 13. * f_max / 12. - (f_plus + f_minus) / 24.
#print("test5")
#a1 = 0.5 * (f_plus - f_minus)
#print("test6")
#a2 = 0.5 * (f_plus + f_minus - 2*f_max)
#
## Compute the maximum of the quadratic
#x_max = idx - 0.5 * a1 / a2
#y_max = a0 - 0.25 * a1**2 / a2
#
#M9 = xmax
if moment == 0:
M = M0
elif moment == 1:
M = M1
elif moment == 2:
M = M2
elif moment == 8:
M = M8
elif moment == 9:
M = M9
if M0_threshold is not None:
M = np.ma.masked_where(M0 < M0_threshold, M)
return M
def _spectral_convolve(self, cube, Delta_v):
print("Spectral convolution at ", Delta_v, "km/s ->",Delta_v / self.dv, "channels")
# Creating a Hanning function with k points per mcfost delta_v
# could be optmised to have k points accrod the hanning function, but this is fast
k = 100
width = 2 * Delta_v / self.dv # in channels
nchan = int(width)+1
if nchan == 1:
return cube
if nchan%2 == 0:
nchan=nchan+1
# number of point for the high res pectral response
n = int(width*k)+1
if n%2 != 0:
n=n+1
w = np.hanning(n)
w = w/np.sum(w)
# rebinning the spectral response to the mcfost resolution (and padding with zeros as required)
n_pad = (nchan * k - n) // 2
if n_pad > 0:
pad = np.zeros(n_pad) * 1.0
w = np.concatenate((pad,w,pad))
w = w.reshape(nchan,k).sum(-1)
cube = convolve1d(cube, w, mode='constant',axis=0, cval=0.)
return cube
[docs]
def plot_wake(self,i=0,i_planet=1,HonR=0.1,q=0.25, ax=None, rmax=None, rmin=None, **kwargs):
inclinations = self.P.calc_inclinations()
print("---- INCLINATION", inclinations[i])
# plot_wake uses the dynamite convention
if (np.abs(self.P.map.RT_imax<90)):
inc = -inclinations[i]
else:
inc= 180 -inclinations[i]
#inc = -inclinations[i]
xy = self.star_positions[:,0,0,i_planet]
plot_wake(xy,inc,-self.P.map.PA,HonR,q, ax=ax, rmin=rmin, rmax=rmax, **kwargs)