Source code for higgs_dna.samples.fetch

#!/usr/bin/env python
from higgs_dna.utils.logger_utils import setup_logger

import argparse
import json
import os
import subprocess
import warnings
from typing import Dict, Iterable, List, Optional

# xrootd URL prefixes, keyed by the values accepted by the ``--where``
# command-line option. The selected prefix is prepended to every file path
# returned by the dataset query, unless ``--xrootd`` supplies a custom prefix.
xrootd_pfx = {
    "Americas": "root://cmsxrootd.fnal.gov/",
    "Eurasia": "root://xrootd-cms.infn.it/",
    "Yolo": "root://cms-xrd-global.cern.ch/",
}


def get_fetcher_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """
    Parse the command-line options of the dataset fetcher.

    :param argv: Argument strings to parse instead of the process command line.
        With the default ``None``, argparse reads ``sys.argv[1:]``, so existing
        callers that pass nothing behave exactly as before.
    :type argv: list, optional
    :return: Namespace with the attributes ``input``, ``where``, ``xrootd``
        and ``instance``
    :rtype: argparse.Namespace
    :raises SystemExit: raised by argparse when ``--input`` is missing or an
        option value is not among the allowed choices
    """
    parser = argparse.ArgumentParser(
        description="Query dasgoclient for dataset file lists"
    )
    parser.add_argument(
        "-i",
        "--input",
        help="What input dataset definition file to process.",
        required=True,
    )
    parser.add_argument(
        "-w",
        "--where",
        help="Where are you running your jobs? (default: %(default)s)",
        default="Americas",
        choices=["Americas", "Eurasia", "Yolo"],
    )
    parser.add_argument(
        "-x",
        "--xrootd",
        help="Override xrootd prefix with the one given.",
        default=None,
    )
    parser.add_argument(
        "--dbs-instance",
        # Stored as ``args.instance`` rather than ``args.dbs_instance``.
        dest="instance",
        help="The DBS instance to use for querying datasets. (default: %(default)s)",
        type=str,
        default="prod/global",
        choices=["prod/global", "prod/phys01", "prod/phys02", "prod/phys03"],
    )
    return parser.parse_args(argv)


def get_dataset_dict(
    fset: Iterable[Iterable[str]],
    xrd: str,
    dbs_instance: str,
    dasgoclient: str = "/cvmfs/cms.cern.ch/common/dasgoclient",
) -> Dict[str, List[str]]:
    """
    Get a dictionary of dataset and the files in it.

    One ``dasgoclient`` query is run per entry of ``fset``; every output line
    longer than one character is treated as a file path and prefixed with
    ``xrd``. Entries that share a short name are merged under one key.

    :param fset: A list of tuples with the format (dataset-short-name, path)
    :type fset: list
    :param xrd: Prefix prepended to every file path returned by the query
    :type xrd: str
    :param dbs_instance: DBS instance inserted into the query string
    :type dbs_instance: str
    :param dasgoclient: Executable used to run the query. The default is the
        cvmfs copy, which is available both on local infrastructures with
        cvmfs and on lxplus.
    :type dasgoclient: str, optional
    :return: A dictionary of dataset-short-name: list-of-files
    :rtype: dict
    """
    fdict: Dict[str, List[str]] = {}
    for name, dataset in fset:
        query = f"-query=instance={dbs_instance} file dataset={dataset}"
        try:
            # Argument list without a shell: the dataset path reaches the
            # client as one literal argument, so quotes or shell
            # metacharacters in it cannot alter the command being run.
            completed = subprocess.run(
                [dasgoclient, query],
                stdout=subprocess.PIPE,
                universal_newlines=True,
                check=False,
            )
        except OSError as err:
            # Keep the historical result (an empty file list) when the client
            # cannot be started, but make the failure visible.
            warnings.warn(f"Could not run '{dasgoclient}' for dataset '{dataset}': {err}")
            listing = ""
        else:
            if completed.returncode != 0:
                warnings.warn(
                    f"'{dasgoclient}' exited with status {completed.returncode} "
                    f"for dataset '{dataset}'"
                )
            listing = completed.stdout

        files = [xrd + f for f in listing.split("\n") if len(f) > 1]
        # extend() (not append()) so that all samples sharing a key, e.g. a
        # common "Data" key, end up in one flat list of files.
        fdict.setdefault(name, []).extend(files)
    return fdict


if __name__ == "__main__":
    # Script entry point: read "<short name> <dataset path>" pairs from a .txt
    # file, query the file list of every dataset and write the result to a
    # .json file next to the input.
    args = get_fetcher_args()
    logger = setup_logger(level="INFO")

    # Check the real suffix: a substring test would also accept names such as
    # "list.txt.bak", for which the output path below would be derived wrongly.
    if not args.input.endswith(".txt"):
        raise Exception("Input file must have '.txt' extension and be a text file!")

    fset = []
    with open(args.input) as fp:
        for lineno, line in enumerate(fp, start=1):
            spec = line.strip()
            # Blank lines and comment lines carry no dataset specification.
            if not spec or spec.startswith("#"):
                continue
            fields = tuple(spec.split())
            if len(fields) != 2:
                raise Exception(
                    f"Text file format should be '<short name> <dataset path>' and nothing else.\nInvalid spec on line {lineno}: '{line}'"
                )
            fset.append(fields)

    logger.info(f"Using following combination of datasets names and paths: {fset}")

    # An explicit --xrootd value takes precedence over the regional default.
    xrd = xrootd_pfx[args.where] if args.xrootd is None else args.xrootd
    logger.info(f"Using xrootd prefix: {xrd}")

    fdict = get_dataset_dict(fset, xrd, args.instance)

    # Output path: the input path with its trailing ".txt" replaced by ".json".
    with open(args.input[: -len(".txt")] + ".json", "w") as fp:
        json.dump(fdict, fp, indent=4)