Source code for topsim.core.config

# Copyright (C) 2020 RW Bunney

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.


import json
from collections import defaultdict
from pathlib import Path
from topsim.core.instrument import Observation
from topsim.core.machine import Machine
from topsim.core.buffer import HotBuffer, ColdBuffer

import logging

LOGGER = logging.getLogger(__name__)


[docs] class Config: """ Process the configuration of the current simulation Parameters ---------- config : path File path to the JSON config file Attributes ----------- instrument : dict Data for the instrument constructor cluster : dict Data for the cluster constructor buffer : dict Data for buffer constructor timestep_unit: str String value that specifies what granualirity of time is being used in the simulation Raises ------ OSError This is raised if we cannot read the configuration file/it does not exist json.JSONDecodeError This is raised if the JSON file is in the wrong format KeyError Raised if one of the attribute-keys is not found in the provided JSON file. """ def __init__(self, config): try: with open(config, 'r') as infile: cfg = json.load(infile) except OSError: LOGGER.warning("File %s not found", config) raise except json.JSONDecodeError: raise self.path = Path(config) if 'instrument' in cfg and cfg['instrument'] is not None: self.instrument = cfg['instrument'] else: raise KeyError("'instrument' is not present in JSON") if 'cluster' in cfg and cfg['cluster'] is not None: self.cluster = cfg['cluster'] else: raise KeyError("'cluster' is not present in JSON") if 'buffer' in cfg and cfg['buffer'] is not None: self.buffer = cfg['buffer'] else: raise KeyError("'buffer' is not present in JSON") if 'timestep' in cfg and cfg['timestep'] is not None: self.timestep_unit = cfg['timestep'] else: self.timestep_unit = 'seconds' def parse_cluster_config(self): try: (self.cluster['system'] and self.cluster['system']['resources'] and self.cluster['system']["system_bandwidth"]) except KeyError: LOGGER.warning( "'system' is not in %s, check your JSON is correctly formatted", self.cluster) raise machine_list = [] timestep_multiplier = 1 if self.timestep_unit == 'minutes': timestep_multiplier = 60 if self.timestep_unit == 'hours': timestep_multiplier = 3600 elif isinstance(self.timestep_unit, int): # This is a custom unit timestep_multiplier = self.timestep_unit else: # Seconds timestep_multiplier = timestep_multiplier if not self._get_machine_count(): self._update_config() machines_types = self.cluster['system']['resources'] for name, spec in machines_types.items(): num_machines = spec.get("count") for i in range(num_machines): cpu = spec["flops"] * timestep_multiplier machine_list.append( Machine(id=f"{name}_{i}", cpu=cpu, memory=1, # * timestep_multiplier, disk=1, # * timestep_multiplier, bandwidth=(machines_types[name]["compute_bandwidth"] * timestep_multiplier), ethernet=self.cluster['system']["system_bandwidth"] * timestep_multiplier ) ) bandwidth = self.cluster['system']["system_bandwidth"] * timestep_multiplier return machine_list, bandwidth def _get_machine_count(self): """ Utility function that tries and fetches the count from the initial system-resources sub-dictionary. If there is no count, it will return None, and this is used to trigger the re-creation of the existing configuration file into the new format. """ machines_types = self.cluster['system']['resources'] example_key = list(machines_types.keys())[0] return machines_types[example_key].get("count") def _update_config(self): """ Convert the deprecated "cluster" dictionary to the new more concise approach. Here, we make note of each unique machine 'spec' (flops, bandwidth, memory, and disk), count how many of them there are, and overwrite the existing "cluster" dictionary with the new format. We also over-write the existing configuration file so we do not have to perform this everytime we load an older configuration. """ resources = self.cluster["system"]["resources"] grouped_specs = defaultdict(list) for name, spec in resources.items(): # Use dictionary data as key to determine unique entries: spec["name"] = name.split("_")[0] spec_key = json.dumps(spec, sort_keys=True) grouped_specs[spec_key].append(spec) updated_resources = {} for spec_string, spec_list in grouped_specs.items(): spec = json.loads(spec_string) name = spec["name"] del spec["name"] spec["count"] = len(spec_list) updated_resources[name] = spec self.cluster["system"]["resources"] = updated_resources with open(self.path, 'r') as fp: d = json.load(fp) with open(self.path, 'w') as fp: d["cluster"]["system"]["resources"] = updated_resources json.dump(d, fp, indent=2) def parse_instrument_config(self, instrument_name): timestep_multiplier = 1 if self.timestep_unit == 'minutes': timestep_multiplier = 60 elif self.timestep_unit == 'hours': timestep_multiplier = 3600 elif isinstance(self.timestep_unit, int): # This is a custom unit timestep_multiplier = self.timestep_unit else: # Seconds timestep_multiplier = timestep_multiplier cfg = self.instrument total_arrays = cfg[instrument_name]['total_arrays'] pipelines = cfg[instrument_name]['pipelines'] observations = [] for observation in cfg[instrument_name]['observations']: try: name = observation['name'] workflow_path = pipelines[name]['workflow'] ingest_demand = pipelines[name]['ingest_demand'] if 'min_workflow_resources' in observation: min_resources = observation['min_workflow_resources'] else: min_resources = -1 # No minimum requirement if 'max_workflow_resources' in observation: max_resources = observation['max_workflow_resources'] else: max_resources = -1 # No maximum resources o = Observation(name=name, start=observation['start'] / timestep_multiplier, duration=observation['duration'] / timestep_multiplier, demand=observation['instrument_demand'], workflow=((self.path.parent / workflow_path).as_posix()), data_rate=( round(observation['data_product_rate'] * timestep_multiplier) ), timestep=self.timestep_unit) observations.append(o) except KeyError: raise max_ingest_resources = cfg[instrument_name]['max_ingest_resources'] return total_arrays, pipelines, observations, max_ingest_resources def parse_buffer_config(self): config = self.buffer timestep_multiplier = 1 if self.timestep_unit == 'minutes': timestep_multiplier = 60 if self.timestep_unit == 'hours': timestep_multiplier = 3600 elif isinstance(self.timestep_unit, int): # This is a custom unit timestep_multiplier = self.timestep_unit else: # Seconds timestep_multiplier = timestep_multiplier hot = HotBuffer(capacity=config['hot']['capacity'], max_ingest_data_rate=config['hot'][ 'max_ingest_rate'] * timestep_multiplier) cold = ColdBuffer(capacity=config['cold']['capacity'], max_data_rate=config['cold'][ 'max_data_rate'] * timestep_multiplier) return {0: hot}, {0: cold} def get_max_ingest(self, instrument_name): return self.instrument[instrument_name]['max_ingest_resources']