import six
import numpy as np
import h5py
from collections import defaultdict
import mceq_config as config
from os.path import join, isfile
from .misc import normalize_hadronic_model_name, info
# TODO: Convert this to some functional generic class. Very erro prone to
# enter stuff by hand
equivalences = {
'SIBYLL23': {
-4132: 4122,
-4122: 4122,
-3334: -3312,
-3322: -2112,
-3212: -3122,
-413: -411,
113: 211,
221: 211,
111: 211,
310: 130,
413: 411,
3212: 3122,
3334: 3312
},
'SIBYLL21': {
-3322: 2112,
-3312: 2212,
-3222: 2212,
-3212: 2112,
-3122: 2112,
-3112: 2212,
-2212: 2212,
-2112: 2112,
310: 130,
111: 211,
3112: 2212,
3122: 2112,
3212: 2112,
3222: 2212,
3312: 2212,
3322: 2112
},
'QGSJET01': {
-4122: 2212,
-3322: 2212,
-3312: 2212,
-3222: 2212,
-3122: 2212,
-3112: 2212,
-2212: 2212,
-2112: 2212,
-421: 321,
-411: 321,
-211: 211,
-321: 321,
111: 211,
221: 211,
130: 321,
310: 321,
411: 321,
421: 321,
2112: 2212,
3112: 2212,
3122: 2212,
3222: 2212,
3312: 2212,
3322: 2212,
4122: 2212
},
'QGSJETII': {
-3122: -2112,
111: 211,
113: 211,
221: 211,
310: 130,
3122: 2112,
},
'DPMJET': {
-4122: -3222,
-3334: -3312,
-3212: -3122,
-431: -321,
-421: -321,
-413: -321,
-411: -321,
310: 130,
113: 211,
221: 211,
111: 211,
411: 321,
413: 321,
421: 321,
431: 321,
3212: 3122,
3334: 3312,
4122: 3222,
},
'EPOSLHC': {
-3334: 2212,
-3322: -3122,
-3312: 2212,
-3222: -2212,
-3212: -3122,
-3112: 2212,
111: 211,
113: 211,
221: 211,
310: 130,
3112: -2212,
3212: 3122,
3222: 2212,
3312: -2212,
3322: 3122,
3334: -2212
},
'PYTHIA8': {
-3122: -2112,
-431: -321,
-421: -321,
-413: -321,
-411: -321,
111: 211,
113: 211,
221: 211,
310: 321,
130: 321,
411: 321,
413: 321,
421: 321,
431: 321,
3122: 2112,
}
}
[docs]class HDF5Backend(object):
"""Provides access to tabulated data stored in an HDF5 file.
The file contains all necessary ingredients to run MCEq, i.e. no
other files are required. This database is not maintained in git
and it will change infrequently.
"""
def __init__(self):
info(2, 'Opening HDF5 file', config.mceq_db_fname)
self.had_fname = join(config.data_dir, config.mceq_db_fname)
if not isfile(self.had_fname):
raise Exception(
'MCEq DB file {0} not found in "data" directory.'.format(
config.mceq_db_fname))
self.em_fname = join(config.data_dir, config.em_db_fname)
if config.enable_em and not isfile(self.had_fname):
raise Exception(
'Electromagnetic DB file {0} not found in "data" directory.'.
format(config.em_db_fname))
with h5py.File(self.had_fname, 'r') as mceq_db:
from MCEq.misc import energy_grid
ca = mceq_db['common'].attrs
self.version = (mceq_db.attrs['version']
if 'version' in mceq_db.attrs else '1.0.0')
self.min_idx, self.max_idx, self._cuts = self._eval_energy_cuts(
ca['e_grid'])
self._energy_grid = energy_grid(
ca['e_grid'][self._cuts],
ca['e_bins'][self.min_idx:self.max_idx + 1],
ca['widths'][self._cuts], self.max_idx - self.min_idx)
self.dim_full = ca['e_dim']
@property
def energy_grid(self):
return self._energy_grid
def _eval_energy_cuts(self, e_centers):
min_idx, max_idx = 0, len(e_centers)
slice0, slice1 = None, None
if config.e_min is not None:
min_idx = slice0 = np.argmin(np.abs(e_centers - config.e_min))
if config.e_max is not None:
max_idx = slice1 = np.argmin(
np.abs(e_centers - config.e_max)) + 1
return min_idx, max_idx, slice(slice0, slice1)
def _gen_db_dictionary(self, hdf_root, indptrs, equivalences={}):
from scipy.sparse import csr_matrix
index_d = {}
relations = defaultdict(lambda: [])
particle_list = []
if 'description' in hdf_root.attrs:
description = hdf_root.attrs['description']
else:
description = None
mat_data = hdf_root[:, :]
indptr_data = indptrs[:]
len_data = hdf_root.attrs['len_data']
if hdf_root.attrs['tuple_idcs'].shape[1] == 4:
model_particles = sorted(
list(
set(hdf_root.attrs['tuple_idcs'][:,
(0,
2)].flatten().tolist())))
else:
model_particles = sorted(
list(set(hdf_root.attrs['tuple_idcs'].flatten().tolist())))
exclude = config.adv_set["disabled_particles"]
read_idx = 0
available_parents = [
(pdg, parity)
for (pdg, parity) in (hdf_root.attrs['tuple_idcs'][:, :2])
]
available_parents = sorted(list(set(available_parents)))
# Reverse equivalences
eqv_lookup = defaultdict(lambda: [])
for k in equivalences:
eqv_lookup[(equivalences[k], 0)].append((k, 0))
for tupidx, tup in enumerate(hdf_root.attrs['tuple_idcs']):
if len(tup) == 4:
parent_pdg, child_pdg = tuple(tup[:2]), tuple(tup[2:])
elif len(tup) == 2:
parent_pdg, child_pdg = (tup[0], 0), (tup[1], 0)
else:
raise Exception('Failed decoding parent-child relation.')
if (abs(parent_pdg[0]) in exclude) or (abs(
child_pdg[0]) in exclude):
read_idx += len_data[tupidx]
continue
parent_pdg = int(parent_pdg[0]), (parent_pdg[1])
child_pdg = int(child_pdg[0]), (child_pdg[1])
particle_list.append(parent_pdg)
particle_list.append(child_pdg)
index_d[(parent_pdg, child_pdg)] = (csr_matrix(
(mat_data[0, read_idx:read_idx + len_data[tupidx]],
mat_data[1, read_idx:read_idx + len_data[tupidx]],
indptr_data[tupidx, :]),
shape=(self.dim_full, self.dim_full
))[self._cuts, self.min_idx:self.max_idx]).toarray()
relations[parent_pdg].append(child_pdg)
info(20,
'This parent {0} is used for interactions of'.format(
parent_pdg[0]), [p[0] for p in eqv_lookup[parent_pdg]],
condition=len(equivalences) > 0)
if config.assume_nucleon_interactions_for_exotics:
for eqv_parent in eqv_lookup[parent_pdg]:
if eqv_parent[0] not in model_particles:
info(10, 'No equiv. replacement needed of', eqv_parent, 'for',
parent_pdg, 'parent.')
continue
elif eqv_parent in available_parents:
info(
10, 'Parent {0} has dedicated simulation.'.format(
eqv_parent[0]))
continue
particle_list.append(eqv_parent)
index_d[(eqv_parent, child_pdg)] = index_d[(parent_pdg,
child_pdg)]
relations[eqv_parent] = relations[parent_pdg]
info(
15, 'equivalence of {0} and {1} interactions'.format(
eqv_parent[0], parent_pdg[0]))
read_idx += len_data[tupidx]
return {
'parents': sorted(list(relations)),
'particles': sorted(list(set(particle_list))),
'relations': dict(relations),
'index_d': dict(index_d),
'description': description
}
def _check_subgroup_exists(self, subgroup, mname):
available_models = list(subgroup)
if mname not in available_models:
info(0, 'Invalid choice/model', mname)
info(0, 'Choose from:\n', '\n'.join(available_models))
raise Exception('Unknown selections.')
def interaction_db(self, interaction_model_name):
mname = normalize_hadronic_model_name(interaction_model_name)
info(10, 'Generating interaction db. mname={0}'.format(mname))
with h5py.File(self.had_fname, 'r') as mceq_db:
self._check_subgroup_exists(mceq_db['hadronic_interactions'],
mname)
if 'SIBYLL21' in mname:
eqv = equivalences['SIBYLL21']
elif 'SIBYLL23' in mname:
eqv = equivalences['SIBYLL23']
elif 'QGSJET01' in mname:
eqv = equivalences['QGSJET01']
elif 'QGSJETII' in mname:
eqv = equivalences['QGSJETII']
elif 'DPMJET' in mname:
eqv = equivalences['DPMJET']
elif 'EPOSLHC' in mname:
eqv = equivalences['EPOSLHC']
elif 'PYTHIA8' in mname:
eqv = equivalences['PYTHIA8']
int_index = self._gen_db_dictionary(
mceq_db['hadronic_interactions'][mname],
mceq_db['hadronic_interactions'][mname + '_indptrs'],
equivalences=eqv)
# Append electromagnetic interaction matrices from EmCA
if config.enable_em:
with h5py.File(self.em_fname, 'r') as em_db:
info(2, 'Injecting EmCA matrices into interaction_db.')
self._check_subgroup_exists(em_db, 'electromagnetic')
em_index = self._gen_db_dictionary(
em_db['electromagnetic']['emca_mats'],
em_db['electromagnetic']['emca_mats' + '_indptrs'])
int_index['parents'] = sorted(int_index['parents'] +
em_index['parents'])
int_index['particles'] = sorted(
list(set(int_index['particles'] + em_index['particles'])))
int_index['relations'].update(em_index['relations'])
int_index['index_d'].update(em_index['index_d'])
if int_index['description'] is not None:
int_index['description'] += '\nInteraction model name: ' + mname
else:
int_index['description'] = 'Interaction model name: ' + mname
return int_index
def decay_db(self, decay_dset_name):
info(10, 'Generating decay db. dset_name={0}'.format(decay_dset_name))
with h5py.File(self.had_fname, 'r') as mceq_db:
self._check_subgroup_exists(mceq_db['decays'], decay_dset_name)
dec_index = self._gen_db_dictionary(
mceq_db['decays'][decay_dset_name],
mceq_db['decays'][decay_dset_name + '_indptrs'])
if config.muon_helicity_dependence:
info(2, 'Using helicity dependent decays.')
custom_index = self._gen_db_dictionary(
mceq_db['decays']['custom_decays'],
mceq_db['decays']['custom_decays' + '_indptrs'])
info(5, 'Replacing decay from custom decay_db.')
dec_index['index_d'].update(custom_index['index_d'])
# Remove manually TODO: Kaon decays to muons assumed
# only two-body
_ = dec_index['index_d'].pop(((211, 0), (-13, 0)))
_ = dec_index['index_d'].pop(((-211, 0), (13, 0)))
_ = dec_index['index_d'].pop(((321, 0), (-13, 0)))
_ = dec_index['index_d'].pop(((-321, 0), (13, 0)))
# _ = dec_index['index_d'].pop(((211,0),(14,0)))
# _ = dec_index['index_d'].pop(((-211,0),(-14,0)))
# _ = dec_index['index_d'].pop(((321,0),(14,0)))
# _ = dec_index['index_d'].pop(((-321,0),(-14,0)))
dec_index['relations'] = defaultdict(lambda: [])
dec_index['particles'] = []
for idx_tup in dec_index['index_d']:
parent, child = idx_tup
dec_index['relations'][parent].append(child)
dec_index['particles'].append(parent)
dec_index['particles'].append(child)
dec_index['parents'] = sorted(list(dec_index['relations']))
dec_index['particles'] = sorted(
list(set(dec_index['particles'])))
return dec_index
def cs_db(self, interaction_model_name):
mname = normalize_hadronic_model_name(interaction_model_name)
with h5py.File(self.had_fname, 'r') as mceq_db:
self._check_subgroup_exists(mceq_db['cross_sections'], mname)
cs_db = mceq_db['cross_sections'][mname]
cs_data = cs_db[:]
index_d = {}
parents = list(cs_db.attrs['projectiles'])
for ip, p in enumerate(parents):
index_d[p] = cs_data[self._cuts, ip]
# Append electromagnetic interaction cross sections from EmCA
if config.enable_em:
with h5py.File(self.em_fname, 'r') as em_db:
info(2, 'Injecting EmCA matrices into interaction_db.')
self._check_subgroup_exists(em_db, 'electromagnetic')
em_cs = em_db["electromagnetic"]['cs'][:]
em_parents = list(
em_db["electromagnetic"]['cs'].attrs['projectiles'])
for ip, p in enumerate(em_parents):
if p in index_d:
raise Exception(
'EM cross sections already in database?')
index_d[p] = em_cs[ip, self._cuts]
parents += em_parents
return {'parents': parents, 'index_d': index_d}
def continuous_loss_db(self, medium='air'):
with h5py.File(self.had_fname, 'r') as mceq_db:
self._check_subgroup_exists(mceq_db['continuous_losses'], medium)
cl_db = mceq_db['continuous_losses'][medium]
index_d = {}
for pstr in list(cl_db):
for hel in [0, 1, -1]:
index_d[(int(pstr), hel)] = cl_db[pstr][self._cuts]
if config.enable_em:
with h5py.File(self.em_fname, 'r') as em_db:
info(2, 'Injecting EmCA matrices into interaction_db.')
self._check_subgroup_exists(em_db, 'electromagnetic')
for hel in [0, 1, -1]:
index_d[(11,
hel)] = em_db["electromagnetic"]['dEdX 11'][
self._cuts]
index_d[(-11,
hel)] = em_db["electromagnetic"]['dEdX -11'][
self._cuts]
return {'parents': sorted(list(index_d)), 'index_d': index_d}
[docs]class Interactions(object):
"""Class for managing the dictionary of interaction yield matrices.
Args:
mceq_hdf_db (object): instance of :class:`MCEq.data.HDF5Backend`
"""
def __init__(self, mceq_hdf_db):
from collections import defaultdict
#: MCEq HDF5Backend reference
self.mceq_db = mceq_hdf_db
#: reference to energy grid
self.energy_grid = mceq_hdf_db.energy_grid
#: List of active parents
self.parents = None
#: List of all known particles
self.particles = None
#: Dictionary parent/child relations
self.relations = None
#: Dictionary containing the distribuiton matrices
self.index_d = None
#: String containing the desciption of the model
self.description = None
#: (str) Interaction Model name
self.iam = None
# #: (tuple) selection of a band of coeffictients (in xf)
# self.band = None
#: (tuple) modified particle combination for error prop.
self.mod_pprod = defaultdict(lambda: {})
def load(self, interaction_model, parent_list=None):
from MCEq.misc import is_charm_pdgid
self.iam = normalize_hadronic_model_name(interaction_model)
# Load tables and index from file
index = self.mceq_db.interaction_db(self.iam)
disabled_particles = config.adv_set['disabled_particles']
self.parents = [p for p in index['parents']
if p[0] not in disabled_particles]
self.relations = index['relations']
self.index_d = index['index_d']
self.description = index['description']
# Advanced options
if parent_list is not None:
self.parents = [p for p in self.parents if p in parent_list and p[0]
not in disabled_particles]
if (config.adv_set['disable_charm_pprod']):
self.parents = [
p for p in self.parents if not is_charm_pdgid(p[0])
]
if (config.adv_set['disable_interactions_of_unstable']):
self.parents = [
p for p in self.parents
if p[0] not in [2212, 2112, -2212, -2112]
]
if (config.adv_set['allowed_projectiles']):
self.parents = [
p for p in self.parents
if p[0] in config.adv_set['allowed_projectiles']
]
self.particles = []
for p in list(self.relations):
if p not in self.parents:
_ = self.relations.pop(p, None)
continue
self.particles.append(p)
self.particles += [d for d in self.relations[p]
if d not in disabled_particles]
self.particles = sorted(list(set(self.particles)))
if config.adv_set['disable_direct_leptons']:
for p in list(self.relations):
self.relations[p] = [
c for c in self.relations[p] if not 10 < abs(c[0]) < 20
]
if len(disabled_particles) > 0:
for p in list(self.relations):
self.relations[p] = [
c for c in self.relations[p] if c[0] not in
disabled_particles
]
if not self.particles:
info(2, 'None of the parent_list particles interact. Returning custom list.')
self.particles = parent_list
def __getitem__(self, key):
return self.get_matrix(*key)
def __contains__(self, key):
"""Defines the `in` operator to look for particles"""
return key in self.parents
def _gen_mod_matrix(self, x_func, *args):
"""Creates modification matrix using an (x,E)-dependent function.
:math:`x = \\frac{E_{\\rm primary}}{E_{\\rm secondary}}` is the
fraction of secondary particle energy. ``x_func`` can be an
arbitrary function modifying the :math:`x_\\text{lab}` distribution.
Run this method each time you change ``x_func``, or its parameters,
not each time you change modified particle.
The ``args`` are passed to the function.
Args:
x_func (object): reference to function
args (tuple): arguments of `x_func`
Returns:
(numpy.array): modification matrix
"""
from MCEq.misc import gen_xmat
info(2, 'Generating modification matrix for', x_func.__name__, args)
xmat = gen_xmat(self.energy_grid)
# select the relevant slice of interaction matrix
modmat = x_func(xmat, self.energy_grid.c, *args)
# Set lower triangular indices to 0. (should be not necessary)
modmat[np.tril_indices(self.energy_grid.d, -1)] = 0.
return modmat
def _set_mod_pprod(self, prim_pdg, sec_pdg, x_func, args):
"""Sets combination of parent/secondary for error propagation.
The production spectrum of ``sec_pdg`` in interactions of
``prim_pdg`` is modified according to the function passed to
:func:`InteractionYields.init_mod_matrix`
Args:
prim_pdg (int): interacting (primary) particle PDG ID
sec_pdg (int): secondary particle PDG ID
"""
# Short cut for the pprod list
mpli = self.mod_pprod
pstup = (prim_pdg, sec_pdg)
if config.use_isospin_sym and prim_pdg not in [2212, 2112]:
raise Exception('Unsupported primary for isospin symmetries.')
if (x_func.__name__, args) in mpli[(pstup)]:
info(
5, ' no changes to particle production' +
' modification matrix of {0}/{1} for {2},{3}'.format(
prim_pdg, sec_pdg, x_func.__name__, args))
return False
# Check function with same mode but different parameter is supplied
for (xf_name, fargs) in list(mpli[pstup]):
if (xf_name == x_func.__name__) and (fargs[0] == args[0]):
info(1, 'Warning. If you modify only the value of a function,',
'unset and re-apply all changes')
return False
info(
2, 'modifying modify particle production' +
' matrix of {0}/{1} for {2},{3}'.format(prim_pdg, sec_pdg,
x_func.__name__, args))
kmat = self._gen_mod_matrix(x_func, *args)
mpli[pstup][(x_func.__name__, args)] = kmat
info(5, 'modification "strength"',
np.sum(kmat) / np.count_nonzero(kmat))
if not config.use_isospin_sym:
return True
prim_pdg, symm_pdg = 2212, 2112
if prim_pdg == 2112:
prim_pdg = 2112
symm_pdg = 2212
# p->pi+ = n-> pi-, p->pi- = n-> pi+
if abs(sec_pdg) == 211:
# Add the same mod to the isospin symmetric particle combination
mpli[(symm_pdg, -sec_pdg)][('isospin', args)] = kmat
# Assumption: Unflavored production coupled to the average
# of pi+ and pi- production
if np.any([p in self.parents for p in [221, 223, 333]]):
unflv_arg = None
if (prim_pdg, -sec_pdg) not in mpli:
# Only pi+ or pi- (not both) have been modified
unflv_arg = (args[0], 0.5 * args[1])
if (prim_pdg, -sec_pdg) in mpli:
# Compute average of pi+ and pi- modification matrices
# Save the 'average' argument (just for meaningful output)
for arg_name, arg_val in mpli[(prim_pdg, -sec_pdg)]:
if arg_name == args[0]:
unflv_arg = (args[0], 0.5 * (args[1] + arg_val))
unflmat = self._gen_mod_matrix(x_func, *unflv_arg)
# modify eta, omega, phi, 221, 223, 333
for t in [(prim_pdg, 221), (prim_pdg, 223), (prim_pdg, 333),
(symm_pdg, 221), (symm_pdg, 223), (symm_pdg, 333)]:
mpli[t][('isospin', unflv_arg)] = unflmat
# Charged and neutral kaons
elif abs(sec_pdg) == 321:
# approx.: p->K+ ~ n-> K+, p->K- ~ n-> K-
mpli[(symm_pdg, sec_pdg)][('isospin', args)] = kmat
k0_arg = (args[0], 0.5 * args[1])
if (prim_pdg, -sec_pdg) in mpli:
# Compute average of K+ and K- modification matrices
# Save the 'average' argument (just for meaningful printout)
for arg_name, arg_val in mpli[(prim_pdg, -sec_pdg)]:
if arg_name == args[0]:
k0_arg = (args[0], 0.5 * (args[1] + arg_val))
k0mat = self._gen_mod_matrix(x_func, *k0_arg)
# modify K0L/S
for t in [(prim_pdg, 310), (prim_pdg, 130), (symm_pdg, 310),
(symm_pdg, 130)]:
mpli[t][('isospin', k0_arg)] = k0mat
elif abs(sec_pdg) == 411:
ssec = np.sign(sec_pdg)
mpli[(prim_pdg, ssec * 421)][('isospin', args)] = kmat
mpli[(prim_pdg, ssec * 431)][('isospin', args)] = kmat
mpli[(symm_pdg, sec_pdg)][('isospin', args)] = kmat
mpli[(symm_pdg, ssec * 421)][('isospin', args)] = kmat
mpli[(symm_pdg, ssec * 431)][('isospin', args)] = kmat
# Leading particles
elif abs(sec_pdg) == prim_pdg:
mpli[(symm_pdg, symm_pdg)][('isospin', args)] = kmat
elif abs(sec_pdg) == symm_pdg:
mpli[(symm_pdg, prim_pdg)][('isospin', args)] = kmat
else:
raise Exception('No isospin relation found for secondary' +
str(sec_pdg))
# Tell MCEqRun to regenerate the matrices if something has changed
return True
[docs] def print_mod_pprod(self):
"""Prints the active particle production modification.
"""
for i, (prim_pdg, sec_pdg) in enumerate(sorted(self.mod_pprod)):
for j, (argname, argv) in enumerate(self.mod_pprod[(prim_pdg,
sec_pdg)]):
info(2,
'{0}: {1} -> {2}, func: {3}, arg: {4}'.format(
i + j, prim_pdg, sec_pdg, argname, argv),
no_caller=True)
[docs] def get_matrix(self, parent, child):
"""Returns a ``DIM x DIM`` yield matrix.
Args:
parent (int): PDG ID of parent particle
child (int): PDG ID of final state child/secondary particle
Returns:
numpy.array: yield matrix
"""
info(10, 'Called for', parent, child)
if child not in self.relations[parent]:
raise Exception(("trying to get empty matrix {0} -> {1}").format(
parent, child))
m = self.index_d[(parent, child)]
if config.adv_set['disable_leading_mesons'] and abs(child) < 2000 \
and (parent, -child) in list(self.index_d):
m_anti = self.index_d[(parent, -child)]
ie = 50
info(5, 'sum in disable_leading_mesons',
(np.sum(m[:, ie - 30:ie]) - np.sum(m_anti[:, ie - 30:ie])))
if (np.sum(m[:, ie - 30:ie]) - np.sum(m_anti[:, ie - 30:ie])) > 0:
info(5, 'inverting meson due to leading particle veto.', child,
'->', -child)
m = m_anti
else:
info(5, 'no inversion since child not leading', child)
else:
info(20, 'no meson inversion in leading particle veto.', parent,
child)
if (parent[0], child[0]) in list(self.mod_pprod):
info(
5, 'using modified particle production for {0}/{1}'.format(
parent[0], child[0]))
i = 0
m = np.copy(m)
for args, mmat in six.iteritems(self.mod_pprod[(parent[0], child[0])]):
info(10, i, (parent[0], child[0]), args, np.sum(mmat),
np.sum(m))
i += 1
m *= mmat
return m
[docs]class Decays(object):
"""Class for managing the dictionary of decay yield matrices.
Args:
mceq_hdf_db (object): instance of :class:`MCEq.data.HDF5Backend`
"""
def __init__(self, mceq_hdf_db, default_decay_dset='full_decays'):
#: MCEq HDF5Backend reference
self.mceq_db = mceq_hdf_db
#: (list) List of particles in the decay matrices
self.parent_list = []
self._default_decay_dset = default_decay_dset
def load(self, parent_list=None, decay_dset=None):
# Load tables and index from file
if decay_dset is None:
decay_dset = self._default_decay_dset
index = self.mceq_db.decay_db(decay_dset)
self.parents = index['parents']
self.particles = index['particles']
self.relations = index['relations']
self.index_d = index['index_d']
self.description = index['description']
# Advanced options
regenerate_index = False
if (parent_list):
# Take only the parents provided by the list
# Add the decay products, which can become new parents
def _follow_decay_chain(p, plist):
if p in self.relations:
plist.append(p)
for d in self.relations[p]:
_follow_decay_chain(d, plist)
else:
return plist
plist = []
for p in parent_list:
_follow_decay_chain(p, plist)
self.parents = sorted(list(set(plist)))
regenerate_index = True
if regenerate_index:
self.particles = []
for p in list(self.relations):
if p not in self.parents:
_ = self.relations.pop(p, None)
continue
self.particles.append(p)
self.particles += self.relations[p]
self.particles = sorted(list(set(self.particles)))
def __getitem__(self, key):
return self.get_matrix(*key)
def __contains__(self, key):
"""Defines the `in` operator to look for particles"""
return key in self.parents
def children(self, parent_pdg):
if parent_pdg not in self.relations:
raise Exception(
'Parent {0} not in decay database.'.format(parent_pdg))
return self.relations[parent_pdg]
[docs] def get_matrix(self, parent, child):
"""Returns a ``DIM x DIM`` decay matrix.
Args:
parent (int): PDG ID of parent particle
child (int): PDG ID of final state child particle
Returns:
numpy.array: decay matrix
"""
info(20, 'entering with', parent, child)
if child not in self.relations[parent]:
raise Exception(("trying to get empty matrix {0} -> {1}").format(
parent, child))
return self.index_d[(parent, child)]
[docs]class InteractionCrossSections(object):
"""Class for managing the dictionary of hadron-air cross-sections.
Args:
mceq_hdf_db (object): instance of :class:`MCEq.data.HDF5Backend`
interaction_model (str): name of the interaction model
"""
#: unit - :math:`\text{GeV} \cdot \text{fm}`
GeVfm = 0.19732696312541853
#: unit - :math:`\text{GeV} \cdot \text{cm}`
GeVcm = GeVfm * 1e-13
#: unit - :math:`\text{GeV}^2 \cdot \text{mbarn}`
GeV2mbarn = 10.0 * GeVfm**2
#: unit conversion - :math:`\text{mbarn} \to \text{cm}^2`
mbarn2cm2 = GeVcm**2 / GeV2mbarn
def __init__(self, mceq_hdf_db, interaction_model='SIBYLL2.3c'):
#: MCEq HDF5Backend reference
self.mceq_db = mceq_hdf_db
#: reference to energy grid
self.energy_grid = mceq_hdf_db.energy_grid
#: List of active parents
self.parents = None
#: Dictionary containing the distribuiton matrices
self.index_d = None
#: (str) Interaction Model name
self.iam = normalize_hadronic_model_name(interaction_model)
# Load defaults
self.load(interaction_model)
def __getitem__(self, parent):
"""Return the cross section in :math:`\\text{cm}^2` as a dictionary
lookup."""
return self.get_cs(parent)
def __contains__(self, key):
"""Defines the `in` operator to look for particles"""
return key in self.parents
def load(self, interaction_model):
#: (str) Interaction Model name
self.iam = normalize_hadronic_model_name(interaction_model)
# Load tables and index from file
index = self.mceq_db.cs_db(self.iam)
self.parents = index['parents']
self.index_d = index['index_d']
[docs] def get_cs(self, parent, mbarn=False):
"""Returns inelastic ``parent``-air cross-section
:math:`\\sigma_{inel}^{proj-Air}(E)` as vector spanned over
the energy grid.
Args:
parent (int): PDG ID of parent particle
mbarn (bool,optional): if ``True``, the units of the cross-section
will be :math:`mbarn`,
else :math:`\\text{cm}^2`
Returns:
numpy.array: cross-section in :math:`mbarn` or :math:`\\text{cm}^2`
"""
message_templ = 'replacing {0} with {1} cross section'
if isinstance(parent, tuple):
parent = parent[0]
if parent in list(self.index_d):
cs = self.index_d[parent]
elif abs(parent) in list(self.index_d):
cs = self.index_d[abs(parent)]
elif 100 < abs(parent) < 300 and abs(parent) != 130:
cs = self.index_d[211]
elif 300 < abs(parent) < 1000 or abs(parent) in [130, 10313, 10323]:
info(15, message_templ.format(parent, 'K+-'))
cs = self.index_d[321]
elif abs(parent) > 1000 and abs(parent) < 5000:
info(15, message_templ.format(parent, 'nucleon'))
cs = self.index_d[2212]
elif 5 < abs(parent) < 23:
info(15, 'returning 0 cross-section for lepton', parent)
return np.zeros(self.energy_grid.d)
else:
info(
1,
'Strange case for parent, using zero cross section.')
cs = 0.
if not mbarn:
return self.mbarn2cm2 * cs
else:
return cs
[docs]class ContinuousLosses(object):
"""Class for managing the dictionary of hadron-air cross-sections.
Args:
mceq_hdf_db (object): instance of :class:`MCEq.data.HDF5Backend`
material (str): name of the material (not fully implemented)
"""
def __init__(self, mceq_hdf_db, material=config.dedx_material):
#: MCEq HDF5Backend reference
self.mceq_db = mceq_hdf_db
#: reference to energy grid
self.energy_grid = mceq_hdf_db.energy_grid
#: List of active parents
self.parents = None
#: Dictionary containing the distribuiton matrices
self.index_d = None
# Load defaults
self.load_db(material)
def __getitem__(self, parent):
"""Return the cross section in :math:`\\text{cm}^2` as
a dictionary lookup."""
return self.index_d[parent]
def __contains__(self, key):
"""Defines the `in` operator to look for particles"""
return key in self.parents
def load_db(self, material):
# Load tables and index from file
index = self.mceq_db.continuous_loss_db(material)
self.parents = index['parents']
self.index_d = index['index_d']