import mhealpy as hp
from mhealpy.plot.util import get_fig_ax, healpy_coord_to_astropy
import numpy as np
from numpy import array, log2, sqrt
import matplotlib.pyplot as plt
from collections import deque
from astropy.coordinates import (UnitSphericalRepresentation, SkyCoord,
BaseRepresentation, CartesianRepresentation,
BaseCoordinateFrame)
import astropy.units as u
from astropy.units import cds
# I couldn't find a public method to do this
from astropy.coordinates.sky_coordinate_parsers import _get_frame_class
[docs]class HealpixBase:
"""
Basic operations related to HEALPix pixelization, for which the map
contents information is not needed. This class is conceptually very similar
the the Healpix_Base class of Healpix_cxx.
Single resolution maps are fully defined by specifying their order
(or NSIDE) and ordering scheme ("RING" or "NESTED").
Multi-resolution maps follow an explicit "NUNIQ" scheme, with each pixel
identfied by a _uniq_ number. No specific is needed nor guaranteed.
.. warning::
The initialization input is not validated by default. Consider calling
`is_mesh_valid()` after initialization, otherwise results might be
unexpected.
Args:
uniq (array): Explicit numbering of each pixel in an "NUNIQ" scheme.
Order (int): Order of HEALPix map.
nside (int): Alternatively, you can specify the NSIDE parameter.
npix (int): Alternatively, you can specify the total number of pixels.
scheme (str): Healpix scheme. Either 'RING', 'NESTED' or 'NUNIQ'
coordsys (BaseFrameRepresentation or str): Instrinsic coordinates of the map.
Either ‘G’ (Galactic), ‘E’ (Ecliptic) , ‘C’ (Celestial = Equatorial) or any other
coordinate frame recognized by astropy.
base (HealpixBase): Alternatively, you can copy the properties of another
HealpixBase object
"""
def __init__(self,
uniq = None,
order = None,
nside = None,
npix = None,
scheme = 'ring',
coordsys = None,
base = None):
# Cache of rangeset representation and order.
# Make some calculations faster
self._pix_rangesets = None
self._pix_rangesets_argsort = None
if base is not None:
# Copy another HealpixBase
self._uniq = base._uniq
self._scheme = base._scheme
self._order = base._order
self._coordsys = base._coordsys
self._pix_rangesets = base._pix_rangesets
self._pix_rangesets_argsort = base._pix_rangesets_argsort
elif uniq is not None:
# MOC map
self._uniq = array(uniq, dtype = int)
# Scheme and order are implicit
self._scheme = "NUNIQ"
self._order = hp.nside2order(hp.uniq2nside(max(uniq)))
self.coordsys = coordsys
else:
# Single resolution map
self._uniq = None
# User specified nside instead of order
if order is not None:
self._order = order
elif nside is not None:
self._order = hp.nside2order(nside)
elif npix is not None:
self._order = hp.nside2order(hp.npix2nside(npix))
else:
raise ValueError("Specify nside, order or npix")
if scheme is None:
raise ValueError("Specify scheme")
self._scheme = scheme.upper()
if self._scheme not in ['RING','NESTED', 'NUNIQ']:
raise ValueError("Scheme can only be 'ring', 'nested' or 'NUNIQ'")
self.coordsys = coordsys
# Cache the ratio between the number of pixels in this maps and
# that of a map with the highest possible order
self._npix_ratio_max = hp.MAX_NSIDE * hp.MAX_NSIDE // self.nside // self.nside
def __eq__(self, other):
return self.conformable(other)
@property
def coordsys(self):
return self._coordsys
@coordsys.setter
def coordsys(self, coordsys):
if coordsys is None:
self._coordsys = None
elif isinstance(coordsys, BaseCoordinateFrame):
self._coordsys = coordsys
else:
self._coordsys = _get_frame_class(healpy_coord_to_astropy(coordsys))()
[docs] @classmethod
def adaptive_moc_mesh(cls, max_nside, split_fun, coordsys = None):
"""
Return a MOC mesh with an adaptive resolution
determined by an arbitrary function.
Args:
max_nside (int): Maximum HEALPix nside to consider
split_fun (function): This method should return ``True`` if a pixel
should be split into pixel of a higher order, and ``False`` otherwise.
It takes two integers, ``start`` (inclusive) and ``stop`` (exclusive),
which correspond to a single pixel in nested rangeset format for a
map of nside ``max_nside``.
coordsys (BaseFrameRepresentation or str): Assigns a coordinate system to the map
Return:
HealpixBase
"""
map_uniq = deque()
max_order = hp.nside2order(max_nside)
order_list = array(range(max_order+1))
nside = 2**order_list
npix_ratio = 4 ** (max_order - order_list)
uniq_shift = 4 * nside * nside
npix_ratio = 4 ** (max_order - order_list)
start_buffer = np.zeros((max_order+1, 4), dtype=int)
cursor = -np.ones((max_order+1), dtype=int)
for base_pix in range(12):
order = 0
start_buffer[order][0] = base_pix * npix_ratio[order]
cursor[order] = 0
while order >= 0:
if cursor[order] < 0:
order -= 1
continue
start = start_buffer[order][cursor[order]]
stop = start + npix_ratio[order]
cursor[order] -= 1
if order < max_order and split_fun(start, stop):
# Split
order += 1
split_shift = array(range(4))*npix_ratio[order]
start_buffer[order] = start + split_shift
cursor[order] = 3
else:
# Add to map
uniq = (start / npix_ratio[order] + uniq_shift[order]).astype(int)
map_uniq.append(uniq)
return cls(map_uniq, coordsys = coordsys)
[docs] @classmethod
def moc_from_pixels(cls, nside, pixels, nest = False, coordsys = None):
"""
Return a MOC mesh where a list of pixels are kept at a
given nside, and every other pixel is appropiately downsampled.
Also see the more generic ``adaptive_moc()`` and ``adaptive_moc_mesh()``.
Args:
nside (int): Maximum healpix NSIDE (that is, the NSIDE for the pixel
list)
pixels (array): Pixels that must be kept at the finest pixelation
nest (bool): Whether the pixels are a 'NESTED' or 'RING' scheme
coordsys (BaseFrameRepresentation or str): Assigns a coordinate system to the map
"""
# Always work in nested
if not nest:
pixels = array([hp.ring2nest(nside, pix) for pix in pixels])
# Auxiliary function so we can reuse adaptive_moc_mesh()
pixels.sort()
def pix_list_range_intersect(start, stop):
start_index,stop_index = np.searchsorted(pixels, [start,stop])
return start_index < stop_index
# Get the grid
return cls.adaptive_moc_mesh(nside,
pix_list_range_intersect,
coordsys = coordsys)
@property
def npix(self):
"""
Get number of pixels.
For multi-resolutions maps, this corresponds to the number of utilized
UNIQ pixels.
Return:
int
"""
if self.is_moc:
return len(self._uniq)
else:
return int(12 * 4**self._order)
@property
def order(self):
"""
Get map order
Return:
int
"""
return self._order
@property
def nside(self):
"""
Get map NSIDE
Return:
int
"""
return int(2**self.order)
@property
def scheme(self):
"""
Return HEALPix scheme
Return:
str: Either 'NESTED', 'RING' or 'NUNIQ'
"""
return self._scheme
@property
def is_nested(self):
"""
Return true if scheme is NESTED or NUNIQ
Return
bool
"""
return self._scheme == "NESTED" or self._scheme == 'NUNIQ'
@property
def is_ring(self):
"""
Return true if scheme is RING
Return
bool
"""
return self._scheme == "RING"
@property
def is_moc(self):
"""
Return true if this is a Multi-Dimensional Coverage (MOC) map
(multi-resolution)
Return:
bool
"""
return self._uniq is not None
[docs] def pix_rangesets(self, nside = None, argsort = False):
"""
Get the equivalent range of `child pixels` in nested scheme for a map
of equal or higher nside
Args:
nside (int or None): Nside of output range sets. If None, the map
nside will be used. nside = mhealpy.MAX_NSIDE returns the
cached result
argsort (bool): Also return also the indices that would sort the array.
Return:
recarray: With columns named 'start' (inclusive) and
'stop' (exclusive)
"""
if nside is None:
nside = self.nside
# Ranges
if self._pix_rangesets is None:
# No cache, compute from scratch
start,stop = self.pix2range(hp.MAX_NSIDE, range(self.npix))
self._pix_rangesets = np.rec.fromarrays([start,stop], names = ['start', 'stop'])
# Use cache
if nside == hp.MAX_NSIDE:
rs = self._pix_rangesets
else:
npix_ratio = hp.MAX_NSIDE * hp.MAX_NSIDE // nside // nside
rs = np.rec.fromarrays([self._pix_rangesets.start // npix_ratio,
self._pix_rangesets.stop // npix_ratio],
names = ['start', 'stop'])
# Argsort
if argsort:
if self._pix_rangesets_argsort is None:
# No cache, compute from scratch
self._pix_rangesets_argsort = np.argsort(rs.start)
# Use cache
rs_argsort = self._pix_rangesets_argsort
return rs, rs_argsort
else:
return rs
[docs] def pix_order_list(self):
"""
Get a list of lists containing all pixels sorted by order
Return:
(list, list): (``pix_per_order``, ``nest_pix_per_order``)
Each list has a size equal to the map order.
Each element is a list of all pixels whose order
matches the index of the list position.
The first output contains the index of the pixels, while the
second contains their coresponding pixel number in a nested scheme.
"""
pix_per_order = [[] for _ in range(self.order+1)]
nest_pix_per_order = [[] for _ in range(self.order+1)]
for pix in range(self.npix):
nside, nest_pix = hp.uniq2nest(self.pix2uniq(pix))
order = hp.nside2order(nside)
pix_per_order[order].append(pix)
nest_pix_per_order[order].append(nest_pix)
return pix_per_order,nest_pix_per_order
[docs] def pix2range(self, nside, pix):
"""
Get the equivalent range of `child pixels` in nested scheme for a map
of equal or higher nside
Args:
nside (int): Nside of output range sets
pix (int or array): Pixel numbers
Return:
(int or array, int or array): Start pixel (inclusive) and
stop pixel (exclusive)
"""
if self.is_moc:
return hp.uniq2range(nside, self._uniq[pix])
else:
if self.is_ring:
pix = hp.ring2nest(self.nside, pix)
return hp.nest2range(self.nside, pix, nside)
[docs] def pixarea(self, pix = None):
"""
Return area of a pixel
Args:
pix (int or array): Pixel number. Only relevant for MOC maps.
Default: All pixels for MOC, a single value for single
resolution
Return:
Quantity
"""
if pix is None:
if self.is_moc:
pix = np.arange(self.npix)
else:
pix = 0
if self.is_moc:
ranges = self.pix_rangesets(hp.MAX_NSIDE)[pix]
area = 3.6331963520923245e-18 * (ranges.stop - ranges.start)
else:
area = 1.047197551196597746 / self.nside / self.nside
area = np.broadcast_to(area, np.shape(pix))
return area * u.sr
[docs] def pix2ang(self, pix, lonlat = False):
"""
Return the coordinates of the center of a pixel
Args:
pix (int or array)
Return:
(float or array, float or array)
"""
if self.is_moc:
nside, pix = hp.uniq2nest(self.pix2uniq(pix))
return hp.pix2ang(nside, pix, nest = True, lonlat = lonlat)
else:
return hp.pix2ang(self.nside, pix, nest = self.is_nested, lonlat = lonlat)
[docs] def pix2vec(self, pix):
"""
Return a vector corresponding to the center of a pixel
Args:
pix (int or array)
Return:
array: Size (3,N)
"""
if self.is_moc:
nside, pix = hp.uniq2nest(self.pix2uniq(pix))
return hp.pix2vec(nside, pix, nest = True)
else:
return hp.pix2vec(self.nside, pix, nest = self.is_nested)
[docs] def pix2skycoord(self, pix):
"""
Return the sky coordinate for the center of a given pixel
Args:
pix (int or array): Pixel number
Return:
SkyCoord
"""
if self.coordsys is None:
raise ValueError("Undefined coordinate system")
lon,lat = self.pix2ang(pix, lonlat = True)
skycoord = SkyCoord(lon*u.deg, lat*u.deg, frame = self.coordsys)
return skycoord
def _standarize_theta_phi_lonlat(self, theta, phi, lonlat):
if isinstance(theta, (SkyCoord, BaseRepresentation)):
# Support astropy
if isinstance(theta, SkyCoord):
if self.coordsys is None:
raise ValueError("Undefined coordinate system")
theta = theta.transform_to(self.coordsys)
coord = theta.represent_as(UnitSphericalRepresentation)
theta,phi = coord.lon.deg, coord.lat.deg
lonlat = True
return theta,phi,lonlat
[docs] def ang2pix(self, theta, phi = None, lonlat = False):
"""
Get the pixel (as used in []) that contains a given coordinate
Args:
theta (float, array or SkyCoord): Zenith angle
phi (float or arrray): Azimuth angle
Return:
int or array
"""
theta, phi, lonlat = self._standarize_theta_phi_lonlat(theta, phi, lonlat)
pixels = hp.ang2pix(self.nside, theta, phi,
nest=self.is_nested, lonlat = lonlat)
if self.is_moc:
pixels = self.nest2pix(pixels)
return pixels
[docs] def vec2pix(self, x, y, z):
"""
Get the pixel (as used in []) that contains a given coordinate
Args:
x (float or array): x coordinate
y (float or array): y coordinate
z (float or array): z coordinate
Return:
int or array
"""
pixels = hp.vec2pix(self.nside, x, y, z, nest=self.is_nested)
if self.is_moc:
pixels = self.nest2pix(pixels)
return pixels
[docs] def pix2uniq(self, pix):
"""
Get the UNIQ representation of a given pixel index.
Args:
pix (int): Pixel number in the current scheme (as used for [])
"""
if self.is_moc:
return self._uniq[pix]
else:
if self.scheme == 'RING':
pix = hp.ring2nest(self.nside, pix)
return hp.nest2uniq(self.nside, pix)
@property
def uniq(self):
"""
Get an array with the NUNIQ numbers for all pixels
"""
if self.is_moc:
return self._uniq
else:
return self.pix2uniq(range(self.npix))
[docs] def nest2pix(self, pix):
"""
Get the corresponding pixel in the current grid for a pixel in NESTED
scheme. For MOC map, return the pixel that contains it.
Args:
pix (int or array): Pixel number in NESTED scheme. Must correspond
to a map of the same order as the current.
Return:
int or array
"""
if self.is_moc:
# Work with rangesets for maximum order,
# then find pixel number for this order,
# and then find the rangesets that contain these pixels
pix = np.array(pix)
rangesets, rs_argsort = self.pix_rangesets(hp.MAX_NSIDE, argsort = True)
ipix = np.searchsorted(rangesets.stop, pix * self._npix_ratio_max,
side = 'right',
sorter = rs_argsort)
opix = rs_argsort[ipix]
# Follow healpy convention for null pix
if np.ndim(opix) == 0:
if pix == -1:
opix = -1
else:
opix[pix == -1] = -1
return opix
else:
if self.is_nested:
return pix
else:
return hp.nest2ring(self.nside, pix)
[docs] def get_interp_weights(self, theta, phi = None, lonlat = False):
"""
Return the 4 closest pixels on the two rings above and below the
location and corresponding weights. Weights are provided for bilinear
interpolation along latitude and longitude
Args:
theta (float or array): Zenith angle (rad)
phi (float or array): Azimuth angle (rad)
Return:
tuple: (pixels, weights), each with of (4,) if the input is scalar,
if (4,N) where N is size of
theta and phi. For MOC maps, these pixel numbers might repeate.
"""
theta, phi, lonlat = self._standarize_theta_phi_lonlat(theta, phi, lonlat)
pixels,weights = hp.get_interp_weights(self.nside, theta, phi,
nest = self.is_nested,
lonlat = lonlat)
if self.is_moc:
pixels = self.nest2pix(pixels)
return (pixels, weights)
[docs] def get_all_neighbours(self, theta, phi = None, lonlat = False):
"""
Return the 8 nearest pixels. For MOC maps, these might repeat, as this
is equivalent to raterizing the maps to the highest order, getting the
neighbohrs, and then finding the pixels tha contain them.
Args:
theta (float or int or array): Zenith angle (rad). If phi is
``None``, these are assummed to be pixels numbers.
phi (float or array or None): Azimuth angle (rad)
Return:
array: pixel number of the SW, W, NW, N, NE, E, SE and S neighbours,
shape is (8,) if input is scalar, otherwise shape is (8, N) if
input is of length N. If a neighbor does not exist (it can be
the case for W, N, E and S) the corresponding pixel number will
be -1.
"""
theta, phi, lonlat = self._standarize_theta_phi_lonlat(theta, phi, lonlat)
if self.is_moc and phi is None:
theta,phi = self.pix2ang(theta, lonlat = lonlat)
neighbors = hp.get_all_neighbours(self.nside, theta, phi,
nest = self.is_nested,
lonlat = lonlat)
if self.is_moc:
neighbors = self.nest2pix(neighbors)
return neighbors
[docs] def is_mesh_valid(self):
"""
Return ``True`` if the map pixelization is valid. For
single resolution this simply checks that the size is a valid NSIDE value.
For MOC maps, it checks that every point in the sphere is covered by
one and only one pixel.
Return:
True
"""
if self.is_moc:
# Work in rangesets, and check that there is no gap in between them
rs, rs_argsort = self.pix_rangesets(hp.MAX_NSIDE, argsort = True)
rs = rs[rs_argsort]
return (rs.start[0] == 0 and
rs.stop[-1] == hp.nside2npix(hp.MAX_NSIDE) and
np.array_equal(rs.start[1:], rs.stop[:-1]))
else:
return hp.isnpixok(self.npix)
def _pix_query_fun(self, fun):
"""
Return a wrapper for healpy's pix querying routines
Args:
fun (function): Healpy's query_* functions
Return:
function: With apprpiate grid, passes rest of arguments to fun
"""
def wrapper(*args, **kwargs):
if self.is_moc:
# We'll do it order by order
pix_per_order, nest_pix_per_order = self.pix_order_list()
query_pix = np.zeros(0, dtype = int)
for order in range(self.order+1):
pixels = fun(hp.order2nside(order), nest = True,
*args, **kwargs)
query_bool = np.isin(nest_pix_per_order[order], pixels)
order_pix = array(pix_per_order[order], dtype=int)
query_pix = np.append(query_pix,
order_pix[query_bool])
return np.sort(query_pix)
else:
return fun(self.nside, nest = self.is_nested,
*args, **kwargs)
return wrapper
[docs] def query_polygon(self, vertices, inclusive=False, fact=4):
"""
Returns the pixels whose centers lie within the convex polygon defined
by the vertices array (if inclusive is False), or which overlap with
this polygon (if inclusive is True).
Args:
vertices (float): Vertex array containing the vertices of the
polygon, shape (N, 3).
inclusive (bool): f False, return the exact set of pixels whose
pixels centers lie within the region; if True, return all
pixels that overlap with the region.
fact (int): Only used when inclusive=True. The overlapping test
will be done at the resolution fact*nside. For NESTED ordering,
fact must be a power of 2, less than 2**30, else it can be any
positive integer. Default: 4.
Return:
int array: The pixels which lie within the given polygon.
"""
fun = self._pix_query_fun(hp.query_polygon)
return fun(vertices, inclusive=inclusive, fact=fact)
[docs] def query_disc(self, vec, radius, inclusive=False, fact=4):
"""
Args:
vec (float, sequence of 3 elements, SkyCoord): The coordinates of unit vector
defining the disk center.
radius (float): The radius (in radians) of the disk
inclusive (bool): f False, return the exact set of pixels whose
pixels centers lie within the region; if True, return all
pixels that overlap with the region.
fact (int): Only used when inclusive=True. The overlapping test
will be done at the resolution fact*nside. For NESTED ordering,
fact must be a power of 2, less than 2**30, else it can be any
positive integer. Default: 4.
Return:
int array: The pixels which lie within the given disc.
"""
if isinstance(vec, (SkyCoord, BaseRepresentation)):
vec = vec.represent_as(CartesianRepresentation).xyz.value
fun = self._pix_query_fun(hp.query_disc)
return fun(vec, radius, inclusive=inclusive, fact=fact)
[docs] def query_strip(self, theta1, theta2, inclusive=False):
"""
Returns pixels whose centers lie within the colatitude range defined by
theta1 and theta2 (if inclusive is False), or which overlap with this
region (if inclusive is True). If theta1<theta2, the region between
both angles is considered, otherwise the regions 0<theta<theta2 and
theta1<theta<pi.
Args:
theta (float): First colatitude (radians)
phi (float): Second colatitude (radians)
inclusive (bool): f False, return the exact set of pixels whose
pixels centers lie within the region; if True, return all
pixels that overlap with the region.
Return:
int array: The pixels which lie within the given strip.
"""
fun = self._pix_query_fun(hp.query_strip)
return fun(theta1, theta2, inclusive=inclusive)
[docs] def boundaries(self, pix, step = 1):
"""
Returns an array containing vectors to the boundary of the nominated pixel.
The returned array has shape (3, 4*step), the elements of which are the
x,y,z positions on the unit sphere of the pixel boundary. In order to
get vector positions for just the corners, specify step=1.
"""
if self.is_moc:
def single_pix_bounds(pix):
nside, nest_pix = hp.uniq2nest(self._uniq[pix])
return hp.boundaries(nside, nest_pix, step = step, nest = True)
moc_bounds = np.vectorize(single_pix_bounds)
return moc_bounds(pix)
else:
return hp.boundaries(self.nside, pix, step, nest = self.is_nested)
def _default_coordsys(self, coord):
if coord is None:
coord = self.coordsys
if coord is None:
coord = 'C'
coord = healpy_coord_to_astropy(coord)
return coord
[docs] def plot_grid(self,
ax = 'mollview',
ax_kw = {},
step = 32,
coord = None,
**kwargs):
"""
Plot the pixel boundaries of a Healpix grid
Args:
ax (WCSAxes or str): Astropy's WCSAxes to plot the map. Either
an existing instance or the name of a registered projection.
ax_kw (dict): Extra arguments if a new axes needs to be created.
step (int): How many points per pixel side
coord (str): Instrinsic coordinates of the map. Either ‘G’ (Galactic),
‘E’ (Ecliptic) , ‘C’ (Celestial = Equatorial) or any other
coordinate frame recognized by astropy. The default is 'C' unless
``coordsys`` is defined. This option overrides ``coordsys``
**kwargs: Passed to matplotlib.pyplot.plot()
Return:
matplotlib.lines.Line2D list, WCSAxes: The first return value
corresponds to the output ``pyplot.plot()`` for one of the pixels.
The second is the astropy WCSAxes object used.
"""
# Standarize frame
coord = self._default_coordsys(coord)
# Standarize axes and figure
fig,ax = get_fig_ax(ax, **ax_kw)
wcs = ax.wcs
# Every line should have the same color
if 'c' in kwargs:
color = kwargs.pop('c')
else:
color = kwargs.pop('color', 'black')
# Plot boundaries as lines
lines = []
for pix in range(self.npix):
# Get boundaries
vec = self.boundaries(pix, step = step)
# Close loop
vec = np.append(vec, vec[:,0].reshape(-1,1), axis = 1)
# Project
theta,phi = hp.vec2ang(np.transpose(vec))
x,y = wcs.world_to_pixel(SkyCoord(phi, np.pi/2 - theta,
unit = 'rad', frame = coord))
# Remove discontinuities
dx = np.abs(np.diff(x))
dy = np.abs(np.diff(y))
dist = sqrt(dx*dx + dy*dy)
if all(np.isnan(dist)):
# Supress warning
continue
jumps = dist > 10*np.nanmedian(dist)
jumps = np.append(jumps, False)
if any(jumps):
ijumps = np.nonzero(jumps)[0]+1
x = np.insert(x, ijumps, np.nan)
y = np.insert(y, ijumps, np.nan)
# Plot
lines += ax.plot(x, y, color = color, **kwargs)
return lines, ax
[docs] def moc_sort(self):
"""
Sort the uniq pixels composing a MOC map based on its
rangeset representation
"""
if not self.is_moc:
return
rs, rs_argsort = self.pix_rangesets(hp.MAX_NSIDE, argsort = True)
self._uniq = self._uniq[rs_argsort]
if self._pix_rangesets is not None:
self._pix_rangesets = self._pix_rangesets[rs_argsort]
if self._pix_rangesets_argsort is not None:
self._pix_rangesets_argsort = np.arange(self.npix)