# Source code for topsim.user.telescope

# import simpy
# from core.planner import Planner
# import config_data
import logging
from collections import deque

import pandas as pd

from topsim.core.instrument import Instrument, RunStatus
from topsim.core.scheduler import ScheduleStatus

LOGGER = logging.getLogger(__name__)


class TelescopeQueue:
    """
    Minimal first-in, first-out queue.

    Items are returned by :py:meth:`pop` in the same order in which they
    were added with :py:meth:`push`.
    """

    def __init__(self):
        # A deque removes from the front in O(1); ``list.pop(0)`` has to
        # shift every remaining element and is O(n) per call.
        self._queue = deque()

    def push(self, x):
        """Append `x` to the back of the queue."""
        self._queue.append(x)

    def pop(self):
        """
        Remove and return the item at the front of the queue.

        Raises
        ------
        IndexError
            If the queue is empty.
        """
        return self._queue.popleft()

    def size(self):
        """Return the number of items currently queued."""
        return len(self._queue)

    def empty(self):
        """Return True if the queue holds no items, False otherwise."""
        return not self._queue


class Telescope(Instrument):
    """
    The Telescope is a high-level abstraction of the Telescope and
    Telescope Operations Centre/Monitor. It coordinates with the
    Scheduler to determine if the Cluster and the Buffer have capacity.

    Parameters
    ----------
    env : Simpy.Environment object
        The simulation environment
    config : core.config.Config object
        Config object with stored simulation configuration
    planner : core.planner.Planner object
        The Planner actor for the current simulation
    scheduler : core.scheduler.Scheduler object
        The Scheduler actor for the current simulation

    Raises
    ------
    OSError
        This will be raised if we cannot read the Telescope config file.
    JSONDecodeError
        This will be raised if the config is not parseable JSON
    """

    name = 'telescope'

    def __init__(self, env, config, planner, scheduler):
        super().__init__(env, config, planner, scheduler)
        self.env = env
        # Any error raised while reading the instrument configuration
        # (e.g. OSError) is deliberately left to propagate to the caller.
        (total_arrays, pipelines, observations,
         max_ingest) = config.parse_instrument_config(Telescope.name)
        #: int: Total number of arrays used to observe
        self.total_arrays = total_arrays
        #: `dict` of different `observation: pipeline` pairs
        self.pipelines = pipelines
        self.observations = observations
        self.max_ingest = max_ingest
        #: :py:obj:`~topsim.core.scheduler.Scheduler` object of Simulation
        self.scheduler = scheduler
        #: :py:obj:`~topsim.core.planner.Planner` object of Simulation
        self.planner = planner
        self.observation_types = None
        # Events recorded during the current timestep (see _add_event)
        self.events = []
        # True while at least one observation occupies the telescope
        self.telescope_status = False
        # Number of arrays currently allocated to observations
        self.telescope_use = 0
        self.delayed = False

    def run(self):
        """
        The entry point for the Telescope actor; this will make decisions
        per timestep and then once these decisions have been resolved,
        yield a timeout to indicate a single timestep has passed for the
        Telescope.

        Decisions made within this function are:

            * Check telescope capacity for running new observations
            * Communicate with Scheduler to determine if the Buffer and
              Cluster have capacity to run a new Observation (Ingest and
              Storage conditions).
            * Initiate the generation of a WorkflowPlan using the Planner,
              once an observation is scheduled for observation.
            * Finalise an Observation and initiate the 'clean-up'.

        Notes
        -----
        The procedure here follows an 'observation-led' approach to
        determining if the observation is able to run (hence
        `observation.is_ready()`). The idea is the observation is standard
        across any implementation of Instrument, so it's easier for
        user-defined instruments to defer to the observation to determine
        if it is possible to run the observation (i.e. have the
        observation check if its Start/End time conditions have been met).

        Yields
        ------
        self.env.timeout(1)
            A single simulation timestep
        """
        while self.has_observations_to_process():
            # Events only describe the current timestep, so start afresh.
            self.events = []

            # Check if scheduler is delayed
            if (self.scheduler.schedule_status is ScheduleStatus.DELAYED
                    and not self.delayed):
                self.delayed = True

            for observation in self.observations:
                capacity = self.total_arrays - self.telescope_use
                # IF there is an observation ready for start
                if observation.is_ready(self.env.now, capacity):
                    # TODO update this with a counter so we know when
                    # this occurs but it doesn't spam the terminal
                    # LOGGER.info('Observation %s scheduled for %s',
                    #             observation.name, self.env.now)

                    # Observation is ready - is the Buffer/Cluster?
                    if self.scheduler.check_ingest_capacity(
                            observation, self.pipelines, self.max_ingest):
                        # The returned RunStatus is intentionally not
                        # assigned to the observation here.
                        self.begin_observation(observation)
                        observation.ast = self.env.now
                        LOGGER.info('telescope is now using %s arrays',
                                    self.telescope_use)
                        # Register ingest as its own simulation process.
                        self.env.process(
                            self.scheduler.allocate_ingest(
                                observation, self.pipelines, self.planner
                            )
                        )
                        self._add_event(observation, "telescope", "started")
                elif observation.is_finished(self.env.now,
                                             self.telescope_status):
                    observation.status = self.finish_observation(observation)
                    self._add_event(observation, "telescope", "finished")
                    LOGGER.info('Telescope is now using %s arrays',
                                self.telescope_use)

            yield self.env.timeout(1)

        # Nothing left to observe: do not leave stale events behind.
        self.events = []

    def begin_observation(self, observation):
        """
        Update the telescope use status based on observation demand for
        antennas

        Parameters
        ----------
        observation : :py:obj:`~topsim.core.telescope.Observation`
            Observation that is chosen for allocation to the telescope

        Returns
        -------
        RunStatus.RUNNING
        """
        # TODO We do not actually do any sanity checking here to make sure
        # the telescope use isn't greater than the total_capacity.
        self.telescope_use += observation.demand
        self.telescope_status = True
        return RunStatus.RUNNING

    def finish_observation(self, observation):
        """
        Release the arrays used by a finished observation and mark the
        telescope as not in use once no arrays remain allocated.

        Parameters
        ----------
        observation : :py:obj:`~topsim.core.telescope.Observation`
            Observation that has finished on the telescope

        Returns
        -------
        RunStatus.FINISHED
        """
        # TODO We do not actually do any sanity checking here to make sure
        # the telescope use does not drop below zero.
        self.telescope_use -= observation.demand
        if self.telescope_use == 0:
            self.telescope_status = False
        return RunStatus.FINISHED

    def run_observation_on_telescope(self, demand):
        """Placeholder; currently does nothing."""
        pass

    def has_observations_to_process(self):
        """
        Check whether any observation has not yet finished.

        Returns
        -------
        True if at least one observation has a status other than
        RunStatus.FINISHED, False otherwise (including when there are no
        observations).
        """
        return any(
            observation.status != RunStatus.FINISHED
            for observation in self.observations
        )

    def make_greedy_decision(self):
        """
        This method acts as a 'greedy' decision maker, with the following
        policies:

            * If there is space in the buffer and cluster, and there is
              enough room on the telescope, schedule the observation
            * If there is a delay, drop the lowest priority observation
              from the plan.
            * If all observations are of the same priority, drop the
              shortest
            * If all observations are of the same length, drop the one
              with the highest data requirement

        Not yet implemented; currently returns None.
        """

    def make_delay_conscious_decision(self):
        """
        This method takes into account delays on the telescope and acts to
        improve the science output based on the delays

        This will change based on whether we are in a Global or Independent
        DAG model.

        Independent DAG model - drop things

        Global DAG model - we reschedule based on current awareness of the
        cluster and the delays (with a buffer). This means we update our
        global plan, with the workflows potentially being interleaved more
        effectively as a result of the delay. Then, based on this
        re-planning, we determine the delay %; if it's still over,
        we follow the greedy approach

        Not yet implemented; currently returns None.
        """

    def observations_waiting(self):
        """
        Calculate the number of observations that are waiting for
        telescope time

        Returns
        -------
        Integer number of observations with status RunStatus.WAITING
        """
        return sum(
            1 for observation in self.observations
            if observation.status == RunStatus.WAITING
        )

    def observations_finished(self):
        """
        Calculates the number of observations that are finished observing

        Returns
        -------
        Integer number of finished observations
        """
        return sum(
            1 for observation in self.observations
            if observation.status == RunStatus.FINISHED
        )

    def print_state(self):
        """
        Summarise the current telescope state.

        Returns
        -------
        dict with the in-use flag, the number of arrays in use and the
        number of waiting observations.
        """
        return {
            'telescope_in_use': self.telescope_status,
            'telescope_arrays_used': self.telescope_use,
            'observations_waiting': self.observations_waiting()
        }

    def is_idle(self):
        """
        Check if telescope has current or pending observations

        Returns
        -------
        True if there are no pending or current observations
        """
        if self.has_observations_to_process():
            return False
        return (not self.telescope_status) and self.telescope_use == 0

    def to_df(self):
        """
        Build a single-row DataFrame of observation counters.

        Returns
        -------
        pandas.DataFrame with the columns `observations_waiting`,
        `observations_finished` and `observations_delayed`.
        """
        df = pd.DataFrame()
        df['observations_waiting'] = [self.observations_waiting()]
        df['observations_finished'] = [self.observations_finished()]
        df['observations_delayed'] = [self._calc_observation_delay()]
        return df

    def _calc_observation_delay(self):
        """
        Sum, over every observation still waiting after its `est`, the
        time elapsed between `est` and the current simulation time.
        """
        now = self.env.now
        return sum(
            now - observation.est
            for observation in self.observations
            if now > observation.est
            and observation.status == RunStatus.WAITING
        )

    def _add_event(self, observation, resource, event):
        """
        Record an event for `observation` at the current simulation time.

        Parameters
        ----------
        observation : object with a `name` attribute
            Observation the event refers to
        resource : str
            Resource the event occurred on (e.g. "telescope")
        event : str
            Event label (e.g. "started", "finished")
        """
        self.events.append(
            {
                "time": int(self.env.now),
                "actor": "instrument",
                "observation": observation.name,
                "event": event,
                "resource": resource
            }
        )