import functools
import logging
import warnings
from typing import Any, Dict, List, Optional, Tuple

import awkward as ak
from coffea.analysis_tools import Weights

from higgs_dna.selections.lumi_selections import select_lumis
from higgs_dna.selections.photon_selections import photon_preselection
from higgs_dna.systematics import object_corrections as available_object_corrections
from higgs_dna.systematics import object_systematics as available_object_systematics
from higgs_dna.systematics import weight_corrections as available_weight_corrections
from higgs_dna.systematics import weight_systematics as available_weight_systematics
from higgs_dna.tools.SC_eta import add_photon_SC_eta
from higgs_dna.utils.dumping_utils import diphoton_list_to_pandas, dump_pandas
from higgs_dna.workflows.base import HggBaseProcessor

logger = logging.getLogger(__name__)
class DYStudiesProcessor(HggBaseProcessor):
    """Processor pinned to the ``.*DoubleEG.*`` trigger group and ``mainAnalysis``.

    Every constructor argument is handed to ``HggBaseProcessor.__init__``
    unchanged; the only things this class fixes are ``trigger_group`` and
    ``analysis``. No ``process`` override is defined in this class.
    """

    def __init__(
        self,
        metaconditions: Dict[str, Any],
        systematics: Optional[Dict[str, List[Any]]] = None,
        corrections: Optional[Dict[str, List[Any]]] = None,
        apply_trigger: bool = False,
        output_location: Optional[str] = None,
        taggers: Optional[List[Any]] = None,
        skipCQR: bool = False,
        skipJetVetoMap: bool = False,
        year: Optional[Dict[str, List[str]]] = None,
        doDeco: bool = False,
        output_format: str = "parquet",
    ) -> None:
        """Forward the configuration to the base processor.

        All parameters are passed through to ``HggBaseProcessor.__init__``
        under the same keyword; ``trigger_group`` and ``analysis`` are set
        to ``".*DoubleEG.*"`` and ``"mainAnalysis"`` respectively.
        """
        super().__init__(
            metaconditions,
            systematics=systematics,
            corrections=corrections,
            apply_trigger=apply_trigger,
            output_location=output_location,
            taggers=taggers,
            trigger_group=".*DoubleEG.*",
            analysis="mainAnalysis",
            skipCQR=skipCQR,
            skipJetVetoMap=skipJetVetoMap,
            year=year,
            doDeco=doDeco,
            output_format=output_format,
        )
        # Re-assigned after the base initialiser so these two attributes hold
        # the values below regardless of what the base class stored.
        self.trigger_group = ".*DoubleEG.*"
        self.analysis = "mainAnalysis"

    def postprocess(self, accumulant: Dict[Any, Any]) -> Any:
        """Do nothing with ``accumulant`` and return ``None``."""
        return None
