# Source code for topsim.core.cluster

import pandas as pd
import logging

from topsim.core.task import Task, TaskStatus
from topsim.common.globals import TIMESTEP

logger = logging.getLogger(__name__)


class Cluster:
    """
    A class used to represent the Cluster, the abstract representation
    of computing resources in the Science Data Processor.

    The Cluster runs on a per-timestep capacity through its `run()` function
    in the same way other actors do; however, its runtime work is minimal.
    The main purpose of the Cluster is to provide access methods for
    requesting machine data (either aggregate or individual), and for
    requesting allocations to schedule.

    The majority of the Cluster is therefore 'read-only' from a user
    perspective. The only situations in which a user will change the system
    on the cluster is by using a non-default resource provisioning policy
    (see Notes below).

    Parameters
    ----------
    env : :py:obj:`simpy.Environment`
        The environment for the current simulation.

    config : :py:obj:`~topsim.core.config.Config`
        The configuration object for the simulation. See
        :py:obj:`~topsim.core.simulation.Simulation` for more details.

    Notes
    -----
    TopSim defaults to a 'free-for-all' style of resource allocation; unless
    otherwise stated, a resource that is marked as 'available' may be used
    for any task (provided capacity restrictions are met etc.).

    If a SLURM-type resource provisioning approach is wanted, where a
    portion of resources are allocated to a specific workflow for the
    duration of that workflow, it is possible to use the
    :py:meth:`~topsim.core.cluster.Cluster.provision_batch_resources`
    method in your (online) Scheduling algorithm. This associates a set of
    machines for your workflow based on a provisioning scheme of your design.

    The clean-up of resources is completed by the Scheduler once all
    :py:obj:`~topsim.core.task.Task` objects in the
    :py:obj:`~topsim.core.planner.WorkflowPlan` have finished running, and
    requires no additional code on behalf of the user.
    """

    def __init__(self, env, config):
        """
        Initialising a Cluster object requires only the Simpy environment
        and a Config object.

        The machine list and system bandwidth are read from
        ``config.parse_cluster_config()``; every machine starts in the
        'available' pool of the single 'default' cluster.
        """
        #: Simulation Environment object
        self.env = env
        machines, system_bandwidth = config.parse_cluster_config()
        #: `list` of :py:obj:`~topsim.core.machine.Machine objects`
        self.machines = machines
        #: System bandwidth across the cluster
        self.system_bandwidth = system_bandwidth
        # Lookup table from machine id to the machine object
        self.machine_ids = {machine.id: machine for machine in self.machines}
        # Names of the clusters that `run()` iterates over
        self.cl = ['default']

        # Resource pools; 'idle' maps an observation to the machines that
        # are provisioned (reserved) for it but not currently running a task.
        self._resources = {
            'ingest': [],
            'occupied': [],
            'idle': {},
            'available': list(self.machines),
            'total': len(self.machines),
        }
        # Dictionary of tasks on system
        self._tasks = {
            'running': [],
            'finished': {},
            'waiting': [],
        }
        # Dictionary of current ingest information
        self._ingest = {
            'status': False,
            'pipeline': None,
            'observation': None,
            'completed': 0,
            'demand': 0,
        }
        # Counters reported by `to_df()`
        self._usage_data = {
            'occupied': 0,
            'ingest': 0,
            'available': len(self._resources['available']),
            'running_tasks': 0,
            'finished_tasks': 0,
        }
        self.num_provisioned_obs = 0
        self.events = []
        # Per-cluster view onto the dictionaries above; the inner objects
        # are shared, not copied.
        self._clusters = {
            'default': {
                'resources': self._resources,
                'tasks': self._tasks,
                'ingest': self._ingest,
                'usage_data': self._usage_data,
                'ingest_pipeline': None,
                'ingest_observation': None,
            }
        }

    def run(self):
        """
        Start the runtime loop for the cluster, and manage checks on the
        machine.

        Each iteration clears the event list and, for every cluster that is
        not currently ingesting, resets the ingest usage counter and the
        ingest demand to zero.

        Yields
        -------
        Standard TIMESTEP timeout for the simulation.
        """
        while True:
            self.events = []
            # Manage each cluster
            for c in self.cl:
                cluster = self._clusters[c]
                if not cluster['ingest']['status']:
                    cluster['usage_data']['ingest'] = 0
                    cluster['ingest']['demand'] = 0
            yield self.env.timeout(TIMESTEP)

    def check_ingest_capacity(self, pipeline_demand, max_ingest_resources,
                              c='default'):
        """
        Check if the Cluster has the machine capacity to process the
        observation Ingest pipeline.

        Observation objects have an observation type - this corresponds to
        an ingest pipeline that is set out in the Telescope. This pipeline
        type determines the number of machines in the cluster, and the
        duration, which must be reserved for the observation.

        The cluster also has a maximum number of ingest resources.

        Parameters
        ----------
        pipeline_demand : int
            The number of machines in the cluster required to run ingest
            pipeline
        max_ingest_resources : int
            The number of resources that may be allowed
        c : str
            cluster

        Returns
        -------
        True if the cluster has capacity
        False if the cluster does not have capacity to run the pipeline
        """
        resources = self._clusters[c]['resources']
        num_available = len(resources['available'])
        num_ingest = len(resources['ingest'])

        if pipeline_demand > max_ingest_resources:
            return False
        # Enough free machines, and the ingest pool stays within its cap
        return (num_available >= pipeline_demand
                and num_ingest + pipeline_demand <= max_ingest_resources)

    def provision_ingest_resources(self, demand, observation, c='default'):
        """
        Based on the requirements of the pipeline, provision a certain
        number of resources for ingest.

        One ingest task is generated per requested machine; each machine is
        moved from the 'available' pool to the 'ingest' pool and a process
        running :py:meth:`allocate_task_to_cluster` is started for the
        (machine, task) pair.

        Parameters
        ----------
        demand : int
            The number of machines to provision for ingest
        observation
            The observation being ingested; its ``name`` is passed on as
            the observation identifier of each allocation.
        c : str
            The name of the cluster.

        Raises
        ------
        RuntimeError
            If `demand` exceeds the number of currently available machines.

        Returns
        -------
        A generator, intended to be wrapped in ``env.process``.
        """
        if demand > len(self.get_available_resources(c)):
            raise RuntimeError(f"Failed to check system capacity"
                               f" before allocating resources to ingest!")
        tasks = self._generate_ingest_tasks(demand, observation)

        cluster = self._clusters[c]
        resources = cluster['resources']
        # Pair each of the first `demand` available machines with a task.
        # TODO update how we allocate tasks to resources here so we don't
        #  generate same pairs
        pairs = list(zip(resources['available'][:demand], tasks))
        cluster['ingest']['status'] = True
        cluster['ingest']['demand'] = demand

        observation_id = observation.name
        while True:
            for machine, task in pairs:
                resources['ingest'].append(machine)
                resources['available'].remove(machine)
                self.env.process(
                    self.allocate_task_to_cluster(
                        task, machine, observation=observation_id,
                        ingest=True, c=c
                    )
                )
            else:
                # The for-loop never breaks, so this always runs and the
                # allocation pass happens exactly once.
                break
            # NOTE(review): indentation was lost in SOURCE; this yield is
            # reconstructed inside the loop, where it is unreachable and only
            # makes the method a generator - confirm against upstream.
            yield self.env.timeout(TIMESTEP)

    def clean_up_ingest(self, c='default'):
        """
        Once we finished 'provision ingest', we want to update the cluster
        status before starting the new timestep.

        Increments the count of completed ingests and clears the ingest
        status flag.
        """
        ingest = self._clusters[c]['ingest']
        ingest['completed'] += 1
        ingest['status'] = False

    def current_available_resources(self):
        """
        Produce a list of current available resources of the 'default'
        cluster.

        A copy is returned because otherwise we would hand out a reference
        to the internal pool, causing issues if the caller removes elements
        from the list.

        Returns
        -------
        A list that duplicates the entries for current available resources
        """
        return list(self._clusters['default']['resources']['available'])

    def allocate_task_to_cluster(self, task, machine,
                                 predecessor_allocations=None,
                                 observation=None, ingest=False,
                                 c='default'):
        """
        Receive task from scheduler for allocation to specified machine.

        On the first pass the task is registered as running and started on
        the machine; the generator then polls the started process each
        timestep until it has triggered, at which point the machine is
        released and the task marked finished.

        Parameters
        ----------
        task
            The task to run.
        machine
            The machine on which to run the task.
        predecessor_allocations
            Machine allocations of the task's predecessors, passed through
            to the machine/task when work is started.
        observation
            The observation the task belongs to; used to find machines
            that are provisioned (idle) for that observation.
        ingest : bool
            True if the task is an ingest task, whose machine is already in
            the 'ingest' pool.
        c : str
            The name of the cluster.

        Raises
        ------
        RuntimeError
            If the machine is neither available, in the ingest pool, nor
            idle for `observation`.

        Returns
        -------
        Generator whose final value is the task's status
        (``TaskStatus.FINISHED``).
        """
        cluster = self._clusters[c]
        resources = cluster['resources']
        tasks = cluster['tasks']
        usage = cluster['usage_data']

        ret = None
        while True:
            if task not in tasks['running']:
                # The original author flagged this check as unreliable
                # ("THIS CHECK DOESN'T WORK FIX IT SOMEHOW").
                if (machine not in resources['available']
                        and machine not in resources['ingest']
                        and machine not in self.get_idle_resources(
                            observation, c)):
                    raise RuntimeError(
                        "Machine is not available, reserved for ingest, or "
                        "provisioned for this observation"
                    )
                if ingest:
                    # Ingest resources allocated separately from scheduler
                    tasks['running'].append(task)
                    tasks['finished'][task] = False
                    usage['available'] -= 1
                    usage['running_tasks'] += 1
                    usage['ingest'] += 1
                    task.task_status = TaskStatus.SCHEDULED
                    ret = machine.run(task, self.env,
                                      predecessor_allocations)
                else:
                    self._set_machine_occupied(machine, observation, c=c)
                    tasks['running'].append(task)
                    usage['available'] -= 1
                    usage['running_tasks'] += 1
                    task.task_status = TaskStatus.SCHEDULED
                    ret = self.env.process(
                        task.do_work(self.env, machine,
                                     predecessor_allocations)
                    )
                # NOTE(review): indentation was lost in SOURCE; this yield
                # is reconstructed as applying to both branches - confirm.
                yield self.env.timeout(1)
            if ret.triggered:
                tasks['running'].remove(task)
                usage['running_tasks'] -= 1
                tasks['finished'][task] = True
                usage['finished_tasks'] += 1
                if ingest:
                    resources['ingest'].remove(machine)
                    resources['available'].append(machine)
                    usage['ingest'] -= 1
                else:
                    self._set_machine_available(machine, observation, c=c)
                # NOTE(review): placed after the if/else so it mirrors the
                # decrement both branches perform at start - confirm.
                usage['available'] += 1
                task.task_status = TaskStatus.FINISHED
                # Self-assignment retained from the original code.
                task.delay_flag = task.delay_flag
                return task.task_status
            else:
                yield self.env.timeout(TIMESTEP)

    def is_idle(self):
        """
        Check to see if anything is running on the cluster.

        This is a way of determining if we are able to finish the
        simulation. The cluster is idle only when there are no running
        tasks, no waiting tasks, no occupied machines and no ingest
        machines on the 'default' cluster.

        Returns
        -------
        True if nothing is running, False otherwise
        """
        cluster = self._clusters['default']
        tasks = cluster['tasks']
        resources = cluster['resources']
        # All pools must be empty; the previous `or` reported idle as soon
        # as any one of each pair was empty.
        no_tasks_running = not tasks['running'] and not tasks['waiting']
        no_resources_occupied = (not resources['occupied']
                                 and not resources['ingest'])
        return no_tasks_running and no_resources_occupied

    def is_occupied(self, machine, observation=None, c='default'):
        """
        Check if the machine is occupied.

        Parameters
        ----------
        machine : topsim.core.machine.Machine
            The machine with which we are concerned.
        observation : topsim.core.Observation (option)
            The observation that is associated with the current check.
            Currently unused by the check itself.
        c : object
            The identifier for the cluster that is being accessed (in the
            event of multiple clusters). Access the 'default' cluster by
            default (nothing needs changing in this scenario, in which only
            one cluster is specified).

        Returns
        -------
        True if the machine is in the 'occupied' or 'ingest' pool.
        """
        # TODO add 'is provisioned' check here
        resources = self._clusters[c]['resources']
        return (machine in resources['occupied']
                or machine in resources['ingest'])

    def provision_batch_resources(self, size, name, c='default'):
        """
        Mark a machine on the cluster as being allocated to a workflow,
        without the allocation taking place just yet.

        Parameters
        ----------
        size : int
            The number of resources to be provisioned based on the
            observation workflow. If more machines are requested than are
            available (and at least one is available), the request is
            reduced to the number available.
        name
            The observation that is associated with the provisioning
        c : str
            The name of the cluster (defaults to 'default' if we are only
            using one).

        Raises
        ------
        IndexError
            If `size` is positive and no machines are available.

        Returns
        -------
        True once the machines have been provisioned.
        """
        available_resources = self.get_available_resources(c)
        tmp = len(available_resources)
        if size > tmp > 0:
            size = tmp
        for m in range(0, size):
            self._add_idle_resource(name, available_resources[m], c)
        logger.info(f"{size} machines provisioned for {name}")
        self.num_provisioned_obs += 1
        return True

    def release_batch_resources(self, observation, c='default'):
        """
        For a given observation, release its machines from the
        idle-resources section and add them to the available resources
        'pile'.

        Does nothing if the observation has no provisioned resources.

        Parameters
        ----------
        observation
            The observation whose provisioned machines are released.
        c : str
            The name of the cluster.
        """
        if observation in self._clusters[c]['resources']['idle']:
            self._update_available_resources(observation, c)
            self._reset_idle_resources(observation, c)

    def get_machine_from_id(self, id, c='default'):
        """
        Look up a machine by its identifier.

        Parameters
        ----------
        id : str
            The str-id of the machine we want to access
        c
            Unused; machines are looked up in a cluster-wide table.

        Returns
        -------
        :py:obj:`~topsim.core.machine.Machine`
        """
        return self.machine_ids[id]

    def _update_available_resources(self, observation, c='default'):
        """
        De-allocate resources from a given observation (batch-reservation)
        and add them to the 'available' pool of resources.

        Parameters
        ----------
        observation
            The observation whose idle machines are made available.
        c : str
            The name of the cluster.
        """
        idle_resources = self.get_idle_resources(observation, c)
        self._clusters[c]['resources']['available'].extend(idle_resources)

    def get_available_resources(self, c='default'):
        """
        Return a copy of the list of available machines on cluster `c`.
        """
        # TODO Figure out why this breaks in one but not the other
        return list(self._clusters[c]['resources']['available'])

    def is_observation_provisioned(self, observation, c='default'):
        """
        Return True if `observation` has an entry in the idle (provisioned)
        resource dictionary of cluster `c`.
        """
        return observation in self._clusters[c]['resources']['idle']

    def get_idle_resources(self, observation, c='default'):
        """
        List the resources that are currently provisioned for the specified
        observation.

        Parameters
        ----------
        observation
            The observation to look up.
        c : str
            The name of the cluster.

        Returns
        -------
        A copy of the observation's idle machine list, or an empty list if
        the observation has no provisioned resources.
        """
        idle = self._clusters[c]['resources']['idle']
        if observation in idle:
            return list(idle[observation])
        return []

    @property
    def finished_tasks(self, c='default'):
        """
        Returns the tasks that are recorded in the cluster's 'finished'
        dictionary.

        Returns
        -------
        `list` of :py:obj:`topsim.core.task.Task` objects
        """
        return list(self._clusters[c]['tasks']['finished'])

    def is_task_finished(self, task, c='default'):
        """
        Check if task is finished or still running on the cluster.

        Parameters
        ----------
        task
            task object
        c : str
            The name of the cluster.

        Returns
        -------
        The flag stored for the task in the cluster's finished dictionary,
        or False if the task is not in that dictionary.
        """
        return self._clusters[c]['tasks']['finished'].get(task, False)

    def get_finished_tasks(self, c='default'):
        """
        Return a list of the tasks recorded in the 'finished' dictionary of
        cluster `c`.
        """
        return list(self._clusters[c]['tasks']['finished'])

    def _set_machine_occupied(self, machine, observation, ingest=False,
                              c='default'):
        """
        Move a machine into the 'occupied' (or 'ingest') pool.

        The machine is taken from the 'available' pool if it is there;
        otherwise from the idle pool of `observation`.

        Parameters
        ----------
        machine
            The machine to mark as in use.
        observation
            The observation whose idle pool is used as a fallback.
        ingest : bool
            If True, we update which pool we keep track of, as we keep
            ingest resources separate from 'occupied' for reporting reasons.
        c : str
            The name of the cluster.

        Raises
        ------
        RuntimeError
            If the machine is not available and the observation has no
            provisioned resources.

        Returns
        -------
        True on success.
        """
        pool = 'ingest' if ingest else 'occupied'
        resources = self._clusters[c]['resources']
        if machine in self.get_available_resources(c):
            resources['available'].remove(machine)
            resources[pool].append(machine)
            return True
        elif observation in resources['idle']:
            resources['idle'][observation].remove(machine)
            resources[pool].append(machine)
            return True
        else:
            raise RuntimeError("Incorrect allocation")

    def _set_machine_available(self, machine, observation, ingest=False,
                               c='default'):
        """
        Take a machine marked as 'occupied' (or 'ingest') on the cluster and
        mark it either as available or return it to an observation's pool
        of resources.

        Parameters
        ----------
        machine
            The machine to release.
        observation
            If this observation has provisioned resources, the machine is
            returned to its idle pool rather than the 'available' pool.
        ingest : bool
            If True the machine is removed from the 'ingest' pool instead
            of the 'occupied' pool.
        c : str
            The name of the cluster.
        """
        resources = self._clusters[c]['resources']
        if ingest:
            resources['ingest'].remove(machine)
        else:
            resources['occupied'].remove(machine)
        # Return to provisioned or unprovisioned resource pool
        if observation in resources['idle']:
            resources['idle'][observation].append(machine)
        else:
            resources['available'].append(machine)

    def _clean_up_finished_task(self, task, machine, observation):
        """Placeholder; not yet implemented."""
        pass

    # TODO
    # def _set_task_running(self, task, c='default'):
    #     return self.clusters[c]['tasks']['running'].append(task)

    def _add_idle_resource(self, observation, machine, c='default'):
        """
        Add a resource to the dictionary of idle batch resources for the
        current observation.

        Parameters
        ----------
        observation
            The observation the machine is provisioned for.
        machine
            The machine to provision; must currently be available.
        c : str
            The name of the cluster.

        Raises
        ------
        RuntimeError
            If the machine is not in the 'available' pool.
        """
        idle = self._clusters[c]['resources']['idle']
        if observation not in idle:
            idle[observation] = []
        if machine in self.get_available_resources(c):
            idle[observation].append(machine)
            self._remove_available_resource(machine, c)
        else:
            raise RuntimeError(
                'Attempting to provision resources on machine that is not '
                'available')

    def _get_batch_observations(self, c='default'):
        """
        Return the observations that currently have provisioned resources.
        """
        return list(self._clusters[c]['resources']['idle'].keys())

    def _reset_idle_resources(self, observation, c='default'):
        """
        Remove observation from idle resource dictionary.

        This clears the machine allocations from the idle resources and
        decrements the count of provisioned observations. The entry is
        removed whether or not its machine list is empty; an observation
        with no entry is ignored.

        Parameters
        ----------
        observation
            The observation whose reservation is cleared.
        c : str
            The name of the cluster.
        """
        idle = self._clusters[c]['resources']['idle']
        if observation in idle:
            idle.pop(observation)
            self.num_provisioned_obs -= 1
        return None

    def _remove_available_resource(self, machine, c='default'):
        """
        Remove `machine` from the 'available' pool of cluster `c`.
        """
        self._clusters[c]['resources']['available'].remove(machine)

    def _add_event(self, observation, resource, event):
        """
        Append an event record (time, actor, observation name, event,
        resource) to the cluster's event list.
        """
        self.events.append(
            {
                "time": self.env.now,
                "actor": "cluster",
                "observation": observation.name,
                "event": event,
                "resource": resource
            }
        )

    def _set_machine_task_occupied(self):
        """
        Update allocation dictionaries and output data dictionaries.

        Not yet implemented.
        """
        pass

    def _set_machine_task_available(self):
        """
        Update allocation dictionaries and output data dictionaries.

        Not yet implemented.
        """
        pass

    def _set_machine_task_ingest(self):
        """
        Update allocation dictionaries to indicate machine is being used
        for ingest.

        Not yet implemented.
        """

    def _generate_ingest_tasks(self, demand, observation):
        """
        Create one ingest task per provisioned machine.

        Parameters
        ----------
        demand : int
            Number of machines that are provisioned for ingest
        observation
            The observation being ingested; its ``name`` is used in the
            task identifiers and its ``duration`` becomes each task's
            duration.

        Returns
        -------
        tasks : list()
            List of Task objects with status ``TaskStatus.SCHEDULED``
        """
        tasks = []
        for i in range(demand):
            t = Task(f"{observation.name}_ingest_t{i}", 0, 0, None, None,
                     0, 0, 0, None)
            t.duration = observation.duration
            t.task_status = TaskStatus.SCHEDULED
            tasks.append(t)
        return tasks

    def to_df(self):
        """
        Produce a single-row DataFrame of the current usage counters.

        Notes
        -----
        Currently only works for single resource (the 'default' cluster).

        Returns
        -------
        :py:obj:`pandas.DataFrame` with columns ``available_resources``,
        ``ingest_resources``, ``running_tasks``, ``finished_tasks`` and
        ``provisioned_observations``.
        """
        cluster = self._clusters['default']
        usage = cluster['usage_data']
        df = pd.DataFrame()
        df['available_resources'] = [usage['available']]
        df['ingest_resources'] = [usage['ingest']]
        df['running_tasks'] = [usage['running_tasks']]
        df['finished_tasks'] = [usage['finished_tasks']]
        df['provisioned_observations'] = [len(cluster['resources']['idle'])]
        # df['waiting_tasks'] = [len(self.tasks['waiting'])]
        return df

    def _update_usage_data(self, resource: str, value):
        """
        Update the usage statistics of the resource with the new value.

        Parameters
        ----------
        resource : str
            A key of the 'usage_data' dictionary
        value
            The new value to store under that key.

        Returns
        -------
        The new value
        """
        # Use the key named by `resource`, not the literal string 'resource'
        self._clusters['default']['usage_data'][resource] = value
        return value

    def finished_task_time_data(self):
        """
        Collect timing data for every task in the 'finished' dictionary.

        Each task in the 'finished' component of the cluster has the
        estimated start and finish times and the actual start and finish
        times. We can use this to track the expected finish time across
        the two.

        Returns
        -------
        :py:obj:`pandas.DataFrame` with one column per task id and rows
        ``est``, ``eft``, ``ast``, ``aft``, ``workflow_offset`` and
        ``observation_id``.
        """
        finished_tasks = self._clusters['default']['tasks']['finished']
        task_data = {}
        for task in finished_tasks:
            task_data[task.id] = {
                # Estimated times are shifted by the workflow offset
                'est': task.est + task.workflow_offset,
                'eft': task.eft + task.workflow_offset,
                'ast': task.ast,
                'aft': task.aft,
                'workflow_offset': task.workflow_offset,
                # The text before the first underscore of the task id
                'observation_id': task.id.split('_')[0],
            }
            # task_data['pred'] = [pred for pred in task.pred]

        return pd.DataFrame(task_data).infer_objects()

    def __len__(self):
        """Return the number of machines in the cluster."""
        return len(self.machines)