Source code for MCEq.core

from time import time

import numpy as np
import six

import MCEq.data
from MCEq import config
from MCEq.misc import info, normalize_hadronic_model_name
from MCEq.particlemanager import ParticleManager

# trapz was finally removed with numpy 2.4
if hasattr(np, "trapezoid"):
    trapz = np.trapezoid
else:
    trapz = np.trapz


[docs] class MCEqRun: """Main class for handling the calculation. This class is the main user interface for the caclulation. It will handle initialization and various error/configuration checks. The setup has to be accomplished before invoking the integration routine is :func:`MCeqRun.solve`. Changes of configuration, such as: - interaction model in :meth:`MCEqRun.set_interaction_model`, - primary flux in :func:`MCEqRun.set_primary_model`, - zenith angle in :func:`MCEqRun.set_theta_deg`, - density profile in :func:`MCEqRun.set_density_model`, - member particles of the special ``obs_`` group in :func:`MCEqRun.set_obs_particles`, can be made on an active instance of this class, while calling :func:`MCEqRun.solve` subsequently to calculate the solution corresponding to the settings. The result can be retrieved by calling :func:`MCEqRun.get_solution`. Args: interaction_model (string): interaction model name, e.g. SIBYLL2.3E primary_model (class, param_tuple): classes derived from :class:`crflux.models.PrimaryFlux` and its parameters as tuple theta_deg (float): zenith angle :math:`\\theta` in degrees, measured positively from vertical direction medium (string, optional): "air", "water", "rock", "co2", "hydrogen", "iron" density_model (instance or tuple): Instance of initialized density model or tuple of strings, such as ('CORSIKA', ('BK_USStd', None)) particle_list (list, optional): Construct a system for only these partices including their decay products. """ def __init__(self, interaction_model, primary_model, theta_deg, **kwargs): config.ensure_db_available() self.medium = kwargs.pop("medium", config.interaction_medium) self._mceq_db = MCEq.data.HDF5Backend(medium=self.medium) interaction_model = normalize_hadronic_model_name(interaction_model) # Save atmospheric parameters self.density_model = kwargs.pop("density_model", config.density_model) self.theta_deg = theta_deg #: Interface to interaction tables of the HDF5 database self._interactions = MCEq.data.Interactions(mceq_hdf_db=self._mceq_db) #: handler for cross-section data of type :class:`MCEq.data.HadAirCrossSections` self._int_cs = MCEq.data.InteractionCrossSections( mceq_hdf_db=self._mceq_db, interaction_model=interaction_model ) #: handler for cross-section data of type :class:`MCEq.data.HadAirCrossSections` self._cont_losses = MCEq.data.ContinuousLosses(mceq_hdf_db=self._mceq_db) #: Interface to decay tables of the HDF5 database self._decays = MCEq.data.Decays(mceq_hdf_db=self._mceq_db) #: Particle manager (initialized/updated in set_interaction_model) self.pman = None # Particle list to keep track of previously initialized particles self._particle_list = None # General Matrix dimensions and shortcuts, controlled by # grid of yield matrices self._energy_grid = self._mceq_db.energy_grid # Initialize solution vector self._solution = np.zeros(1) # Initialize empty state (particle density) vector self._phi0 = np.zeros(1) # Initialize matrix builder (initialized in set_interaction_model) self.matrix_builder = None # Save initial condition (primary flux) to restore after dimensional resizing self._restore_initial_condition = [] # Set interaction model and compute grids and matrices self.set_interaction_model( interaction_model, particle_list=kwargs.pop("particle_list", None), build_matrices=kwargs.pop("build_matrices", True), ) # Default GPU device id for CUDA self._cuda_device = kwargs.pop("cuda_gpu_id", config.cuda_gpu_id) # Print particle list after tracking particles have been initialized self.pman.print_particle_tables(2) # Set atmosphere and geometry self.integration_path, self.int_grid, self.grid_var = None, None, None self._cached_leading_process = None self.set_density_model(self.density_model) # Set initial flux condition if primary_model is not None: try: self.set_primary_model(*primary_model) except TypeError: self.set_primary_model(primary_model) @property def e_grid(self): """Energy grid (bin centers)""" return self._energy_grid.c @property def e_bins(self): """Energy grid (bin edges)""" return self._energy_grid.b @property def e_widths(self): """Energy grid (bin widths)""" return self._energy_grid.w @property def dim(self): """Energy grid (dimension)""" return self._energy_grid.d @property def dim_states(self): """Number of cascade particles times dimension of grid (dimension of the equation system)""" return self.pman.dim_states
[docs] def ptot_grid(self, particle_name, return_bins=False): """Computes and returns the total momentum grid. If `return_bins` `True`, return bins, centers, otherwise just the bin centers. """ ptot_bins = np.sqrt( (self.e_bins + self.pman[particle_name].mass) ** 2 - self.pman[particle_name].mass ** 2 ) ptot_grid = np.sqrt(ptot_bins[1:] * ptot_bins[:-1]) if return_bins: return ptot_bins, ptot_grid else: return ptot_grid
[docs] def etot_grid(self, particle_name, return_bins=False): """Computes and returns the total energy grid. If `return_bins = True` return bins and centers, otherwise just the bin centers. """ etot_bins = self.e_bins + self.pman[particle_name].mass etot_grid = np.sqrt(etot_bins[1:] * etot_bins[:-1]) if return_bins: return etot_bins, etot_grid else: return etot_grid
[docs] def xgrid(self, particle_name, return_as, return_bins=False): """Uniform access to the spectrum variable, depending on the same `return_as` argument as in get_solution.""" if return_as == "kinetic energy": return (self.e_bins, self.e_grid) if return_bins else self.e_grid elif return_as == "total energy": return self.etot_grid(particle_name, return_bins) elif return_as == "total momentum": return self.ptot_grid(particle_name, return_bins) else: raise Exception("Unknown grid type requested.")
[docs] def closest_energy(self, kin_energy): """Convenience function to obtain the nearest grid energy to the `energy` argument, provided as kinetik energy in lab. frame.""" eidx = (np.abs(self._energy_grid.c - kin_energy)).argmin() return self._energy_grid.c[eidx]
def _get_state_vector(self, grid_idx=None): """Returns state vector""" if not hasattr(self, "_solution") and grid_idx is None: raise Exception("State vector not initialized. Run solve() first.") if not hasattr(self, "grid_sol") and grid_idx is not None: raise Exception("Solution not on grid. Re-run solve() with a grid.") if grid_idx is None: state_vec = np.copy(self._solution) elif grid_idx < len(self.grid_sol): state_vec = self.grid_sol[grid_idx, :] else: raise Exception("Invalid grid index", grid_idx) order = [(p.mceqidx, p.name) for p in self.pman.cascade_particles] return order, state_vec def _set_state_vector(self, order_i, state_vec, only_available=False): """Sets the initial to that supplied as state vector.""" order = [(p.mceqidx, p.name) for p in self.pman.cascade_particles] if order_i != order and not only_available: raise Exception( "The orders of the state vecs don't match {0}!={1}".format( order_i, order ) ) elif order_i != order and only_available: particles_requested = [o[1] for o in order_i] for pidx, pname in order: if pname in self.pman.pname2pref: p = self.pman.pname2pref[pname] self._phi0[p.lidx : p.uidx] *= 0.0 if pname in particles_requested: try: self._phi0[p.lidx : p.uidx] = state_vec[ pidx * self.dim : (pidx + 1) * self.dim ] except ValueError: raise Exception("Error when setting state for", p.name) else: self._phi0[:] = state_vec[:]
[docs] def get_solution( self, particle_name, mag=0.0, grid_idx=None, integrate=False, return_as=config.return_as, dont_sum_helicities=False, ): """Retrieves solution of the calculation on the energy grid. Some special prefixes are accepted for lepton names: - the total flux of muons, muon neutrinos etc. from all sources/mothers can be retrieved without a prefix ``mu+`` or with the prefix ``total_mu+``, ``total_numu`` - the conventional flux of muons, muon neutrinos etc. from all sources can be retrieved by the prefix ``conv_``, i.e. ``conv_numu`` - the prompt flux of muons, muon neutrinos etc. from all sources can be retrieved by the prefix ``pr_``, i.e. ``pr_numu`` - correspondigly, the flux of leptons which originated from the decay of a charged pion carries the prefix ``pi_`` and from a kaon ``k_`` Args: particle_name (str): The name of the particle such, e.g. ``total_mu+`` for the total flux spectrum of positive muons or ``pr_antinumu`` for the flux spectrum of prompt anti muon neutrinos mag (float, optional): 'magnification factor': the solution is multiplied by ``sol`` :math:`= \\Phi \\cdot E^{mag}` grid_idx (int, optional): if the integrator has been configured to save intermediate solutions on a depth grid, then ``grid_idx`` specifies the index of the depth grid for which the solution is retrieved. If not specified the flux at the surface is returned integrate (bool, optional): return averge particle number instead of flux (multiply by bin width) return_as (str, optional): the flux can be returned as ``total energy``, ``kinetic energy``, or ``total momentum`` flux. This defaults to ``kinetic energy`` and is in general taken from ``MCEq.config.return_as`` dont_sum_helicities (bool, optional): Per default the lepton flux is summed over the available helicities, e.g. ``total_mu+`` is the muon flux from (-1, 0, +1) helicity for mu+. Returns: (:func: numpy.array): flux of particles on energy grid :attr:`e_grid` """ res = np.zeros(self._energy_grid.d) ref = self.pman.pname2pref sol = None if grid_idx is not None and len(self.grid_sol) == 0: raise Exception("Solution not has not been computed on grid. Check input.") if grid_idx is None: sol = np.copy(self._solution) elif grid_idx >= len(self.grid_sol): sol = self.grid_sol[-1, :] else: sol = self.grid_sol[grid_idx, :] def sum_lr(lep_str, prefix): result = np.zeros(self.dim) nsuccess = 0 if dont_sum_helicities: sum_over = [lep_str] else: sum_over = [lep_str, lep_str + "_l", lep_str + "_r"] for ls in sum_over: if prefix + ls not in ref: info( 15, "No separate left and right handed particles,", f"or, unavailable particle prefix {prefix + ls}.", ) continue result += sol[ref[prefix + ls].lidx : ref[prefix + ls].uidx] nsuccess += 1 if nsuccess == 0 and config.excpt_on_missing_particle: raise Exception(f"Requested particle {particle_name} not found.") return result lep_str = particle_name.split("_")[1] if "_" in particle_name else particle_name default_tracking_prefixes = [ "conv_", "pr_", "pi_", "k_", "K0_", "mulr_", "mu_h0_", "prcas_", "prres_", ] if not config.enable_default_tracking: for track_pref in default_tracking_prefixes: if particle_name.startswith(track_pref): raise Exception( "Tracking category requested but " + "enable_default_tracking is off in config." ) if particle_name.startswith("total_"): # Note: This has changed from previous MCEq versions, # since pi_ and k_ prefixes are mere tracking counters # and no full particle species anymore res = sum_lr(lep_str, prefix="") elif particle_name.startswith("conv_"): # Note: This changed from previous MCEq versions, # conventional is defined as total - prompt res = self.get_solution( "total_" + lep_str, mag=0, grid_idx=grid_idx, integrate=False, return_as="kinetic energy", ) - self.get_solution( "pr_" + lep_str, mag=0, grid_idx=grid_idx, integrate=False, return_as="kinetic energy", ) elif particle_name.startswith("pr_"): if "prcas_" + lep_str in ref: res += sum_lr(lep_str, prefix="prcas_") if "prres_" + lep_str in ref: res += sum_lr(lep_str, prefix="prres_") if "em_" + lep_str in ref: res += sum_lr(lep_str, prefix="em_") else: try: res = sum_lr(particle_name, prefix="") except KeyError: if config.excpt_on_missing_particle: raise Exception(f"Requested particle {particle_name} not found.") else: info(1, f"Requested particle {particle_name} not found.") # When returning in Etot, interpolate on different grid if return_as == "total energy": etot_grid = self.etot_grid(lep_str) if not integrate: return res * etot_grid**mag else: return res * etot_grid**mag * self.e_widths elif return_as == "kinetic energy": if not integrate: return res * self._energy_grid.c**mag else: return res * self._energy_grid.c**mag * self.e_widths elif return_as == "total momentum": ptot_bins, ptot_grid = self.ptot_grid(lep_str, return_bins=True) dEkindp = np.diff(ptot_bins) / self.e_widths if not integrate: return dEkindp * res * ptot_grid**mag else: return dEkindp * res * ptot_grid**mag * np.diff(ptot_bins) else: raise Exception( "Unknown 'return_as' variable choice.", 'the options are "kinetic energy", "total energy", "total momentum"', )
[docs] def set_interaction_model( self, interaction_model, particle_list=None, update_particle_list=True, force=False, build_matrices=True, ): """Sets interaction model and/or an external charm model for calculation. Decay and interaction matrix will be regenerated automatically after performing this call. Args: interaction_model (str): name of interaction model charm_model (str, optional): name of charm model force (bool): force loading interaction model """ interaction_model = normalize_hadronic_model_name(interaction_model) info(1, interaction_model) if ( not force and (self._interactions.iam == interaction_model) and particle_list != self._particle_list ): info(2, "Skip, since current model identical to", interaction_model + ".") return self._int_cs.load(interaction_model) # TODO: simplify this, stuff not needed anymore if not update_particle_list and self._particle_list is not None: info(10, "Re-using particle list.") self._interactions.load(interaction_model, parent_list=self._particle_list) self.pman.set_interaction_model(self._int_cs, self._interactions) self.pman.set_decay_channels(self._decays) self.pman.set_continuous_losses(self._cont_losses) elif self._particle_list is None: info(10, "New initialization of particle list.") # First initialization if particle_list is None: self._interactions.load(interaction_model) else: self._interactions.load(interaction_model, parent_list=particle_list) self._decays.load(parent_list=self._interactions.particles) self._particle_list = self._interactions.particles + self._decays.particles # Create particle database self.pman = ParticleManager( self._particle_list, self._energy_grid, self._int_cs, self.medium ) self.pman.set_interaction_model(self._int_cs, self._interactions) self.pman.set_decay_channels(self._decays) self.pman.set_continuous_losses(self._cont_losses) self.matrix_builder = MatrixBuilder(self.pman) elif update_particle_list and particle_list != self._particle_list: info(10, "Updating particle list.") # Updated particle list received if particle_list is None: self._interactions.load(interaction_model) else: self._interactions.load(interaction_model, parent_list=particle_list) self._decays.load(parent_list=self._interactions.particles) self._particle_list = self._interactions.particles + self._decays.particles self.pman.set_interaction_model( self._int_cs, self._interactions, updated_parent_list=self._particle_list, ) self.pman.set_decay_channels(self._decays) self.pman.set_continuous_losses(self._cont_losses) else: raise Exception("Should not happen in practice.") self._resize_vectors_and_restore() # initialize matrices if not build_matrices: return self.int_m, self.dec_m = self.matrix_builder.construct_matrices( skip_decay_matrix=False )
def _resize_vectors_and_restore(self): """Update solution and grid vectors if the number of particle species or the interaction models change. The previous state, such as the initial spectrum, are restored.""" # Update dimensions if particle dimensions changed self._phi0 = np.zeros(self.dim_states) self._solution = np.zeros(self.dim_states) # Restore insital condition if present if len(self._restore_initial_condition) > 0: for con in self._restore_initial_condition: con[0](*con[1:])
[docs] def set_primary_model(self, model_class_or_object, tag=None): """Sets primary flux model. This functions is quick and does not require re-generation of matrices. Args: interaction_model (:class:`CRFluxModel.PrimaryFlux`): reference to primary model **class** tag (tuple): positional argument list for model class """ assert not isinstance(model_class_or_object, tuple), ( "Primary model can not be supplied as tuples" ) # Check if classs or object supplied if not isinstance(model_class_or_object, type): assert any( [ "PrimaryFlux" in b.__name__ for b in model_class_or_object.__class__.__bases__ ] ), "model_class_or_object is not derived from crflux.models.PrimaryFlux" info(5, "Primary model supplied as object") self.pmodel = model_class_or_object else: # Initialize primary model object info(5, "Primary model supplied as class") self.pmodel = model_class_or_object(tag) info(1, f"Primary model set to {self.pmodel.name}") # Save primary flux model for restauration after interaction model changes self._restore_initial_condition = [(self.set_primary_model, self.pmodel)] # TODO: Maybe needs to catch the removal of the np.vectorize # self.get_nucleon_spectrum = np.vectorize(self.pmodel.p_and_n_flux) self.get_nucleon_spectrum = self.pmodel.p_and_n_flux try: self.dim_states except AttributeError: self.finalize_pmodel = True # Set initial condition minimal_energy = config.minimal_primary_energy if (2212, 0) in self.pman and (2112, 0) in self.pman: e_tot = self._energy_grid.c + 0.5 * ( self.pman[(2212, 0)].mass + self.pman[(2112, 0)].mass ) else: raise Exception( "No nucleons in eqn system, primary flux model can not be used." ) min_idx = np.argmin(np.abs(e_tot - minimal_energy)) self._phi0 *= 0 p_top, n_top = self.get_nucleon_spectrum(e_tot[min_idx:])[1:] if (2212, 0) in self.pman: self._phi0[ min_idx + self.pman[(2212, 0)].lidx : self.pman[(2212, 0)].uidx ] = 1e-4 * p_top else: info( 1, "Protons not in equation system, can not set primary flux.", ) if (2112, 0) in self.pman and not self.pman[(2112, 0)].is_resonance: self._phi0[ min_idx + self.pman[(2112, 0)].lidx : self.pman[(2112, 0)].uidx ] = 1e-4 * n_top elif (2212, 0) in self.pman: info( 2, "Neutrons not part of equation system,", "substituting initial flux with protons.", ) self._phi0[ min_idx + self.pman[(2212, 0)].lidx : self.pman[(2212, 0)].uidx ] += 1e-4 * n_top
[docs] def set_single_primary_particle( self, E, corsika_id=None, pdg_id=None, append=False ): """Set type and kinetic energy of a single primary nucleus to calculation of particle yields. The functions uses the superposition theorem, where the flux of a nucleus with mass A and charge Z is modeled by using Z protons and A-Z neutrons at energy :math:`E_{nucleon}= E_{nucleus} / A` The nucleus type is defined via :math:`\\text{CORSIKA ID} = A*100 + Z`. For example iron has the CORSIKA ID 5226. Single leptons or hadrons can be defined by specifiying `pdg_id` instead of `corsika_id`. The `append` argument can be used to compose an initial state with multiple particles. If it is `False` the initial condition is reset to zero before adding the particle. A continuous input energy range is allowed between :math:`50*A~ \\text{GeV} < E_\\text{nucleus} < 10^{10}*A \\text{GeV}`. Args: E (float): kinetic energy of a nucleus in GeV corsika_id (int): ID of a nucleus (see text) pdg_id (int): PDG ID of a particle append (bool): If True, keep previous state and append a new particle. """ import warnings from scipy.linalg import solve from MCEq.misc import getAZN, getAZN_corsika if corsika_id and pdg_id: raise Exception("Provide either corsika or PDG ID") info(2, f"CORSIKA ID {corsika_id}, PDG ID {pdg_id}, energy {E:5.3g} GeV") if corsika_id: n_nucleons, n_protons, n_neutrons = getAZN_corsika(corsika_id) elif pdg_id: n_nucleons, n_protons, n_neutrons = getAZN(pdg_id) En = E / float(n_nucleons) if n_nucleons > 0 else E if En < np.min(self._energy_grid.c): raise Exception("energy per nucleon too low for primary " + str(corsika_id)) if append is False: self._restore_initial_condition = [ (self.set_single_primary_particle, E, corsika_id, pdg_id) ] self._phi0 *= 0.0 else: self._restore_initial_condition.append( (self.set_single_primary_particle, E, corsika_id, pdg_id) ) egrid = self._energy_grid.c ebins = self._energy_grid.b ewidths = self._energy_grid.w info( 3, ( f"superposition: n_protons={n_protons}, n_neutrons={n_neutrons}, " + f"energy per nucleon={En:5.3g} GeV" ), ) cenbin = np.argwhere(En < ebins)[0][0] - 1 # Equalize the first three moments for 3 normalizations around the central # bin emat = np.vstack( ( ewidths[cenbin - 1 : cenbin + 2], ewidths[cenbin - 1 : cenbin + 2] * egrid[cenbin - 1 : cenbin + 2], ewidths[cenbin - 1 : cenbin + 2] * egrid[cenbin - 1 : cenbin + 2] ** 2, ) ) if n_nucleons == 0: # This case handles other exotic projectiles b_particle = np.array([1.0, En, En**2]) lidx = self.pman[pdg_id].lidx with warnings.catch_warnings(): warnings.simplefilter("ignore") self._phi0[lidx + cenbin - 1 : lidx + cenbin + 2] += solve( emat, b_particle ) return if n_protons > 0: b_protons = np.array([n_protons, En * n_protons, En**2 * n_protons]) p_lidx = self.pman[2212].lidx with warnings.catch_warnings(): warnings.simplefilter("ignore") self._phi0[p_lidx + cenbin - 1 : p_lidx + cenbin + 2] += solve( emat, b_protons ) if n_neutrons > 0: b_neutrons = np.array([n_neutrons, En * n_neutrons, En**2 * n_neutrons]) n_lidx = self.pman[2112].lidx with warnings.catch_warnings(): warnings.simplefilter("ignore") self._phi0[n_lidx + cenbin - 1 : n_lidx + cenbin + 2] += solve( emat, b_neutrons )
[docs] def set_initial_spectrum(self, spectrum, pdg_id, append=False): """Set a user-defined spectrum for an arbitrary species as initial condition. This function is an equivalent to :func:`set_single_primary_particle`. It allows to define an arbitrary spectrum for each available particle species as initial condition for the integration. Set the `append` argument to `True` for subsequent species to define initial spectra combined from different particles. The (differential) spectrum has to be distributed on the energy grid as dN/dptot, i.e. divided by the bin widths and with the total momentum units in GeV(/c). Args: spectrum (np.array): spectrum dN/dptot pdg_id (int): PDG ID in case of a particle """ info(2, f"PDG ID {pdg_id}") if not append: self._restore_initial_condition = [ (self.set_initial_spectrum, spectrum, pdg_id, append) ] self._phi0 *= 0 else: self._restore_initial_condition.append( (self.set_initial_spectrum, spectrum, pdg_id, append) ) if len(spectrum) != self.dim: raise Exception("Lengths of spectrum and energy grid do not match.") self._phi0[self.pman[pdg_id].lidx : self.pman[pdg_id].uidx] += spectrum
[docs] def set_density_model(self, density_model_or_config): """Sets model of the atmosphere. To choose, for example, a CORSIKA parametrization for the Southpole in January, do the following:: mceq_instance.set_density_model(("CORSIKA", ("PL_SouthPole", "January"))) More details about the choices can be found in :mod:`MCEq.geometry.density_profiles`.Calling this method will issue a recalculation of the interpolation and the integration path. From version 1.2 and above, the `density_model_or_config` parameter can be a reference to an instance of a density class directly. The class has to be derived either from :class:`MCEq.geometry.density_profiles.EarthsAtmosphere` or :class:`MCEq.geometry.density_profiles.GeneralizedTarget`. Args: density_model_or_config (obj or tuple of strings): (parametrization type, arguments) """ import MCEq.geometry.density_profiles as dprof # Check if string arguments or an instance of the density class is provided if not isinstance( density_model_or_config, (dprof.EarthsAtmosphere, dprof.GeneralizedTarget) ): base_model, model_config = density_model_or_config available_models = [ "MSIS00", "MSIS00_IC", "CORSIKA", "AIRS", "Isothermal", "GeneralizedTarget", ] if base_model not in available_models: info( 0, "Unknown density model. Available choices are:\n", "\n".join(available_models), ) raise ValueError("Choose a different profile.") info(1, "Setting density profile to", base_model, model_config) if base_model == "MSIS00": self.density_model = dprof.MSIS00Atmosphere(*model_config) elif base_model == "MSIS00_IC": self.density_model = dprof.MSIS00IceCubeCentered(*model_config) elif base_model == "CORSIKA": self.density_model = dprof.CorsikaAtmosphere(*model_config) elif base_model == "AIRS": self.density_model = dprof.AIRSAtmosphere(*model_config) elif base_model == "Isothermal": self.density_model = dprof.IsothermalAtmosphere(*model_config) elif base_model == "GeneralizedTarget": self.density_model = dprof.GeneralizedTarget() else: self.density_model = density_model_or_config if self.theta_deg is not None and isinstance( self.density_model, dprof.EarthsAtmosphere ): if self.theta_deg is None: info(1, "Using default zenith angle theta=0.") self.set_theta_deg(0) else: self.set_theta_deg(self.theta_deg) elif isinstance(self.density_model, dprof.GeneralizedTarget): self.integration_path = None else: raise ValueError(f"Density model {self.density_model} not supported.")
# TODO: Make the pman aware of that density might have changed and # indices as well # self.pmod._gen_list_of_particles()
[docs] def set_theta_deg(self, theta_deg): """Sets zenith angle :math:`\\theta` as seen from a detector. Currently only 'down-going' angles (0-90 degrees) are supported. Args: theta_deg (float): zenith angle in the range 0-90 degrees """ import MCEq.geometry.density_profiles as dprof info(2, f"Zenith angle {theta_deg:6.2f}") if isinstance(self.density_model, dprof.GeneralizedTarget): raise Exception("GeneralizedTarget does not support angles.") if self.density_model.theta_deg == theta_deg: info(2, "Theta selection correponds to cached value, skipping calc.") return self.density_model.set_theta(theta_deg) self.integration_path = None
[docs] def inject_ddm(self, ddm): """Set a DDM object as interaction model. The argument requires a DDM model object. Calling `set_interaction_model` overwrites DDM with a different model. """ from .ddm import isospin_partners, isospin_symmetries injected = [] for (prim, sec), mati in ddm.ddm_matrices(self).items(): info(5, f"Injecting DDM {prim} --> {sec}") iso_part = isospin_partners[prim] self.pman[prim].hadr_yields[self.pman[sec]] = np.asarray(mati) info(5, "Injecting isopart", iso_part, isospin_symmetries[iso_part][sec]) self.pman[iso_part].hadr_yields[ self.pman[isospin_symmetries[iso_part][sec]] ] = np.asarray(mati) injected.append( ((prim, sec), (iso_part, isospin_symmetries[iso_part][sec])) ) if config.debug_level > 2: s = "DDM matrices injected into MCEq:\n" for (prim, sec), (iprim, isec) in injected: s += f"\t{prim}-->{sec}, isospin: {iprim} --> {isec}\n" print(s) self.int_m, self.dec_m = self.matrix_builder.construct_matrices( skip_decay_matrix=False )
[docs] def set_mod_pprod(self, prim_pdg, sec_pdg, x_func, x_func_args, delay_init=False): """Sets combination of projectile/secondary for error propagation. The production spectrum of ``sec_pdg`` in interactions of ``prim_pdg`` is modified according to the function passed to :func:`InteractionYields.init_mod_matrix` Args: prim_pdg (int): interacting (primary) particle PDG ID sec_pdg (int): secondary particle PDG ID x_func (object): reference to function x_func_args (tuple): arguments passed to ``x_func`` delay_init (bool): Prevent init of mceq matrices if you are planning to add more modifications """ info(1, f"{prim_pdg}/{sec_pdg}, {sec_pdg}, {x_func.__name__}, {x_func_args!s}") init = self._interactions._set_mod_pprod(prim_pdg, sec_pdg, x_func, x_func_args) # Need to regenerate matrices completely return int(init)
[docs] def unset_mod_pprod(self, dont_fill=False): """Removes modifications from :func:`MCEqRun.set_mod_pprod`. Args: skip_fill (bool): If `true` do not regenerate matrices (has to be done at a later step by hand) """ from collections import defaultdict info(1, "Particle production modifications reset to defaults.") self._interactions.mod_pprod = defaultdict(dict) # Need to regenerate matrices completely if not dont_fill: self.regenerate_matrices()
[docs] def regenerate_matrices(self, skip_decay_matrix=False): """Call this function after applying particle prod. modifications aka Barr parameters""" # TODO: Not all particles need to be reset and there is some performance loss # This can be optmized by refreshing only the particles that change or through # lazy evaluation, i.e. hadronic channels dict. calls data.int..get_matrix # on demand self.pman.set_interaction_model(self._int_cs, self._interactions, force=True) self._resize_vectors_and_restore() self.int_m, self.dec_m = self.matrix_builder.construct_matrices( skip_decay_matrix=skip_decay_matrix )
[docs] def solve(self, int_grid=None, grid_var="X", **kwargs): """Launches the solver. The setting `integrator` in the config file decides which solver to launch. Args: int_grid (list): list of depths at which results are recorded grid_var (str): Can be depth `X` or something else (currently only `X` supported) kwargs (dict): Arguments are passed directly to the solver methods. """ info(2, f"Launching {config.integrator} solver") if not kwargs.pop("skip_integration_path", False): if int_grid is not None and np.any(np.diff(int_grid) < 0): raise Exception( "The X values in int_grid are required to be strickly", "increasing.", ) # Calculate integration path if not yet happened self._calculate_integration_path(int_grid, grid_var) else: info(2, "Warning: integration path calculation skipped.") phi0 = np.copy(self._phi0) nsteps, dX, rho_inv, grid_idcs = self.integration_path info(2, f"for {nsteps} integration steps.") import MCEq.solvers start = time() if config.kernel_config.lower() == "numpy": kernel = MCEq.solvers.solv_numpy args = (nsteps, dX, rho_inv, self.int_m, self.dec_m, phi0, grid_idcs) elif config.kernel_config.lower() == "accelerate": kernel = MCEq.solvers.solv_spacc_sparse import MCEq.spacc as spacc try: if not np.array_equal(self._spacc_dec_m.data, self.dec_m.data): self._spacc_dec_m = spacc.SpaccMatrix(self.dec_m) if not np.array_equal(self._spacc_int_m.data, self.int_m.data): self._spacc_int_m = spacc.SpaccMatrix(self.int_m) except AttributeError: info(10, "Matrices not yet in Accelerate format") self._spacc_int_m = spacc.SpaccMatrix(self.int_m) self._spacc_dec_m = spacc.SpaccMatrix(self.dec_m) args = ( nsteps, dX, rho_inv, self._spacc_int_m, self._spacc_dec_m, phi0, grid_idcs, ) elif config.kernel_config.lower() == "cuda": kernel = MCEq.solvers.solv_CUDA_sparse try: self.cuda_context.set_matrices(self.int_m, self.dec_m) except AttributeError: from MCEq.solvers import CUDASparseContext self.cuda_context = CUDASparseContext( self.int_m, self.dec_m, device_id=self._cuda_device ) args = (nsteps, dX, rho_inv, self.cuda_context, phi0, grid_idcs) elif config.kernel_config.lower() == "mkl": kernel = MCEq.solvers.solv_MKL_sparse args = (nsteps, dX, rho_inv, self.int_m, self.dec_m, phi0, grid_idcs) else: raise Exception(f"Unsupported integrator setting '{config.kernel_config}'.") self._solution, self.grid_sol = kernel(*args) if isinstance(self.grid_sol, list): self.grid_sol = np.asarray(self.grid_sol) info(2, f"time elapsed during integration: {time() - start:5.2f}sec")
[docs] def solve_from_integration_path(self, nsteps, dX, rho_inv, grid_idcs): """Launches the solver directly for parameters of the integration path. The helper function is useful if you want to skip the calculation of the integration path every time. This function is intended for expert use and is not required for normal operation. The parameters can be obtained after calling _calculate_integration_path with correct settings for density and angle parameters:: nsteps, dX, rho_inv, grid_idcs = self.integration_path Args: phi0 (np.array): initial condition nsteps (int): number of integration steps dX (list): the delta_X's rho_inv (list): the inverse of the density at each step grid_idcs (list): list of steps at which the solution is dumped into `grid_sol` """ info(2, "Launching {0} solver".format(config.integrator)) info(2, "for {0} integration steps.".format(nsteps)) import MCEq.solvers start = time() phi0 = np.copy(self._phi0) if config.kernel_config.lower() == "numpy": kernel = MCEq.solvers.solv_numpy args = (nsteps, dX, rho_inv, self.int_m, self.dec_m, phi0, grid_idcs) elif config.kernel_config.lower() == "accelerate": kernel = MCEq.solvers.solv_spacc_sparse import MCEq.spacc as spacc try: if not np.array_equal(self._spacc_dec_m.data, self.dec_m.data): self._spacc_dec_m = spacc.SpaccMatrix(self.dec_m) if not np.array_equal(self._spacc_int_m.data, self.int_m.data): self._spacc_int_m = spacc.SpaccMatrix(self.int_m) except AttributeError: info(10, "Matrices not yet in Accelerate format") self._spacc_int_m = spacc.SpaccMatrix(self.int_m) self._spacc_dec_m = spacc.SpaccMatrix(self.dec_m) args = ( nsteps, dX, rho_inv, self._spacc_int_m, self._spacc_dec_m, phi0, grid_idcs, ) args = ( nsteps, dX, rho_inv, self._spacc_int_m, self._spacc_dec_m, phi0, grid_idcs, ) elif config.kernel_config.lower() == "cuda": kernel = MCEq.solvers.solv_CUDA_sparse try: self.cuda_context.set_matrices(self.int_m, self.dec_m) except AttributeError: from MCEq.solvers import CUDASparseContext self.cuda_context = CUDASparseContext( self.int_m, self.dec_m, device_id=self._cuda_device ) args = (nsteps, dX, rho_inv, self.cuda_context, phi0, grid_idcs) elif config.kernel_config.lower() == "mkl": kernel = MCEq.solvers.solv_MKL_sparse args = (nsteps, dX, rho_inv, self.int_m, self.dec_m, phi0, grid_idcs) else: raise Exception( "Unsupported integrator setting '{0}'.".format(config.kernel_config) ) self._solution, self.grid_sol = kernel(*args) info(2, "time elapsed during integration: {0:5.2f}sec".format(time() - start))
def _calculate_integration_path(self, int_grid, grid_var, force=False): if ( self.integration_path and np.all(int_grid == self.int_grid) and np.all(self.grid_var == grid_var) and self._cached_leading_process == config.leading_process and not force ): info(5, "skipping calculation.") return self._cached_leading_process = config.leading_process self.int_grid, self.grid_var = int_grid, grid_var if grid_var != "X": raise NotImplementedError( "Grid variables other than the depth X not supported." ) max_X = self.density_model.max_X ri = self.density_model.r_X2rho max_lint = self.matrix_builder.max_lint max_ldec = self.matrix_builder.max_ldec dXmax = config.dXmax info(2, "X_surface = {0:7.2f}g/cm2".format(max_X)) dX_vec = [] rho_inv_vec = [] X = config.X_start if int_grid is not None and min(int_grid) < X: raise ValueError( "Steps in int_grid must be larger than mceq_config.X_start." ) step = 0 grid_step = 0 grid_idcs = [] assert config.leading_process in [ "auto", "decays", "interactions", ], "Set leading process to auto, decays or interactions" if config.leading_process == "decays": info(3, "using decays as leading eigenvalues") def delta_X(X, inv_rho): return min(config.stability_margin / (max_ldec * inv_rho), dXmax) elif config.leading_process == "interactions": info(2, "using interactions as leading eigenvalues") def delta_X(X, inv_rho): return min(config.stability_margin / max_lint, dXmax) else: # This is the case for auto setting. # If no particles in eqn system force decays # as leading eigenvalues if np.allclose(max_lint, 0.0): def delta_X(X, inv_rho): return min(config.stability_margin / (max_ldec * inv_rho), dXmax) else: def delta_X(X, inv_rho): dX = min( config.stability_margin / (max_ldec * inv_rho), config.stability_margin / max_lint, dXmax, ) # if dX / self.density_model.max_X < 1e-7: # raise Exception( # "Stiffness warning: dX <= 1e-7. Check configuration or" # + "manually call MCEqRun._calculate_integration_path(" # + 'int_grid, "X", force=True).' # ) return dX enable_int_grid = np.any(int_grid) len_int_grid = len(int_grid) if enable_int_grid else 0 while X < max_X: inv_rho = ri(X) dX = delta_X(X, inv_rho) if ( enable_int_grid and (grid_step < len_int_grid) and (X + dX >= int_grid[grid_step]) ): dX = int_grid[grid_step] - X grid_idcs.append(step) grid_step += 1 dX_vec.append(dX) rho_inv_vec.append(inv_rho) X += dX step += 1 # Integrate dX_vec = np.array(dX_vec, dtype=config.floatlen) rho_inv_vec = np.array(rho_inv_vec, dtype=config.floatlen) self.integration_path = len(dX_vec), dX_vec, rho_inv_vec, grid_idcs
[docs] def n_particles(self, label, grid_idx=None, min_energy_cutoff=1e-1): """Returns number of particles of type `label` at a grid step above an energy threshold for counting. Args: label (str): Particle name grid_idx (int): Depth grid index (for profiles) min_energy_cutoff (float): Energy threshold > mceq_config.e_min """ ie_min = np.argmin( np.abs(self.e_bins - self.e_bins[self.e_bins >= min_energy_cutoff][0]) ) _e = self.e_bins[ie_min] _e_n = self.e_bins[ie_min + 1] _e_m = self.e_grid[ie_min] info( 10, f"Energy cutoff for particle number calculation {_e:4.3e} GeV", ) info( 15, f"First bin is between {_e:3.2e} and {_e_n:3.2e} with midpoint {_e_m:3.2e}", ) return np.sum( self.get_solution(label, mag=0, integrate=True, grid_idx=grid_idx)[ie_min:] )
[docs] def n_mu(self, grid_idx=None, min_energy_cutoff=1e-1): """Returns the number of positive and negative muons at a grid step above `min_energy_cutoff`. Args: grid_idx (int): Depth grid index (for profiles) min_energy_cutoff (float): Energy threshold > mceq_config.e_min """ return self.n_particles( "total_mu+", grid_idx=grid_idx, min_energy_cutoff=min_energy_cutoff ) + self.n_particles( "total_mu-", grid_idx=grid_idx, min_energy_cutoff=min_energy_cutoff )
[docs] def n_e(self, grid_idx=None, min_energy_cutoff=1e-1): """Returns the number of electrons plus positrons at a grid step above `min_energy_cutoff`. Args: grid_idx (int): Depth grid index (for profiles) min_energy_cutoff (float): Energy threshold > mceq_config.e_min """ return self.n_particles( "e+", grid_idx=grid_idx, min_energy_cutoff=min_energy_cutoff ) + self.n_particles( "e-", grid_idx=grid_idx, min_energy_cutoff=min_energy_cutoff )
[docs] def z_factor( self, projectile_pdg, secondary_pdg, definition="primary_e", min_energy=0.3, use_cs_scaling=True, ): """Energy dependent Z-factor according to Thunman et al. (1996)""" proj = self.pman[projectile_pdg] sec = self.pman[secondary_pdg] if not proj.is_projectile: raise Exception(f"{proj.name} is not a projectile particle.") info(10, f"Computing e-dependent Zfactor for {proj.name} -> {sec.name}") if not proj.is_secondary(sec): raise Exception(f"{sec.name} is not a secondary particle of {proj.name}.") if proj == 2112: nuc_flux = self.pmodel.p_and_n_flux(self.e_grid)[2] else: nuc_flux = self.pmodel.p_and_n_flux(self.e_grid)[1] zfac = np.zeros(self.dim) smat = proj.hadr_yields[sec] proj_cs = proj.prod_cross_section() zfac = np.zeros_like(self.e_grid) if config.has_cuda: import cupy smat = cupy.asnumpy(smat) proj_cs = cupy.asnumpy(proj_cs) # Definition wrt CR energy (different from Thunman) on x-axis min_idx = 0 if definition == "primary_e": for p_eidx, e in enumerate(self.e_grid): if e < min_energy: min_idx = p_eidx continue nuc_fac = nuc_flux[p_eidx] / nuc_flux[min_idx : p_eidx + 1] assert use_cs_scaling is False, ( f"cs_scaling has when definition = {definition}" ) cs_fac = 1.0 zfac[p_eidx] = np.sum( smat[min_idx : p_eidx + 1, p_eidx] * nuc_fac * cs_fac ) return zfac else: # Like in Thunman et al. 1996 for p_eidx, e in enumerate(self.e_grid): if e < min_energy: continue min_idx = p_eidx nuc_fac = nuc_flux[p_eidx] / nuc_flux[min_idx : p_eidx + 1] if use_cs_scaling: cs_fac = np.zeros(p_eidx - min_idx + 1) old_settings = np.seterr(all="ignore") res = proj_cs[p_eidx] / proj_cs[min_idx : p_eidx + 1] np.seterr(**old_settings) cs_fac[(res > 0) & np.isfinite(res)] = res[ (res > 0) & np.isfinite(res) ] else: cs_fac = 1.0 zfac[p_eidx] = np.sum(smat[p_eidx, p_eidx:] * nuc_fac * cs_fac) return zfac
[docs] def decay_z_factor(self, parent_pdg, child_pdg): """Energy dependent Z-factor according to Lipari (1993).""" proj = self.pman[parent_pdg] sec = self.pman[child_pdg] if proj.is_stable: raise Exception(f"{proj.name} does not decay.") info( 10, f"Computing e-dependent decay Zfactor for {proj.name} -> {sec.name}", ) if not proj.is_child(sec): raise Exception(f"{sec.name} is not a a child particle of {proj.name}.") cr_gamma = self.pmodel.nucleon_gamma(self.e_grid) zfac = np.zeros(self.dim) zfac = np.zeros_like(self.e_grid) for p_eidx, e in enumerate(self.e_grid): # if e < min_energy: # min_idx = p_eidx + 1 # continue xlab, xdist = proj.dNdec_dxlab(e, sec) zfac[p_eidx] = trapz(xlab ** (-cr_gamma[p_eidx] - 2.0) * xdist, x=xlab) return zfac
[docs] class MatrixBuilder: """This class constructs the interaction and decay matrices.""" def __init__(self, particle_manager): self._pman = particle_manager self._energy_grid = self._pman._energy_grid self.int_m = None self.dec_m = None self._construct_differential_operator()
[docs] def construct_matrices(self, skip_decay_matrix=False): r"""Constructs the matrices for calculation. These are: - :math:`\boldsymbol{M}_{int} = (-\boldsymbol{1} + \boldsymbol{C}){\boldsymbol{\Lambda}}_{int}`, - :math:`\boldsymbol{M}_{dec} = (-\boldsymbol{1} + \boldsymbol{D}){\boldsymbol{\Lambda}}_{dec}`. For debug_levels >= 2 some general information about matrix shape and the number of non-zero elements is printed. The intermediate matrices :math:`\boldsymbol{C}` and :math:`\boldsymbol{D}` are deleted afterwards to save memory. Set the ``skip_decay_matrix`` flag to avoid recreating the decay matrix. This is not necessary if, for example, particle production is modified, or the interaction model is changed. Args: skip_decay_matrix (bool): Omit re-creating D matrix """ from itertools import product info( 3, f"Start filling matrices. Skip_decay_matrix = {skip_decay_matrix}", ) self._fill_matrices(skip_decay_matrix=skip_decay_matrix) cparts = self._pman.cascade_particles # interaction part # -I + C # In first interaction mode it is just C self.max_lint = 0.0 for parent, child in product(cparts, cparts): idx = (child.mceqidx, parent.mceqidx) # Main diagonal if child.mceqidx == parent.mceqidx and parent.can_interact: # Subtract unity from the main diagonals info(10, "subtracting main C diagonal from", child.name, parent.name) self.C_blocks[idx][np.diag_indices(self.dim)] -= 1.0 if idx in self.C_blocks: # Multiply with Lambda_int and keep track the maximal # interaction length for the calculation of integration steps self.max_lint = np.max( [self.max_lint, np.max(parent.inverse_interaction_length())] ) self.C_blocks[idx] *= np.asarray( parent.inverse_interaction_length(), dtype=config.floatlen ) if child.mceqidx == parent.mceqidx and parent.has_contloss: pid = abs(parent.pdg_id[0]) if config.enable_energy_loss: if ( pid == 13 or (config.enable_em_ion and pid == 11) or (config.generic_losses_all_charged and pid != 11) ): info(5, "Cont. loss for", parent.name) self.C_blocks[idx] += self.cont_loss_operator(parent.pdg_id) self.int_m = self._csr_from_blocks(self.C_blocks) # -I + D if not skip_decay_matrix or self.dec_m is None: self.max_ldec = 0.0 for parent, child in product(cparts, cparts): idx = (child.mceqidx, parent.mceqidx) # Main diagonal if child.mceqidx == parent.mceqidx and not parent.is_stable: # Subtract unity from the main diagonals info( 10, "subtracting main D diagonal from", child.name, parent.name ) self.D_blocks[idx][np.diag_indices(self.dim)] -= 1.0 if idx not in self.D_blocks: info(25, parent.pdg_id[0], child.pdg_id, "not in D_blocks") continue # Multiply with Lambda_dec and keep track of the # maximal decay length for the calculation of integration steps self.max_ldec = max( [self.max_ldec, np.max(parent.inverse_decay_length())] ) self.D_blocks[idx] *= np.asarray( parent.inverse_decay_length(), dtype=config.floatlen ) self.dec_m = self._csr_from_blocks(self.D_blocks) for mname, mat in [("C", self.int_m), ("D", self.dec_m)]: mat_density = float(mat.nnz) / float(np.prod(mat.shape)) info(5, f"{mname} Matrix info:") info(5, f" density : {mat_density:3.2%}") info(5, " shape : {0} x {1}".format(*mat.shape)) info(5, f" nnz : {mat.nnz}") info(10, " sum :", mat.sum()) info(3, "Done filling matrices.") return self.int_m, self.dec_m
def _average_operator(self, op_mat): """Averages the continuous loss operator by performing 1/max_step explicit euler steps""" n_steps = int(1.0 / config.loss_step_for_average) info( 10, f"Averaging continuous loss using {n_steps} intermediate steps.", ) op_step = np.eye(self._energy_grid.d) + op_mat * config.loss_step_for_average return np.linalg.matrix_power(op_step, n_steps) - np.eye(self._energy_grid.d)
[docs] def cont_loss_operator(self, pdg_id): """Returns continuous loss operator that can be summed with appropriate position in the C matrix.""" op_mat = -np.diag(1 / self._energy_grid.c).dot( self.op_matrix.dot(np.diag(self._pman[pdg_id].dEdX)) ) if config.average_loss_operator: return self._average_operator(op_mat) else: return op_mat
@property def dim(self): """Energy grid (dimension)""" return int(self._pman.dim) @property def dim_states(self): """Number of cascade particles times dimension of grid (dimension of the equation system)""" return int(self._pman.dim_states) def _zero_mat(self): """Returns a new square zero valued matrix with dimensions of grid.""" return np.zeros((self._pman.dim, self._pman.dim), dtype=config.floatlen) def _csr_from_blocks(self, blocks): """Construct a csr matrix from a dictionary of submatrices (blocks) Note:: It's super pain the a** to construct a properly indexed sparse matrix directly from the blocks, since bmat totally messes up the order. """ from scipy.sparse import csr_matrix new_mat = np.zeros((self.dim_states, self.dim_states), dtype=config.floatlen) for (c, p), d in six.iteritems(blocks): rc, rp = self._pman.mceqidx2pref[c], self._pman.mceqidx2pref[p] try: new_mat[rc.lidx : rc.uidx, rp.lidx : rp.uidx] = d except ValueError: _d = self.dim_states _n = rp.name _l = rp.lidx _u = rp.uidx _nc = rc.name _lc = rc.lidx _uc = rc.uidx raise Exception( "Dimension mismatch: matrix " + f"{_d}x{_d}, p={_n}:({_l},{_u}), c={_nc}:({_lc},{_uc})" ) return csr_matrix(new_mat) def _follow_chains(self, p, pprod_mat, p_orig, idcs, propmat, reclev=0): """Some recursive magic.""" info(40, reclev * "\t", "entering with", p.name) # print 'orig, p', p_orig.pdg_id, p.pdg_id for d in p.children: info(40, reclev * "\t", "following to", d.name) if not d.is_resonance: # print 'adding stuff', p_orig.pdg_id, p.pdg_id, d.pdg_id dprop = self._zero_mat() p._assign_decay_idx(d, idcs, d.hadridx, dprop) propmat[(d.mceqidx, p_orig.mceqidx)] += dprop.dot(pprod_mat) if config.debug_level >= 20: pstr = "res" dstr = "Mchain" if idcs == p.hadridx: pstr = "prop" dstr = "Mprop" info( 40, reclev * "\t", "setting {0}[({1},{3})->({2},{4})]".format( dstr, p_orig.name, d.name, pstr, "prop" ), ) if d.is_mixed or d.is_resonance: dres = self._zero_mat() p._assign_decay_idx(d, idcs, d.residx, dres) reclev += 1 self._follow_chains( d, dres.dot(pprod_mat), p_orig, d.residx, propmat, reclev ) else: info(20, reclev * "\t", "\t terminating at", d.name) def _fill_matrices(self, skip_decay_matrix=False): """Generates the interaction and decay matrices from scratch.""" from collections import defaultdict # Fill decay matrix blocks if not skip_decay_matrix or self.dec_m is None: # Initialize empty D matrix self.D_blocks = defaultdict(lambda: self._zero_mat()) for p in self._pman.cascade_particles: # Fill parts of the D matrix related to p as mother if not p.is_stable and bool(p.children) and not p.is_tracking: self._follow_chains( p, np.diag(np.ones(self.dim)).astype(config.floatlen), p, p.hadridx, self.D_blocks, reclev=0, ) else: info(20, p.name, "stable or not added to D matrix") # Initialize empty C blocks self.C_blocks = defaultdict(lambda: self._zero_mat()) for p in self._pman.cascade_particles: # if p doesn't interact, skip interaction matrices if not p.is_projectile: if p.is_hadron: info(1, f"No interactions by {p.name} ({p.pdg_id}).") continue for s in p.hadr_secondaries: # if s not in self.pman.cascade_particles: # print 'Doing nothing with', p.pdg_id, s.pdg_id # continue if not s.is_resonance: cmat = self._zero_mat() p._assign_hadr_dist_idx(s, p.hadridx, s.hadridx, cmat) self.C_blocks[(s.mceqidx, p.mceqidx)] += cmat cmat = self._zero_mat() p._assign_hadr_dist_idx(s, p.hadridx, s.residx, cmat) self._follow_chains(s, cmat, p, s.residx, self.C_blocks, reclev=1) def _construct_differential_operator(self): """Constructs a derivative operator for the contiuous losses. This implmentation uses a 6th-order finite differences operator, only depends on the energy grid. This is an operator for a sub-matrix of dimension (energy grid, energy grid) for a single particle. It can be likewise applied to all particle species. The dEdX values are applied later in ... """ # First rows of operator matrix (values are truncated at the edges # of a matrix.) diags_leftmost = [0, 1, 2, 3] coeffs_leftmost = [-11, 18, -9, 2] denom_leftmost = 6 diags_left_1 = [-1, 0, 1, 2, 3] coeffs_left_1 = [-3, -10, 18, -6, 1] denom_left_1 = 12 diags_left_2 = [-2, -1, 0, 1, 2, 3] coeffs_left_2 = [3, -30, -20, 60, -15, 2] denom_left_2 = 60 # Centered diagonals # diags = [-3, -2, -1, 1, 2, 3] # coeffs = [-1, 9, -45, 45, -9, 1] # denom = 60. diags = diags_left_2 coeffs = coeffs_left_2 denom = 60.0 # Last rows at the right of operator matrix diags_right_2 = [-d for d in diags_left_2[::-1]] coeffs_right_2 = [-d for d in coeffs_left_2[::-1]] denom_right_2 = denom_left_2 diags_right_1 = [-d for d in diags_left_1[::-1]] coeffs_right_1 = [-d for d in coeffs_left_1[::-1]] denom_right_1 = denom_left_1 diags_rightmost = [-d for d in diags_leftmost[::-1]] coeffs_rightmost = [-d for d in coeffs_leftmost[::-1]] denom_rightmost = denom_leftmost h = np.log(self._energy_grid.b[1:] / self._energy_grid.b[:-1]) dim_e = int(self._energy_grid.d) last = dim_e - 1 op_matrix = np.zeros((dim_e, dim_e), dtype=config.floatlen) op_matrix[0, np.asarray(diags_leftmost)] = np.asarray(coeffs_leftmost) / ( denom_leftmost * h[0] ) op_matrix[1, 1 + np.asarray(diags_left_1)] = np.asarray(coeffs_left_1) / ( denom_left_1 * h[1] ) op_matrix[2, 2 + np.asarray(diags_left_2)] = np.asarray(coeffs_left_2) / ( denom_left_2 * h[2] ) op_matrix[last, last + np.asarray(diags_rightmost)] = np.asarray( coeffs_rightmost ) / (denom_rightmost * h[last]) op_matrix[last - 1, last - 1 + np.asarray(diags_right_1)] = np.asarray( coeffs_right_1 ) / (denom_right_1 * h[last - 1]) op_matrix[last - 2, last - 2 + np.asarray(diags_right_2)] = np.asarray( coeffs_right_2 ) / (denom_right_2 * h[last - 2]) for row in range(3, dim_e - 3): op_matrix[row, row + np.asarray(diags)] = np.asarray(coeffs) / ( denom * h[row] ) self.op_matrix = op_matrix