class TagAndProbeProcessor(HggBaseProcessor):
    """Tag-and-probe processor using the ``.*SingleEle.*`` trigger group.

    ``process`` builds ordered (tag, probe) pairs from the preselected
    photon collection of each event, applies tag and probe selections and,
    when ``output_location`` is set, dumps one flat table per photon
    variation through ``dump_pandas``.
    """

    def __init__(
        self,
        metaconditions: Dict[str, Any],
        systematics: Optional[Dict[str, List[Any]]] = None,
        corrections: Optional[Dict[str, List[str]]] = None,
        apply_trigger: bool = False,
        output_location: Optional[str] = None,
        taggers: Optional[List[Any]] = None,
        skipCQR: bool = False,
        skipJetVetoMap: bool = False,
        year: Optional[Dict[str, List[str]]] = None,
        doDeco: bool = False,
        output_format: str = "parquet",
    ) -> None:
        """Forward the configuration to the base processor.

        Parameters are passed to ``HggBaseProcessor.__init__`` under the same
        keyword, with these exceptions: ``trigger_group`` is fixed to
        ``".*SingleEle.*"``, ``analysis`` to ``"tagAndProbe"``, ``year``
        falls back to an empty dict when ``None``, and ``skipJetVetoMap`` is
        forwarded as a literal ``False``.
        """
        super().__init__(
            metaconditions,
            systematics=systematics,
            corrections=corrections,
            apply_trigger=apply_trigger,
            output_location=output_location,
            taggers=taggers,
            trigger_group=".*SingleEle.*",
            analysis="tagAndProbe",
            skipCQR=skipCQR,
            # NOTE(review): the caller's ``skipJetVetoMap`` value is accepted
            # but not forwarded; confirm that hard-coding False is intended.
            skipJetVetoMap=False,
            year=year if year is not None else {},
            doDeco=doDeco,
            output_format=output_format,
        )
        # Re-assigned after the base initialiser so these attributes hold the
        # values below regardless of what the base class stored.
        self.trigger_group = ".*SingleEle.*"
        self.analysis = "tagAndProbe"
        self.prefixes = {"tag": "tag", "probe": "probe"}

    def process(self, events: ak.Array) -> Dict[Any, Any]:
        """Select tag-and-probe pairs in ``events`` and dump them per variation.

        Steps, in order: decide data vs. MC (stored on ``self.data_kind``),
        apply the lumi mask on data (best effort), apply filters/triggers,
        attach the photon supercluster eta, apply object corrections,
        register photon systematics, then for every photon variation run the
        preselection, build the tag/probe pairs, apply the tag and probe
        masks and write the flattened result.

        Parameters
        ----------
        events
            Event array; ``events.metadata["dataset"]`` must be present.

        Returns
        -------
        dict
            Always an empty dict; the output is written through
            ``dump_pandas`` when ``self.output_location`` is not ``None``.
        """
        dataset_name = events.metadata["dataset"]

        # data or mc?
        self.data_kind = "mc" if "GenPart" in ak.fields(events) else "data"

        # lumi mask
        if self.data_kind == "data":
            try:
                lumimask = select_lumis(self.year[dataset_name][0], events, logger)
                events = events[lumimask]
            except Exception:
                # Best effort: keep going without a lumi mask, but do not
                # swallow KeyboardInterrupt / SystemExit as a bare except would.
                logger.info(
                    f"[ lumimask ] Skip now! Unable to find year info of {dataset_name}"
                )

        # apply filters and triggers
        events = self.apply_filters_and_triggers(events)

        # we need ScEta for corrections and systematics, which is not present
        # in NanoAODv11 but can be calculated using PV
        events.Photon = add_photon_SC_eta(events.Photon, events.PV)

        # read which systematics and corrections to process
        try:
            correction_names = self.corrections[dataset_name]
        except KeyError:
            correction_names = []
        try:
            systematic_names = self.systematics[dataset_name]
        except KeyError:
            systematic_names = []

        # object corrections:
        for correction_name in correction_names:
            if correction_name in available_object_corrections:
                logger.info(
                    f"\nApplying correction {correction_name} to dataset {dataset_name}\n"
                )
                varying_function = available_object_corrections[correction_name]
                events = varying_function(events=events)
            elif correction_name in available_weight_corrections:
                # event weight corrections will be applied after photon
                # preselection / application of further taggers
                continue
            else:
                # may want to throw an error instead, needs to be discussed
                warnings.warn(f"Could not process correction {correction_name}.")

        original_photons = events.Photon

        # systematic object variations
        for systematic_name in systematic_names:
            if systematic_name in available_object_systematics:
                systematic_dct = available_object_systematics[systematic_name]
                # Object systematics for anything other than photons are
                # silently skipped here.
                if systematic_dct["object"] == "Photon":
                    logger.info(
                        f"Adding systematic {systematic_name} to photons collection of dataset {dataset_name}"
                    )
                    # The arguments are passed explicitly (instead of
                    # **systematic_dct["args"]) so that the events and the
                    # year can be bound to the varying function.
                    original_photons.add_systematic(
                        name=systematic_name,
                        kind=systematic_dct["args"]["kind"],
                        what=systematic_dct["args"]["what"],
                        varying_function=functools.partial(
                            systematic_dct["args"]["varying_function"],
                            events=events,
                            year=self.year[dataset_name][0],
                        ),
                    )
            elif systematic_name in available_weight_systematics:
                # event weight systematics will be applied after photon
                # preselection / application of further taggers
                continue
            else:
                # may want to throw an error instead, needs to be discussed
                warnings.warn(
                    f"Could not process systematic variation {systematic_name}."
                )

        # One entry per photon collection to process: the nominal one plus
        # every "<systematic>_<variation>" registered above.
        photons_dct = {"nominal": original_photons}
        logger.debug(original_photons.systematics.fields)
        for systematic in original_photons.systematics.fields:
            for variation in original_photons.systematics[systematic].fields:
                photons_dct[f"{systematic}_{variation}"] = original_photons.systematics[
                    systematic
                ][variation]

        if self.data_kind == "mc":
            event_weights = Weights(size=len(events))
            # _weight will correspond to "nominal" weight, what else has to be
            # included here? (lumi? xSec? MC sum of weights?)
            event_weights._weight = events["genWeight"]

        for variation, photons in photons_dct.items():
            logger.debug(f"Variation: {variation}")

            if self.chained_quantile is not None:
                photons = self.chained_quantile.apply(photons, events)

            # recompute photonid_mva on the fly
            if self.photonid_mva_EB and self.photonid_mva_EE:
                photons = self.add_photonid_mva(photons, events)

            # photon preselection
            photons = photon_preselection(
                self, photons, events, apply_electron_veto=False
            )

            if self.data_kind == "mc":
                # TODO: add weight systs and corrections! (if needed)
                # The photons are annotated with the weight already here:
                # later each photon can be tag and probe, which changes the
                # length of the array.
                photons["weight"] = event_weights.weight()
                # keep only photons matched to gen e+ or e-
                photons = photons[photons.genPartFlav == 11]
                # TODO: a requirement that the matched e+/e- comes from a Z is
                # NOT enforced. A GenPart selection for it used to be computed
                # here but its result was never applied to `photons`, so the
                # unused computation has been removed.

            # other event related variables need to be added before the
            # tag&probe combination
            # nPV just for validation of pileup reweighting
            photons["nPV"] = events.PV.npvs
            photons["fixedGridRhoAll"] = events.Rho.fixedGridRhoAll

            # TODO: HLT matching for data

            # double the number of diphoton candidates (each item in the pair
            # can be both a tag and a probe)
            tnp = ak.combinations(photons, 2, fields=["tag", "probe"])
            pnt = ak.combinations(photons, 2, fields=["probe", "tag"])
            tnp_candidates = ak.concatenate([tnp, pnt], axis=1)

            # check that the e+/e- matched to tag and probe are not the same
            # particle
            if self.data_kind == "mc":
                tnp_candidates = tnp_candidates[
                    tnp_candidates.tag.genPartIdx != tnp_candidates.probe.genPartIdx
                ]

            tag = tnp_candidates.tag
            probe = tnp_candidates.probe

            # tag selections (pfChargedIsoPFPV replaces the formerly used
            # chargedHadronIso)
            tag_mask = (
                (tag.pt > 40)
                & (tag.electronIdx != -1)
                & (tag.pixelSeed)
                & (tag.pfChargedIsoPFPV < 20)
                & (tag.pfChargedIsoPFPV / tag.pt < 0.3)
            )

            # probe selections (pfChargedIsoPFPV replaces the formerly used
            # chargedHadronIso)
            probe_mask = (probe.pfChargedIsoPFPV < 20) & (
                probe.pfChargedIsoPFPV / probe.pt < 0.3
            )

            # apply selections
            tnp_candidates = tnp_candidates[tag_mask & probe_mask]

            # candidates need to be flattened since we have each photon as a
            # tag and probe, otherwise it can't be exported to numpy
            tnp_candidates = ak.flatten(tnp_candidates)

            if self.output_location is not None:
                df = diphoton_list_to_pandas(self, tnp_candidates)

                # The photons were annotated with event-level variables, so
                # these now exist for both tag and probe (weight, nPV and
                # fixedGridRhoAll). Keep a single copy of each.
                if self.data_kind == "mc":
                    df["weight"] = df["tag_weight"]
                    df.drop(["tag_weight", "probe_weight"], axis=1, inplace=True)
                df["nPV"] = df["tag_nPV"]
                df.drop(["tag_nPV", "probe_nPV"], axis=1, inplace=True)
                df["fixedGridRhoAll"] = df["tag_fixedGridRhoAll"]
                df.drop(
                    ["tag_fixedGridRhoAll", "probe_fixedGridRhoAll"],
                    axis=1,
                    inplace=True,
                )

                partition_key = events.behavior["__events_factory__"]._partition_key
                fname = partition_key.replace("/", "_") + f".{self.output_format}"

                subdirs = []
                if "dataset" in events.metadata:
                    subdirs.append(events.metadata["dataset"])
                subdirs.append(variation)
                dump_pandas(self, df, fname, self.output_location, subdirs)

        return {}

    def process_extra(self, events: ak.Array) -> Tuple[ak.Array, Dict[Any, Any]]:
        """Return ``events`` untouched together with an empty dict."""
        return events, {}

    def postprocess(self, accumulant: Dict[Any, Any]) -> Any:
        """Do nothing with ``accumulant`` and return ``None``."""
        return None