# -*- coding: utf-8 -*-
"""
:mod:`MCEq.density_profiles` - models of the Earth's atmosphere
===============================================================
This module includes classes and functions modeling the Earth's atmosphere.
Currently, two different types models are supported:
- Linsley-type/CORSIKA-style parameterization
- Numerical atmosphere via external routine (NRLMSISE-00)
Both implementations have to inherit from the abstract class
:class:`EarthAtmosphere`, which provides the functions for other parts of
the program. In particular the function :func:`EarthAtmosphere.get_density`
Typical interaction::
$ atm_object = CorsikaAtmosphere("BK_USStd")
$ atm_object.set_theta(90)
$ print 'density at X=100', atm_object.X2rho(100.)
The class :class:`MCEqRun` will only the following routines::
- :func:`EarthAtmosphere.set_theta`,
- :func:`EarthAtmosphere.r_X2rho`.
If you are extending this module make sure to provide these
functions without breaking compatibility.
Example:
An example can be run by executing the module::
$ python MCEq/atmospheres.py
"""
from abc import ABCMeta, abstractmethod
from os.path import join
import numpy as np
import MCEq.geometry
from MCEq.misc import theta_deg, theta_rad
from numba import jit, double # @UnresolvedImport
from mceq_config import dbg, config
def _load_cache():
"""Loads atmosphere cache from file.
If file does not exist, function returns
a new empty dictionary.
Returns:
dict: Dictionary containing splines.
"""
import cPickle as pickle
if dbg > 0:
print "atmospheres::_load_cache(): loading cache."
fname = join(config['data_dir'],
config['atm_cache_file'])
try:
return pickle.load(open(fname, 'rb'))
except IOError:
print "density_profiles::_load_cache(): creating new cache.."
return {}
def _dump_cache(cache):
"""Stores atmosphere cache to file.
Args:
(dict) current cache
Raises:
IOError:
"""
import cPickle as pickle
if dbg > 0:
print "density_profiles::_dump_cache() dumping cache."
fname = join(config['data_dir'],
config['atm_cache_file'])
print fname
try:
pickle.dump(cache, open(fname, 'wb'), protocol=-1)
except IOError:
raise IOError("density_profiles::_dump_cache(): " +
'could not (re-)create cache. Wrong working directory?')
class GeneralizedTarget(object):
len_target = config['len_target'] * 1e2 # cm
env_density = config['env_density'] # g/cm3
env_name = config['env_name']
def __init__(self):
self.reset()
def reset(self):
"""Resets material list to defaults."""
self.mat_list = [[0., self.len_target,
self.env_density,
self.env_name]]
self._update_variables()
def _update_variables(self):
"""Updates internal variables. Not needed to call by user."""
self.start_bounds, self.end_bounds, \
self.densities = zip(*self.mat_list)[:-1]
self.densities = np.array(self.densities)
self.start_bounds = np.array(self.start_bounds)
self.end_bounds = np.array(self.end_bounds)
self.max_den = np.max(self.densities)
self._integrate()
def set_length(self, new_length_cm):
if new_length_cm < self.mat_list[-1][0]:
raise Exception("GeneralizedTarget::set_length(): " +
"can not set length below lower boundary of last " +
"material.")
self.len_target = new_length_cm
self.mat_list[-1][1] = new_length_cm
self._update_variables()
def add_material(self, start_position_cm, density, name):
"""Adds one additional material to a composite target.
Args:
start_position_cm (float): position where the material starts
counted from target origin l|X = 0 in cm
density (float): density of material in g/cm**3
name (str): any user defined name
Raises:
Exception: If requested start_position_cm is not properly defined.
"""
if start_position_cm < 0. or start_position_cm > self.len_target:
raise Exception("GeneralizedTarget::add_material(): " +
"distance exceeds target dimensions.")
elif start_position_cm < self.mat_list[-1][0]:
raise Exception("GeneralizedTarget::add_material(): " +
"start_position_cm is ahead of previous material.")
self.mat_list[-1][1] = start_position_cm
self.mat_list.append([start_position_cm,
self.len_target, density, name])
if dbg > 0:
("{0}::add_material(): Material '{1}' added. " +
"location on path {2} to {3} m").format(
self.__class__.__name__, name,
self.mat_list[-1][0], self.mat_list[-1][1])
self._update_variables()
def set_theta(self, *args):
"""This method is not defined for the generalized target. The purpose
is to catch usage errors.
Raises:
NotImplementedError: always
"""
raise NotImplementedError('GeneralizedTarget::set_theta(): Method'
+ 'not defined for this target class.')
def _integrate(self):
"""Walks through material list and computes the depth along the
position (path). Computes the spline for the position-depth relation
and determines the maximum depth for the material selection.
Method does not need to be called by the user, instead the class
calls it when necessary.
"""
from scipy.interpolate import UnivariateSpline
self.density_depth = None
self.knots = [0.]
self.X_int = [0.]
for start, end, density, name in self.mat_list:
self.knots.append(end)
self.X_int.append(density * (end - start) + self.X_int[-1])
self.s_X2h = UnivariateSpline(self.X_int, self.knots, k=1, s=0.)
self.s_h2X = UnivariateSpline(self.knots, self.X_int, k=1, s=0.)
self.max_X = self.X_int[-1]
def get_density_X(self, X):
"""Returns the density in g/cm**3 as a function of depth X.
Args:
X (float): depth in g/cm**2
Returns:
float: density in g/cm**3
Raises:
Exception: If requested depth exceeds target.
"""
X = np.atleast_1d(X)
# allow for some small constant extrapolation for odepack solvers
if X[-1] > self.max_X and X[-1] < self.max_X * 1.003:
X[-1] = self.max_X
if np.min(X) < 0. or np.max(X) > self.max_X:
raise Exception(("GeneralizedTarget::get_density_X(): " +
"requested depth {0:4.3f} " +
"exceeds target.").format(np.max(X)))
return self.get_density(self.s_X2h(X))
def r_X2rho(self, X):
"""Returns the inverse density :math:`\\frac{1}{\\rho}(X)`.
The spline `s_X2rho` is used, which was calculated or retrieved
from cache during the :func:`set_theta` call.
Args:
X (float): slant depth in g/cm**2
Returns:
float: :math:`1/\\rho` in cm**3/g
"""
return 1. / self.get_density_X(X)
def get_density(self, l_cm):
"""Returns the density in g/cm**3 as a function of position l in cm.
Args:
l (float): position in target in cm
Returns:
float: density in g/cm**3
Raises:
Exception: If requested position exceeds target length.
"""
l = np.atleast_1d(l_cm)
res = np.zeros_like(l)
if np.min(l) < 0 or np.max(l) > self.len_target:
raise Exception("GeneralizedTarget::get_density(): " +
"requested position exceeds target legth.")
for i, li in enumerate(l):
bi = 0
while not (li >= self.start_bounds[bi] and
li <= self.end_bounds[bi]):
bi += 1
res[i] = self.densities[bi]
return res
def draw_materials(self, axes=None):
"""Makes a plot of depth and density profile as a function
of the target length. The list of materials is printed out, too.
Args:
axes (plt.axes, optional): handle for matplotlib axes
"""
import matplotlib.pyplot as plt
if not axes:
plt.figure(figsize=(5, 2.5))
axes = plt.gca()
ymax = np.max(self.X_int) * 1.01
for nm, mat in enumerate(self.mat_list):
xstart = mat[0]
xend = mat[1]
alpha = 0.188 * mat[2] + 0.248
if alpha > 1:
alpha = 1.
elif alpha < 0.:
alpha = 0.
axes.fill_between((xstart / 1e2, xend / 1e2), (ymax, ymax),
(0., 0.), label=mat[2], facecolor='grey',
alpha=alpha)
axes.text(0.5e-2 * (xstart + xend), 0.5 * ymax, str(nm))
plt.plot([xl / 1e2 for xl in self.knots],
self.X_int, lw=1.7, color='r')
axes.set_ylim(0., ymax)
axes.set_xlabel('distance in target [m]')
axes.set_ylabel(r'depth [g/cm$^2$]')
self.print_table()
def print_table(self):
"""Prints table of materials to standard output.
"""
templ = '{0:^3} | {1:15} | {2:^9.3f} | {3:^9.3f} | {4:^8.5f}'
print '********************* List of materials *************************'
head = '{0:3} | {1:15} | {2:9} | {3:9} | {4:9}'.format(
'no', 'name', 'start [m]', 'end [m]', 'density [g/cm**3]')
print '-' * len(head)
print head
print '-' * len(head)
for nm, mat in enumerate(self.mat_list):
print templ.format(nm, mat[3], mat[0] / 1e2, mat[1] / 1e2, mat[2])
[docs]class EarthAtmosphere():
"""Abstract class containing common methods on atmosphere.
You have to inherit from this class and implement the virtual method
:func:`get_density`.
Note:
Do not instantiate this class directly.
Attributes:
thrad (float): current zenith angle :math:`\\theta` in radiants
theta_deg (float): current zenith angle :math:`\\theta` in degrees
max_X (float): Slant depth at the surface according to the geometry
defined in the :mod:`MCEq.geometry`
"""
__metaclass__ = ABCMeta
def __init__(self, *args, **kwargs):
self.geom = MCEq.geometry.EarthGeometry()
self.thrad = None
self.theta_deg = None
self.max_X = None
self.max_den = 1.240e-03
@abstractmethod
[docs] def get_density(self, h_cm):
"""Abstract method which implementation should return the density in g/cm**3.
Args:
h_cm (float): height in cm
Returns:
float: density in g/cm**3
Raises:
NotImplementedError:
"""
raise NotImplementedError("EarthAtmosphere::get_density(): " +
"Base class called.")
[docs] def calculate_density_spline(self, n_steps=2000):
"""Calculates and stores a spline of :math:`\\rho(X)`.
Args:
n_steps (int, optional): number of :math:`X` values
to use for interpolation
Raises:
Exception: if :func:`set_theta` was not called before.
"""
from scipy.integrate import quad, cumtrapz
from time import time
from scipy.interpolate import UnivariateSpline
if self.theta_deg is None:
raise Exception(('{0}::calculate_density_spline(): ' +
'zenith angle not set').format(
self.__class__.__name__))
else:
print ('{0}::calculate_density_spline(): ' +
'Calculating spline of rho(X) for zenith ' +
'{1} degrees.').format(self.__class__.__name__,
self.theta_deg)
thrad = self.thrad
path_length = self.geom.l(thrad)
vec_rho_l = np.vectorize(
lambda delta_l: self.get_density(self.geom.h(delta_l, thrad)))
dl_vec = np.linspace(0, path_length, n_steps)
now = time()
# TODO: Remove the caching functionality and all this stuff related
# to quad integration. Cumptrapz is so fast, that caching is nonsense.
# One possible thing is to substitute the integral to use some log
# features of the integrand and reduce further the number of steps
# Calculate integral for each depth point
X_int = cumtrapz(vec_rho_l(dl_vec), dl_vec)#
dl_vec = dl_vec[1:]
# X_int = np.zeros_like(dl_vec, dtype='float64')
# X_int[0] = 0.
# for i in range(1, len(dl_vec)):
# X_int[i] = X_int[i - 1] + quad(vec_rho_l,
# dl_vec[i - 1], dl_vec[i],
# epsrel=0.01)[0]
print '.. took {0:1.2f}s'.format(time() - now)
# Save depth value at h_obs
self.max_X = X_int[-1]
self.max_den = self.get_density(self.geom.h(0, thrad))
# Interpolate with bi-splines without smoothing
h_intp = [self.geom.h(dl, thrad) for dl in reversed(dl_vec[1:])]
X_intp = [X for X in reversed(X_int[1:])]
# print splrep(np.array(h_intp),
# np.log(X_intp),
# k=2, s=0.0)
self.s_h2X = UnivariateSpline(h_intp, np.log(X_intp),
k=2, s=0.0)
self.s_X2rho = UnivariateSpline(X_int, vec_rho_l(dl_vec),
k=2, s=0.0)
# print np.log(X_intp), h_intp
self.s_lX2h = UnivariateSpline(np.log(X_intp)[::-1], h_intp[::-1],
k=2, s=0.0)
# print 'Average spline error:', np.std(vec_rho_l(dl_vec) /
# self.s_X2rho(X_int))
[docs] def set_theta(self, theta_deg, force_spline_calc=False):
"""Configures geometry and initiates spline calculation for
:math:`\\rho(X)`.
If the option 'use_atm_cache' is enabled in the config, the
function will check, if a corresponding spline is available
in the cache and use it. Otherwise it will call
:func:`calculate_density_spline`, make the function
:func:`r_X2rho` available to the core code and store the spline
in the cache.
Args:
theta_deg (float): zenith angle :math:`\\theta` at detector
force_spline_calc (bool): forces (re-)calculation of the
spline for each call
"""
def calculate_and_store(key, cache):
self.thrad = theta_rad(theta_deg)
self.theta_deg = theta_deg
self.calculate_density_spline()
cache[key][theta_deg] = (self.max_X, self.s_h2X,
self.s_X2rho, self.s_lX2h)
_dump_cache(cache)
if self.theta_deg == theta_deg and not force_spline_calc:
print (self.__class__.__name__ +
'::set_theta(): Using previous' +
'density spline.')
return
elif config['use_atm_cache'] and not force_spline_calc:
from MCEq.misc import _get_closest
cache = _load_cache()
key = (self.__class__.__name__, self.location, self.season)
if cache and key in cache.keys():
try:
closest = _get_closest(theta_deg, cache[key].keys())[1]
if abs(closest - theta_deg) < 1.:
self.thrad = theta_rad(closest)
self.theta_deg = closest
self.max_X, self.s_h2X, self.s_X2rho, self.s_lX2h = cache[key][closest]
else:
calculate_and_store(key, cache)
except:
cache[key] = {}
calculate_and_store(key, cache)
else:
cache[key] = {}
calculate_and_store(key, cache)
else:
self.thrad = theta_rad(theta_deg)
self.theta_deg = theta_deg
self.calculate_density_spline()
[docs] def r_X2rho(self, X):
"""Returns the inverse density :math:`\\frac{1}{\\rho}(X)`.
The spline `s_X2rho` is used, which was calculated or retrieved
from cache during the :func:`set_theta` call.
Args:
X (float): slant depth in g/cm**2
Returns:
float: :math:`1/\\rho` in cm**3/g
"""
return 1. / self.s_X2rho(X)
[docs] def h2X(self, h):
"""Returns the depth along path as function of height above
surface.
The spline `s_X2rho` is used, which was calculated or retrieved
from cache during the :func:`set_theta` call.
Args:
h (float): vertical height above surface in cm
Returns:
float: X slant depth in g/cm**2
"""
return np.exp(self.s_h2X(h))
[docs] def X2rho(self, X):
"""Returns the density :math:`\\rho(X)`.
The spline `s_X2rho` is used, which was calculated or retrieved
from cache during the :func:`set_theta` call.
Args:
X (float): slant depth in g/cm**2
Returns:
float: :math:`\\rho` in cm**3/g
"""
return self.s_X2rho(X)
[docs] def moliere_air(self, h_cm):
"""Returns the Moliere unit of air for US standard atmosphere. """
return 9.3 / (self.get_density(h_cm) * 100.)
[docs] def nref_rel_air(self, h_cm):
"""Returns the refractive index - 1 in air (density parametrization
as in CORSIKA).
"""
return 0.000283 * self.get_density(h_cm) / self.get_density(0)
[docs] def gamma_cherenkov_air(self, h_cm):
"""Returns the Lorentz factor gamma of Cherenkov threshold in air (MeV).
"""
nrel = self.nref_rel_air(h_cm)
return (1. + nrel) / np.sqrt(2. * nrel + nrel ** 2)
[docs] def theta_cherenkov_air(self, h_cm):
"""Returns the Cherenkov angle in air (degrees).
"""
return np.arccos(1. / (1. + self.nref_rel_air(h_cm))) * 180. / np.pi
#=========================================================================
# CorsikaAtmosphere
#=========================================================================
[docs]class CorsikaAtmosphere(EarthAtmosphere):
"""Class, holding the parameters of a Linsley type parameterization
similar to the Air-Shower Monte Carlo
`CORSIKA <https://web.ikp.kit.edu/corsika/>`_.
The parameters pre-defined parameters are taken from the CORSIKA
manual. If new sets of parameters are added to :func:`init_parameters`,
the array _thickl can be calculated using :func:`calc_thickl` .
Attributes:
_atm_param (numpy.array): (5x5) Stores 5 atmospheric parameters
_aatm, _batm, _catm, _thickl, _hlay
for each of the 5 layers
Args:
location (str): see :func:`init_parameters`
season (str,optional): see :func:`init_parameters`
"""
_atm_param = None
def __init__(self, location, season=None):
self.init_parameters(location, season)
EarthAtmosphere.__init__(self)
[docs] def init_parameters(self, location, season):
"""Initializes :attr:`_atm_param`.
+--------------+-------------------+------------------------------+
| location | CORSIKA Table | Description/season |
+==============+===================+==============================+
| "USStd" | 1 | US Standard atmosphere |
+--------------+-------------------+------------------------------+
| "BK_USStd" | 31 | Bianca Keilhauer's USStd |
+--------------+-------------------+------------------------------+
| "Karlsruhe" | 18 | AT115 / Karlsruhe |
+--------------+-------------------+------------------------------+
| "SouthPole" | 26 and 28 | MSIS-90-E for Dec and June |
+--------------+-------------------+------------------------------+
|"PL_SouthPole"| 29 and 30 | P. Lipari's Jan and Aug |
+--------------+-------------------+------------------------------+
Args:
location (str): see table
season (str, optional): choice of season for supported locations
Raises:
Exception: if parameter set not available
"""
_aatm, _batm, _catm, _thickl, _hlay = None, None, None, None, None
self.max_X = None
if location == "USStd":
_aatm = np.array([-186.5562, -94.919, 0.61289, 0.0, 0.01128292])
_batm = np.array([1222.6562, 1144.9069, 1305.5948, 540.1778, 1.0])
_catm = np.array([994186.38, 878153.55, 636143.04, 772170., 1.0e9])
_thickl = np.array(
[1036.102549, 631.100309, 271.700230, 3.039494, 0.001280])
_hlay = np.array([0, 4.0e5, 1.0e6, 4.0e6, 1.0e7])
elif location == "BK_USStd":
_aatm = np.array(
[-149.801663, -57.932486, 0.63631894, 4.3545369e-4, 0.01128292])
_batm = np.array([1183.6071, 1143.0425, 1322.9748, 655.69307, 1.0])
_catm = np.array(
[954248.34, 800005.34, 629568.93, 737521.77, 1.0e9])
_thickl = np.array(
[1033.804941, 418.557770, 216.981635, 4.344861, 0.001280])
_hlay = np.array([0.0, 7.0e5, 1.14e6, 3.7e6, 1.0e7])
elif location == "Karlsruhe":
_aatm = np.array(
[-118.1277, -154.258, 0.4191499, 5.4094056e-4, 0.01128292])
_batm = np.array(
[1173.9861, 1205.7625, 1386.7807, 555.8935, 1.0])
_catm = np.array(
[919546., 963267.92, 614315., 739059.6, 1.0e9])
_thickl = np.array(
[1055.858707, 641.755364, 272.720974, 2.480633, 0.001280])
_hlay = np.array([0.0, 4.0e5, 1.0e6, 4.0e6, 1.0e7])
elif location == 'SouthPole':
if season == 'December':
_aatm = np.array(
[-128.601, -39.5548, 1.13088, -0.00264960, 0.00192534])
_batm = np.array([1139.99, 1073.82, 1052.96, 492.503, 1.0])
_catm = np.array(
[861913., 744955., 675928., 829627., 5.8587010e9])
_thickl = np.array(
[1011.398804, 588.128367, 240.955360, 3.964546, 0.000218])
_hlay = np.array([0.0, 4.0e5, 1.0e6, 4.0e6, 1.0e7])
elif season == "June":
_aatm = np.array(
[-163.331, -65.3713, 0.402903, -0.000479198, 0.00188667])
_batm = np.array([1183.70, 1108.06, 1424.02, 207.595, 1.0])
_catm = np.array(
[875221., 753213., 545846., 793043., 5.9787908e9])
_thickl = np.array(
[1020.370363, 586.143464, 228.374393, 1.338258, 0.000214])
_hlay = np.array([0.0, 4.0e5, 1.0e6, 4.0e6, 1.0e7])
else:
raise Exception('CorsikaAtmosphere(): Season "' + season +
'" not parameterized for location SouthPole.')
elif location == 'PL_SouthPole':
if season == 'January':
_aatm = np.array(
[-113.139, -7930635, -54.3888, -0.0, 0.00421033])
_batm = np.array([1133.10, 1101.20, 1085.00, 1098.00, 1.0])
_catm = np.array(
[861730., 826340., 790950., 682800., 2.6798156e9])
_thickl = np.array(
[1019.966898, 718.071682, 498.659703, 340.222344, 0.000478])
_hlay = np.array([0.0, 2.67e5, 5.33e5, 8.0e5, 1.0e7])
elif season == "August":
_aatm = np.array(
[-59.0293, -21.5794, -7.14839, 0.0, 0.000190175])
_batm = np.array([1079.0, 1071.90, 1182.0, 1647.1, 1.0])
_catm = np.array(
[764170., 699910., 635650., 551010., 59.329575e9])
_thickl = np.array(
[1019.946057, 391.739652, 138.023515, 43.687992, 0.000022])
_hlay = np.array([0.0, 6.67e5, 13.33e5, 2.0e6, 1.0e7])
else:
raise Exception('CorsikaAtmosphere(): Season "' + season +
'" not parameterized for location SouthPole.')
else:
raise Exception("CorsikaAtmosphere:init_parameters(): Location " +
str(location) + " not parameterized.")
self._atm_param = np.array([_aatm, _batm, _catm, _thickl, _hlay])
self.location, self.season = location, season
# Clear cached theta value to force spline recalculation
self.theta_deg = None
[docs] def depth2height(self, x_v):
"""Converts column/vertical depth to height.
Args:
x_v (float): column depth :math:`X_v` in g/cm**2
Returns:
float: height in cm
"""
_aatm, _batm, _catm, _thickl, _hlay = self._atm_param
if x_v >= _thickl[1]:
height = _catm[0] * np.log(_batm[0] / (x_v - _aatm[0]))
elif x_v >= _thickl[2]:
height = _catm[1] * np.log(_batm[1] / (x_v - _aatm[1]))
elif x_v >= _thickl[3]:
height = _catm[2] * np.log(_batm[2] / (x_v - _aatm[2]))
elif x_v >= _thickl[4]:
height = _catm[3] * np.log(_batm[3] / (x_v - _aatm[3]))
else:
height = (_aatm[4] - x_v) * _catm[4]
return height
[docs] def get_density(self, h_cm):
""" Returns the density of air in g/cm**3.
Uses the optimized module function :func:`corsika_get_density_jit`.
Args:
h_cm (float): height in cm
Returns:
float: density :math:`\\rho(h_{cm})` in g/cm**3
"""
return corsika_get_density_jit(h_cm, self._atm_param)
[docs] def get_mass_overburden(self, h_cm):
""" Returns the mass overburden in atmosphere in g/cm**2.
Uses the optimized module function :func:`corsika_get_m_overburden_jit`.
Args:
h_cm (float): height in cm
Returns:
float: column depth :math:`T(h_{cm})` in g/cm**2
"""
return corsika_get_m_overburden_jit(h_cm, self._atm_param)
[docs] def rho_inv(self, X, cos_theta):
"""Returns reciprocal density in cm**3/g using planar approximation.
This function uses the optimized function :func:`planar_rho_inv_jit`
Args:
h_cm (float): height in cm
Returns:
float: :math:`\\frac{1}{\\rho}(X,\\cos{\\theta})` cm**3/g
"""
return planar_rho_inv_jit(X, cos_theta, self._atm_param)
[docs] def calc_thickl(self):
"""Calculates thickness layers for :func:`depth2height`
The analytical inversion of the CORSIKA parameterization
relies on the knowledge about the depth :math:`X`, where
trasitions between layers/exponentials occur.
Example:
Create a new set of parameters in :func:`init_parameters`
inserting arbitrary values in the _thikl array::
$ cor_atm = CorsikaAtmosphere(new_location, new_season)
$ cor_atm.calc_thickl()
Replace _thickl values with printout.
"""
from scipy.integrate import quad
thickl = []
for h in self._atm_param[4]:
thickl.append('{0:4.6f}'.format(quad(self.get_density, h,
112.8e5, epsrel=1e-4)[0]))
print '_thickl = np.array([' + ', '.join(thickl) + '])'
@jit(double(double, double, double[:, :]), target='cpu')
def planar_rho_inv_jit(X, cos_theta, param):
"""Optimized calculation of :math:`1/\\rho(X,\\theta)` in
planar approximation.
This function can be used for calculations where
:math:`\\theta < 70^\\circ`.
Args:
X (float): slant depth in g/cm**2
cos_theta (float): :math:`\\cos(\\theta)`
Returns:
float: :math:`1/\\rho(X,\\theta)` in cm**3/g
"""
a = param[0]
b = param[1]
c = param[2]
t = param[3]
res = 0.0
x_v = X * cos_theta
layer = 0
for i in xrange(t.size):
if not x_v >= t[i]:
layer = i
if layer == 4:
res = c[4] / b[4]
else:
l = layer
res = c[l] / (x_v - a[l])
return res
@jit(double(double, double[:, :]), target='cpu')
def corsika_get_density_jit(h_cm, param):
"""Optimized calculation of :math:`\\rho(h)` in
according to CORSIKA type parameterization.
Args:
h_cm (float): height above surface in cm
param (numpy.array): 5x5 parameter array from
:class:`CorsikaAtmosphere`
Returns:
float: :math:`\\rho(h)` in g/cm**3
"""
b = param[1]
c = param[2]
hl = param[4]
res = 0.0
layer = 0
for i in xrange(hl.size):
if not h_cm <= hl[i]:
layer = i
if layer == 4:
res = b[4] / c[4]
else:
l = layer
res = b[l] / c[l] * np.exp(-h_cm / c[l])
return res
@jit(double(double, double[:, :]), target='cpu')
def corsika_get_m_overburden_jit(h_cm, param):
"""Optimized calculation of :math:`\\T(h)` in
according to CORSIKA type parameterization.
Args:
h_cm (float): height above surface in cm
param (numpy.array): 5x5 parameter array from
:class:`CorsikaAtmosphere`
Returns:
float: :math:`\\rho(h)` in g/cm**3
"""
a = param[0]
b = param[1]
c = param[2]
hl = param[4]
res = 0.0
layer = 0
for i in xrange(hl.size):
if not h_cm <= hl[i]:
layer = i
if layer == 4:
res = a[4] - b[4] / c[4] * h_cm
else:
l = layer
res = a[l] + b[l] * np.exp(-h_cm / c[l])
return res
[docs]class IsothermalAtmosphere(EarthAtmosphere):
"""Isothermal model of the atmosphere.
This model is widely used in semi-analytical calculations. The isothermal
approximation is valid in a certain range of altitudes and usually
one adjust the parameters to match a more realistic density profile
at altitudes between 10 - 30 km, where the high energy muon production
rate peaks. Such parametrizations are given in the book "Cosmic Rays and
Particle Physics", Gaisser, Engel and Resconi (2016). The default values are
from M. Thunman, G. Ingelman, and P. Gondolo, Astropart. Physics 5, 309 (1996).
Args:
location (str): no effect
season (str): no effect
hiso_km (float): isothermal scale height in km
X0 (float): Ground level overburden
"""
def __init__(self, location, season, hiso_km=6.3, X0=1300.):
self.hiso_cm = hiso_km * 1e5
self.X0 = X0
self.location = location
self.season = season
EarthAtmosphere.__init__(self)
[docs] def get_density(self, h_cm):
""" Returns the density of air in g/cm**3.
Args:
h_cm (float): height in cm
Returns:
float: density :math:`\\rho(h_{cm})` in g/cm**3
"""
return self.X0 / self.hiso_cm * np.exp(-h_cm / self.hiso_cm)
[docs] def get_mass_overburden(self, h_cm):
""" Returns the mass overburden in atmosphere in g/cm**2.
Args:
h_cm (float): height in cm
Returns:
float: column depth :math:`T(h_{cm})` in g/cm**2
"""
return self.X0*np.exp(-h_cm/self.hiso_cm)
[docs]class MSIS00Atmosphere(EarthAtmosphere):
"""Wrapper class for a python interface to the NRLMSISE-00 model.
`NRLMSISE-00 <http://ccmc.gsfc.nasa.gov/modelweb/atmos/nrlmsise00.html>`_
is an empirical model of the Earth's atmosphere. It is available as
a FORTRAN 77 code or as a verson traslated into
`C by Dominik Borodowski <http://www.brodo.de/english/pub/nrlmsise/>`_.
Here a PYTHON wrapper has been used.
Attributes:
_msis : NRLMSISE-00 python wrapper object handler
Args:
location (str): see :func:`init_parameters`
season (str,optional): see :func:`init_parameters`
"""
def __init__(self, location, season):
from MCEq.msis_wrapper import cNRLMSISE00, pyNRLMSISE00
self._msis = (cNRLMSISE00() if config['msis_python'] == 'ctypes' else pyNRLMSISE00())
self.init_parameters(location, season)
EarthAtmosphere.__init__(self)
[docs] def init_parameters(self, location, season):
"""Sets location and season in :class:`NRLMSISE-00`.
Translates location and season into day of year
and geo coordinates.
Args:
location (str): Supported are "SouthPole" and "Karlsruhe"
season (str): months of the year: January, February, etc.
"""
self._msis.set_location(location)
self._msis.set_season(season)
self.location, self.season = location, season
# Clear cached value to force spline recalculation
self.theta_deg = None
[docs] def get_density(self, h_cm):
""" Returns the density of air in g/cm**3.
Wraps around ctypes calls to the NRLMSISE-00 C library.
Args:
h_cm (float): height in cm
Returns:
float: density :math:`\\rho(h_{cm})` in g/cm**3
"""
return self._msis.get_density(h_cm)
[docs]class AIRSAtmosphere(EarthAtmosphere):
"""Interpolation class for tabulated atmospheres.
This class is intended to read preprocessed AIRS Satellite data.
Args:
location (str): see :func:`init_parameters`
season (str,optional): see :func:`init_parameters`
"""
def __init__(self, location, season, extrapolate=True, *args, **kwargs):
if location != 'SouthPole':
raise Exception(self.__class__.__name__ +
"(): Only South Pole location supported. " + location)
self.extrapolate = extrapolate
self.month2doy = {'January':1,
'February':32,
'March':60,
'April':91,
'May':121,
'June':152,
'July':182,
'August':213,
'September':244,
'October':274,
'November':305,
'December':335}
self.season = season
self.init_parameters(location, **kwargs)
EarthAtmosphere.__init__(self)
[docs] def init_parameters(self, location, **kwargs):
"""Loads tables and prepares interpolation.
Args:
location (str): supported is only "SouthPole"
doy (int): Day Of Year
"""
from matplotlib.dates import strpdate2num, UTC, num2date
from os import path
data_path = (join(path.expanduser('~'),
'work/projects/atmospheric_variations/'))
if 'table_path' in kwargs:
data_path = kwargs['table_path']
files = [
('dens', 'airs_amsu_dens_180_daily.txt'),
('temp', 'airs_amsu_temp_180_daily.txt'),
('alti', 'airs_amsu_alti_180_daily.txt')]
data_collection = {}
# limit SouthPole pressure to <= 600
min_press_idx = 4
IC79_idx_1 = None
IC79_idx_2 = None
for d_key, fname in files:
fname = data_path + 'tables/' + fname
tab = np.loadtxt(fname,
converters={0: strpdate2num('%Y/%m/%d')},
usecols=[0] + range(2, 27))
with open(fname, 'r') as f:
comline = f.readline()
# print comline
p_levels = [float(s.strip()) for s in
comline.split(' ')[3:] if s != ''][min_press_idx:]
dates = num2date(tab[:, 0])
for di, date in enumerate(dates):
if date.month == 6 and date.day == 1:
if date.year == 2010:
IC79_idx_1 = di
elif date.year == 2011:
IC79_idx_2 = di
surf_val = tab[:, 1]
cols = tab[:, min_press_idx + 2:]
data_collection[d_key] = (dates, surf_val, cols)
self.interp_tab = {}
self.dates = {}
dates = data_collection['alti'][0]
msis = MSIS00Atmosphere(location,'January')
for didx, date in enumerate(dates):
h_vec = np.array(data_collection['alti'][2][didx,:]*1e2)
d_vec = np.array(data_collection['dens'][2][didx,:])
if self.extrapolate:
#Extrapolate using msis
h_extra = np.linspace(h_vec[-1],config['h_atm']*1e2,250)
msis._msis.set_doy(self._get_y_doy(date)[1]-1)
msis_extra = np.array([msis.get_density(h) for h in h_extra])
# Interpolate last few altitude bins
ninterp = 5
for ni in range(ninterp):
cl = (1 - np.exp(-ninterp+ni + 1))
ch = (1 - np.exp(-ni))
norm = 1./(cl + ch)
d_vec[-ni-1] = (d_vec[-ni-1]*cl*norm +
msis.get_density(h_vec[-ni-1])*ch*norm)
# Merge the two datasets
h_vec = np.hstack([h_vec[:-1], h_extra])
d_vec = np.hstack([d_vec[:-1], msis_extra])
self.interp_tab[self._get_y_doy(date)] = (h_vec, d_vec)
self.dates[self._get_y_doy(date)] = date
self.IC79_start = self._get_y_doy(dates[IC79_idx_1])
self.IC79_end = self._get_y_doy(dates[IC79_idx_2])
self.IC79_days = (dates[IC79_idx_2] - dates[IC79_idx_1]).days
self.location = location
if self.season is None:
self.set_IC79_day(0)
else:
self.set_season(self.season)
# Clear cached value to force spline recalculation
self.theta_deg = None
def set_date(self, year, doy):
self.h, self.dens = self.interp_tab[(year, doy)]
self.date = self.dates[(year, doy)]
# Compatibility with caching
self.season = self.date
def _set_doy(self, doy, year=2010):
self.h, self.dens = self.interp_tab[(year, doy)]
self.date = self.dates[(year, doy)]
def set_season(self, month):
self.season = month
self._set_doy(self.month2doy[month])
self.season = month
def set_IC79_day(self, IC79_day):
import datetime
if IC79_day > self.IC79_days:
raise Exception(self.__class__.__name__ +
"::set_IC79_day(): IC79_day above range.")
target_day = self._get_y_doy(self.dates[self.IC79_start] +
datetime.timedelta(days=IC79_day))
print 'setting IC79_day', IC79_day
self.h, self.dens = self.interp_tab[target_day]
self.date = self.dates[target_day]
# Compatibility with caching
self.season = self.date
def _get_y_doy(self, date):
return date.timetuple().tm_year, date.timetuple().tm_yday
[docs] def get_density(self, h_cm):
""" Returns the density of air in g/cm**3.
Wraps around ctypes calls to the NRLMSISE-00 C library.
Args:
h_cm (float): height in cm
Returns:
float: density :math:`\\rho(h_{cm})` in g/cm**3
"""
ret = np.exp(np.interp(h_cm, self.h, np.log(self.dens)))
try:
ret[h_cm > self.h[-1]] = np.nan
except TypeError:
if h_cm > self.h[-1]: return np.nan
return ret
[docs]class MSIS00IceCubeCentered(MSIS00Atmosphere):
"""Extension of :class:`MSIS00Atmosphere` which couples the latitude
setting with the zenith angle of the detector.
Args:
location (str): see :func:`init_parameters`
season (str,optional): see :func:`init_parameters`
"""
def __init__(self, location, season):
if location != 'SouthPole':
if dbg > 0:
print ('{0} location forced to SouthPole in' +
' class').format(self.__class__.__name__)
location = 'SouthPole'
MSIS00Atmosphere.__init__(self, location, season)
[docs] def latitude(self, det_zenith_deg):
""" Returns the geographic latitude of the shower impact point.
Assumes a spherical earth. The detector is 1948m under the
surface.
Credits: geometry fomulae by Jakob van Santen, DESY Zeuthen.
Args:
det_zenith_deg (float): zenith angle at detector in degrees
Returns:
float: latitude of the impact point in degrees
"""
r = config['r_E']
d = 1948 # m
theta_rad = det_zenith_deg / 180. * np.pi
x = (np.sqrt(2. * r * d + ((r - d) * np.cos(theta_rad))**2 - d**2)
- (r - d) * np.cos(theta_rad))
return -90. + np.arctan2(x * np.sin(theta_rad),
r - d + x * np.cos(theta_rad)) / np.pi * 180.
def set_theta(self, theta_deg, force_spline_calc=True):
self._msis.set_location_coord(longitude=0.,
latitude=self.latitude(theta_deg))
if dbg > 0:
print ('{0}::set_theta(): latitude = {1} for ' +
'zenith angle = {2}').format(self.__class__.__name__,
self.latitude(theta_deg),
theta_deg)
if theta_deg > 90.:
if dbg > 0:
print ('{0}::set_theta(): theta = {1} below horizon.' +
'using theta = {2}').format(self.__class__.__name__,
theta_deg,
180. - theta_deg)
theta_deg = 180. - theta_deg
MSIS00Atmosphere.set_theta(self, theta_deg,
force_spline_calc=force_spline_calc)
if __name__ == '__main__':
import matplotlib.pyplot as plt
plt.figure(figsize=(5, 4))
atm_obj = CorsikaAtmosphere('PL_SouthPole', 'January')
atm_obj.set_theta(0.0)
x_vec = np.linspace(0, atm_obj.max_X, 10000)
plt.plot(x_vec, 1 / atm_obj.r_X2rho(x_vec), lw=1.5,
label="PL_SouthPole/January")
atm_obj.init_parameters('PL_SouthPole', 'August')
atm_obj.set_theta(0.0)
x_vec = np.linspace(0, atm_obj.max_X, 10000)
plt.plot(x_vec, 1 / atm_obj.r_X2rho(x_vec), lw=1.5,
label="PL_SouthPole/August")
plt.legend()
plt.tight_layout()
plt.show()