Source code for higgs_dna.tools.chained_quantile

from higgs_dna.metaconditions import corrections
from higgs_dna.tools.xgb_loader import load_bdt

import json
from importlib import resources
import os
from typing import Any, Dict, List, Optional
import awkward
import numpy
import xgboost

import logging

logger = logging.getLogger(__name__)

corrections_dir = os.path.dirname(corrections.__file__)


[docs]class wrapped_xgb: def __init__( self, model: xgboost.Booster, scale: Optional[float], center: Optional[float], variables: Optional[List[str]], ) -> None: self._model = model self._scale = scale self._center = center self._variables = variables def __call__(self, array: numpy.ndarray) -> numpy.ndarray: asDMatrix = xgboost.DMatrix(array) out = self._model.predict(asDMatrix) return out * (self._scale or 1.0) + (self._center or 0.0) @property def scale(self) -> Optional[float]: return self._scale @property def center(self) -> Optional[float]: return self._center @property def variables(self) -> Optional[List[str]]: return self._variables
[docs]def create_evaluator( weights: str, scale: Optional[float] = None, center: Optional[float] = None, variables: Optional[List[str]] = None, **kwargs: Dict[Any, Any], ) -> wrapped_xgb: full_path_weights = f"{corrections_dir}/{weights}" model = load_bdt(full_path_weights) if model is None: raise RuntimeError(f"Could not load {full_path_weights}, check warnings!") return wrapped_xgb(model=model, scale=scale, center=center, variables=variables)
[docs]class ChainedQuantileRegression: def __init__( self, corrections_summary: str, SS_variables: List[str], correctShowerShapes: bool = True, correctIsolations: bool = True, correctPreshower: bool = False, ): with resources.open_text( "higgs_dna.metaconditions.corrections", corrections_summary ) as f: logger.debug(f"Loading corrections summary from {corrections_summary}") cq_config = json.load(f) self.transforms: Dict[str, Any] = {} for vargroup, varlist in cq_config.items(): if not correctIsolations and vargroup == "isolations": continue if not correctShowerShapes and vargroup == "shower_shapes": continue self.transforms[vargroup] = {} for varname, regions in varlist.items(): self.transforms[vargroup][varname] = {} for region, steps in regions.items(): if vargroup == "isolations": for stepname, config in steps.items(): keys = list(config.keys()) if "variables" in keys: config["variables"] = [ var.split(":=")[-1].strip() for var in config["variables"] ] if "weights_data" in keys and "weights_mc" in keys: if ( f"{stepname}_data" not in self.transforms[vargroup][varname] ): self.transforms[vargroup][varname][ f"{stepname}_data" ] = {} if ( f"{stepname}_mc" not in self.transforms[vargroup][varname] ): self.transforms[vargroup][varname][ f"{stepname}_mc" ] = {} config["weights"] = config["weights_data"] self.transforms[vargroup][varname][f"{stepname}_data"][ region ] = create_evaluator(**config) config["weights"] = config["weights_mc"] self.transforms[vargroup][varname][f"{stepname}_mc"][ region ] = create_evaluator(**config) else: if stepname not in self.transforms[vargroup][varname]: self.transforms[vargroup][varname][stepname] = {} self.transforms[vargroup][varname][stepname][ region ] = create_evaluator(**config) elif vargroup == "shower_shapes": self.transforms[vargroup][varname][region] = create_evaluator( **steps ) else: raise Exception( f"{vargroup} is not understood by ChainedQuantileRegression" ) self.ssvs = [ssv.split(":=")[-1].strip() for ssv in SS_variables]
[docs] def apply_shower_shapes( self, photons: awkward.Array, rho: awkward.Array, isEB: numpy.ndarray, isEE: numpy.ndarray, ) -> awkward.Array: xforms = self.transforms["shower_shapes"] photons["uncorr_r9"] = photons.r9 photons["uncorr_s4"] = photons.s4 photons["uncorr_sieie"] = photons.sieie photons["uncorr_sieip"] = photons.sieip photons["uncorr_etaWidth"] = photons.etaWidth photons["uncorr_phiWidth"] = photons.phiWidth # Now for the xgboosty bits. # r9 irho = self.ssvs.index("fixedGridRhoAll") stack_vars = [awkward.to_numpy(photons[name]) for name in self.ssvs[:irho]] stack_vars.append(awkward.to_numpy(rho)) stack_vars.extend( [awkward.to_numpy(photons[name]) for name in self.ssvs[irho + 1 :]] ) eval_vars = numpy.column_stack(stack_vars) eval_vars_eb = eval_vars[isEB] eval_vars_ee = eval_vars[~isEB] for var, xform in xforms.items(): npy = awkward.to_numpy(photons[var]) npy[isEB] = npy[isEB] + xform["EB"](eval_vars_eb) npy[isEE] = npy[isEE] + xform["EE"](eval_vars_ee) photons[var] = npy return photons
[docs] def apply_photon_isolation( self, photons: awkward.Array, rho: awkward.Array, isEB: numpy.ndarray, isEE: numpy.ndarray, ) -> awkward.Array: xforms = self.transforms["isolations"]["phoIso"] clf_mc = xforms["peak_tail_clfs_mc"] clf_data = xforms["peak_tail_clfs_data"] p2t = xforms["peak2tail"] morphing = xforms["morphing"] photons["uncorr_pfPhoIso03"] = photons.pfPhoIso03 # clfs (input variables are the same) clf_vars = clf_mc["EB"].variables irho = clf_vars.index("fixedGridRhoAll") clf_stack_vars = [awkward.to_numpy(photons[name]) for name in clf_vars[:irho]] clf_stack_vars.append(awkward.to_numpy(rho)) clf_stack_vars.extend( [awkward.to_numpy(photons[name]) for name in clf_vars[irho + 1 :]] ) clf_eval_vars = numpy.column_stack(clf_stack_vars) # conversion from T[-1,1] to probability [0,1] p_tail_data = numpy.ones((clf_eval_vars.shape[0],)) p_tail_mc = numpy.ones_like(p_tail_data) p_tail_data[isEB] = 1.0 / ( 1.0 + numpy.sqrt(2.0 / (1.0 + clf_data["EB"](clf_eval_vars[isEB])) - 1.0) ) p_tail_data[isEE] = 1.0 / ( 1.0 + numpy.sqrt(2.0 / (1.0 + clf_data["EE"](clf_eval_vars[isEE])) - 1.0) ) p_tail_mc[isEB] = 1.0 / ( 1.0 + numpy.sqrt(2.0 / (1.0 + clf_mc["EB"](clf_eval_vars[isEB])) - 1.0) ) p_tail_mc[isEE] = 1.0 / ( 1.0 + numpy.sqrt(2.0 / (1.0 + clf_mc["EE"](clf_eval_vars[isEE])) - 1.0) ) p_peak_data = 1 - p_tail_data p_peak_mc = 1 - p_tail_mc migration = numpy.random.uniform(size=clf_eval_vars.shape[0]) pfPhoIso = awkward.to_numpy(photons.pfPhoIso03) p_move_to_tail = (p_tail_data - p_tail_mc) / p_peak_mc p_move_to_peak = (p_peak_data - p_peak_mc) / p_tail_mc # peak2tail to_tail = ( (pfPhoIso == 0) & (p_tail_data > p_tail_mc) & (migration < p_move_to_tail) ) to_peak = ( (pfPhoIso > 0) & (p_peak_data > p_peak_mc) & (migration <= p_move_to_peak) ) p2t_vars = p2t["EB"].variables irho = p2t_vars.index("fixedGridRhoAll") irnd = p2t_vars.index("peak2tail_rnd") p2t_stack_vars = [awkward.to_numpy(photons[name]) for name in p2t_vars[:irho]] p2t_stack_vars.append(awkward.to_numpy(rho)) p2t_stack_vars.extend( [awkward.to_numpy(photons[name]) for name in p2t_vars[irho + 1 : irnd]] ) # https://github.com/cms-analysis/flashgg/blob/dev_legacy_runII/Taggers/plugins/DifferentialPhoIdInputsCorrector.cc#L301 p2t_stack_vars.append( numpy.random.uniform(low=0.01, high=0.99, size=clf_eval_vars.shape[0]) ) p2t_stack_vars.extend( [awkward.to_numpy(photons[name]) for name in p2t_vars[irnd + 1 :]] ) p2t_eval_vars = numpy.column_stack(p2t_stack_vars) if numpy.any(isEB & to_tail): pfPhoIso[isEB & to_tail] = p2t["EB"](p2t_eval_vars[isEB & to_tail]) if numpy.any(isEE & to_tail): pfPhoIso[isEE & to_tail] = p2t["EE"](p2t_eval_vars[isEE & to_tail]) pfPhoIso[to_peak] = 0 # update photon for morph photons["pfPhoIso03"] = pfPhoIso # morphing needs_morph = pfPhoIso > 0 morph_vars = morphing["EB"].variables irho = morph_vars.index("fixedGridRhoAll") morph_stack_vars = [ awkward.to_numpy(photons[name]) for name in morph_vars[:irho] ] morph_stack_vars.append(awkward.to_numpy(rho)) morph_stack_vars.extend( [awkward.to_numpy(photons[name]) for name in morph_vars[irho + 1 :]] ) morph_eval_vars = numpy.column_stack(morph_stack_vars) pfPhoIso[isEB & needs_morph] = pfPhoIso[isEB & needs_morph] + morphing["EB"]( morph_eval_vars[isEB & needs_morph] ) pfPhoIso[isEE & needs_morph] = pfPhoIso[isEE & needs_morph] + morphing["EE"]( morph_eval_vars[isEE & needs_morph] ) photons["pfPhoIso03"] = pfPhoIso return photons
[docs] def apply_charged_isolation( self, photons: awkward.Array, rho: awkward.Array, isEB: numpy.ndarray, isEE: numpy.ndarray, ) -> awkward.Array: xforms = self.transforms["isolations"]["chIso"] clf_mc = xforms["peak_tail_clfs_mc"] clf_data = xforms["peak_tail_clfs_data"] p2t = xforms["chIso_peak2tail"] p2t_worst = xforms["chIsoWorst_peak2tail"] morphing = xforms["chIso_morphing"] morphing_worst = xforms["chIsoWorst_morphing"] photons["uncorr_pfChargedIsoPFPV"] = photons.pfChargedIsoPFPV photons["uncorr_pfChargedIsoWorstVtx"] = photons.pfChargedIsoWorstVtx # clfs (input variables are the same) clf_vars = clf_mc["EB"].variables irho = clf_vars.index("fixedGridRhoAll") clf_stack_vars = [awkward.to_numpy(photons[name]) for name in clf_vars[:irho]] clf_stack_vars.append(awkward.to_numpy(rho)) clf_stack_vars.extend( [awkward.to_numpy(photons[name]) for name in clf_vars[irho + 1 :]] ) clf_eval_vars = numpy.column_stack(clf_stack_vars) # ---Charge isolations # ----------------+-------------------------+ # ChIsoWorst tail | 01 | 11 | # ChIsoWorst peak | 00 | X | # ----------------+-------------------------+ # | ChIso peak | ChIso tail | # +-------------------------+ probs_data = numpy.ones((clf_eval_vars.shape[0], 3)) probs_mc = numpy.ones_like(probs_data) # [[00, 01, 11], ...] probs_data[isEB] = clf_data["EB"](clf_eval_vars[isEB]) probs_data[isEE] = clf_data["EE"](clf_eval_vars[isEE]) probs_mc[isEB] = clf_mc["EB"](clf_eval_vars[isEB]) probs_mc[isEE] = clf_mc["EE"](clf_eval_vars[isEE]) migration = numpy.random.uniform(size=clf_eval_vars.shape[0]) migration_subcat = numpy.random.uniform(size=clf_eval_vars.shape[0]) pfChgIso = awkward.to_numpy(photons.pfChargedIsoPFPV) pfChgIsoWorst = awkward.to_numpy(photons.pfChargedIsoWorstVtx) can_migrate = probs_mc > probs_data should_migrate = migration[:, None] < (1 - probs_data / probs_mc) # peak2tail to_00 = ( (pfChgIso == 0) & (pfChgIsoWorst == 0) & can_migrate[:, 0] & should_migrate[:, 0] ) to_01 = ( (pfChgIso == 0) & (pfChgIsoWorst > 0) & can_migrate[:, 1] & should_migrate[:, 1] ) to_11 = ( (pfChgIso > 0) & (pfChgIsoWorst > 0) & can_migrate[:, 2] & should_migrate[:, 2] ) p2t_vars = p2t["EB"].variables irho = p2t_vars.index("fixedGridRhoAll") irnd = p2t_vars.index("peak2tail_chIso_rnd") p2t_stack_vars = [awkward.to_numpy(photons[name]) for name in p2t_vars[:irho]] p2t_stack_vars.append(awkward.to_numpy(rho)) p2t_stack_vars.extend( [awkward.to_numpy(photons[name]) for name in p2t_vars[irho + 1 : irnd]] ) # https://github.com/cms-analysis/flashgg/blob/dev_legacy_runII/Taggers/plugins/DifferentialPhoIdInputsCorrector.cc#L301 p2t_stack_vars.append( numpy.random.uniform(low=0.01, high=0.99, size=clf_eval_vars.shape[0]) ) p2t_stack_vars.extend( [awkward.to_numpy(photons[name]) for name in p2t_vars[irnd + 1 :]] ) # worst p2t_worst_vars = p2t_worst["EB"].variables irho_worst = p2t_worst_vars.index("fixedGridRhoAll") irnd_worst = p2t_worst_vars.index("peak2tail_chIsoWorst_rnd") p2t_worst_stack_vars = [ awkward.to_numpy(photons[name]) for name in p2t_worst_vars[:irho_worst] ] p2t_worst_stack_vars.append(awkward.to_numpy(rho)) p2t_worst_stack_vars.extend( [ awkward.to_numpy(photons[name]) for name in p2t_worst_vars[irho_worst + 1 : irnd_worst] ] ) # https://github.com/cms-analysis/flashgg/blob/dev_legacy_runII/Taggers/plugins/DifferentialPhoIdInputsCorrector.cc#L301 p2t_worst_stack_vars.append( numpy.random.uniform(low=0.01, high=0.99, size=clf_eval_vars.shape[0]) ) p2t_worst_stack_vars.extend( [ awkward.to_numpy(photons[name]) for name in p2t_worst_vars[irnd_worst + 1 :] ] ) def get_z(cat1: int, cat2: int) -> numpy.ndarray: return (probs_data[:, cat1] - probs_mc[:, cat1]) / ( probs_mc[:, cat2] - probs_data[:, cat2] ) p2t_eval_vars = numpy.column_stack(p2t_stack_vars) p2t_worst_eval_vars = numpy.column_stack(p2t_worst_stack_vars) # 00 # 00 -> 01 to_00_01 = to_00 & (~can_migrate[:, 1]) & can_migrate[:, 2] to_00_01_EB = isEB & to_00_01 to_00_01_EE = isEE & to_00_01 if numpy.any(to_00_01_EB): pfChgIsoWorst[to_00_01_EB] = p2t_worst["EB"]( p2t_worst_eval_vars[to_00_01_EB] ) if numpy.any(to_00_01_EE): pfChgIsoWorst[to_00_01_EE] = p2t_worst["EE"]( p2t_worst_eval_vars[to_00_01_EE] ) # 00 -> 11 to_00_11 = to_00 & can_migrate[:, 1] & (~can_migrate[:, 2]) to_00_11_EB = isEB & to_00_11 to_00_11_EE = isEE & to_00_11 if numpy.any(to_00_11_EB): pfChgIso[to_00_11_EB] = p2t["EB"](p2t_eval_vars[to_00_11_EB]) pfChgIsoWorst[to_00_11_EB] = p2t_worst["EB"]( p2t_worst_eval_vars[to_00_11_EB] ) if numpy.any(to_00_11_EE): pfChgIso[to_00_11_EE] = p2t["EE"](p2t_eval_vars[to_00_11_EE]) pfChgIsoWorst[to_00_11_EE] = p2t_worst["EE"]( p2t_worst_eval_vars[to_00_11_EE] ) # 00 -> either to_00_either = to_00 & (~can_migrate[:, 1]) & (~can_migrate[:, 2]) to_00_either_EB = isEB & to_00_either to_00_either_EE = isEE & to_00_either which_01_11 = migration_subcat <= get_z(1, 0) if numpy.any(to_00_either_EB & which_01_11): pfChgIsoWorst[to_00_either_EB & which_01_11] = p2t_worst["EB"]( p2t_worst_eval_vars[to_00_either_EB & which_01_11] ) if numpy.any(to_00_either_EE & which_01_11): pfChgIsoWorst[to_00_either_EE & which_01_11] = p2t_worst["EE"]( p2t_worst_eval_vars[to_00_either_EE & which_01_11] ) if numpy.any(to_00_either_EB & (~which_01_11)): pfChgIso[to_00_either_EB & (~which_01_11)] = p2t["EB"]( p2t_eval_vars[to_00_either_EB & (~which_01_11)] ) pfChgIsoWorst[to_00_either_EB & (~which_01_11)] = p2t_worst["EB"]( p2t_worst_eval_vars[to_00_either_EB & (~which_01_11)] ) if numpy.any(to_00_either_EE & (~which_01_11)): pfChgIso[to_00_either_EE & (~which_01_11)] = p2t["EE"]( p2t_eval_vars[to_00_either_EE & (~which_01_11)] ) pfChgIsoWorst[to_00_either_EE & (~which_01_11)] = p2t_worst["EE"]( p2t_worst_eval_vars[to_00_either_EE & (~which_01_11)] ) # 01 # 01 -> 00 to_01_00 = to_01 & (~can_migrate[:, 0]) & can_migrate[:, 2] pfChgIso[to_01_00] = 0 pfChgIsoWorst[to_01_00] = 0 # 01 -> 11 to_01_11 = to_01 & can_migrate[:, 0] & (~can_migrate[:, 2]) to_01_11_EB = isEB & to_01_11 to_01_11_EE = isEE & to_01_11 if numpy.any(to_01_11_EB): pfChgIso[to_01_11_EB] = p2t["EB"](p2t_eval_vars[to_01_11_EB]) if numpy.any(to_01_11_EE): pfChgIso[to_01_11_EE] = p2t["EE"](p2t_eval_vars[to_01_11_EE]) # 01 -> either to_01_either = to_01 & (~can_migrate[:, 0]) & (~can_migrate[:, 2]) to_01_either_EB = isEB & to_01_either to_01_either_EE = isEE & to_01_either which_00_11 = migration_subcat <= get_z(0, 1) if numpy.any(to_01_either_EB & (~which_00_11)): pfChgIso[to_01_either_EB & (~which_00_11)] = p2t["EB"]( p2t_eval_vars[to_01_either_EB & (~which_00_11)] ) if numpy.any(to_01_either_EE & (~which_00_11)): pfChgIso[to_01_either_EE & (~which_00_11)] = p2t["EE"]( p2t_eval_vars[to_01_either_EE & (~which_00_11)] ) pfChgIsoWorst[to_01_either & which_01_11] = 0 # 11 # 11 -> 00 to_11_00 = to_11 & (~can_migrate[:, 0]) & can_migrate[:, 1] pfChgIso[to_01_00] = 0 pfChgIsoWorst[to_11_00] = 0 # 11 -> 01 to_11_01 = to_11 & can_migrate[:, 0] & (~can_migrate[:, 1]) pfChgIsoWorst[to_11_01] = 0 # 11 -> either to_11_either = to_00 & (~can_migrate[:, 0]) & (~can_migrate[:, 1]) which_00_01 = migration_subcat <= get_z(0, 2) pfChgIso[to_11_either] = 0 pfChgIsoWorst[to_11_either & which_00_01] = 0 # update photon for morph photons["pfChargedIsoPFPV"] = pfChgIso photons["pfChargedIsoWorstVtx"] = pfChgIsoWorst # tail morphing PV needs_morph = pfChgIso > 0 morph_vars = morphing["EB"].variables irho = morph_vars.index("fixedGridRhoAll") morph_stack_vars = [ awkward.to_numpy(photons[name]) for name in morph_vars[:irho] ] morph_stack_vars.append(awkward.to_numpy(rho)) morph_stack_vars.extend( [awkward.to_numpy(photons[name]) for name in morph_vars[irho + 1 :]] ) morph_eval_vars = numpy.column_stack(morph_stack_vars) pfChgIso[isEB & needs_morph] = pfChgIso[isEB & needs_morph] + morphing["EB"]( morph_eval_vars[isEB & needs_morph] ) pfChgIso[isEE & needs_morph] = pfChgIso[isEE & needs_morph] + morphing["EE"]( morph_eval_vars[isEE & needs_morph] ) photons["pfChargedIsoPFPV"] = pfChgIso # tail morphing worst vertex needs_morph_worst = pfChgIsoWorst > 0 morph_worst_vars = morphing_worst["EB"].variables irho = morph_worst_vars.index("fixedGridRhoAll") morph_worst_stack_vars = [ awkward.to_numpy(photons[name]) for name in morph_worst_vars[:irho] ] morph_worst_stack_vars.append(awkward.to_numpy(rho)) morph_worst_stack_vars.extend( [awkward.to_numpy(photons[name]) for name in morph_worst_vars[irho + 1 :]] ) morph_worst_eval_vars = numpy.column_stack(morph_worst_stack_vars) pfChgIsoWorst[isEB & needs_morph_worst] = pfChgIsoWorst[ isEB & needs_morph_worst ] + morphing_worst["EB"](morph_worst_eval_vars[isEB & needs_morph_worst]) pfChgIsoWorst[isEE & needs_morph_worst] = pfChgIsoWorst[ isEE & needs_morph_worst ] + morphing_worst["EE"](morph_worst_eval_vars[isEE & needs_morph_worst]) photons["pfChargedIsoWorstVtx"] = pfChgIsoWorst return photons
[docs] def apply(self, photons: awkward.Array, events: awkward.Array) -> awkward.Array: # We're going to work in flattened data within this # function. Less mind-bending. rho = awkward.ones_like(photons.pt) * events.Rho.fixedGridRhoAll counts = awkward.num(photons, axis=1) photons = awkward.flatten(photons) rho = awkward.flatten(rho) isEB = awkward.to_numpy(numpy.abs(photons.eta) < 1.5) isEE = ~isEB if "shower_shapes" in self.transforms: photons = self.apply_shower_shapes(photons, rho, isEB, isEE) if "isolations" in self.transforms: for key in self.transforms["isolations"].keys(): if "phoIso" == key: photons = self.apply_photon_isolation(photons, rho, isEB, isEE) if "chIso" == key: photons = self.apply_charged_isolation(photons, rho, isEB, isEE) return awkward.unflatten(photons, counts)