# Copyright (C) 10/19 RW Bunney
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
buffer.py contains three main classes that encapsulate the behaviour of the
Buffer actor in a simulation.
* Buffer : This is the high-level buffer that other actors interact with. It
is the one that is initialised at the beginning of a simulation, and whose
process is invoked with simpy.env.process.
* HotBuffer : This is the streaming buffer in which instrument ingest occurs,
and from which ingest processing pipelines may be run.
* ColdBuffer : This is the main storage buffer, where post-ingest data is
moved and from where post-processing pipelines access workflow data.
"""
import logging
import json
from time import sleep
import pandas as pd
from tqdm import tqdm
from topsim.common.globals import TIMESTEP
from topsim.core.instrument import RunStatus
LOGGER = logging.getLogger(__name__)
class Buffer:
    """
    High-level buffer actor that owns the hot and cold buffers and moves
    observation data between them during a simulation.

    Parameters
    ----------
    env : simpy.Environment
        The simulation environment
    cluster : topsim.core.cluster.Cluster
        Cluster (Actor) object for the simulation
    planner :
        Planner object; stored on the instance, not used by this class
    config : topsim.core.config.Config
        Config object; must provide ``parse_buffer_config()``

    Attributes
    ----------
    hot, cold :
        The two collections returned by ``config.parse_buffer_config()``.
        They are iterated for buffer ids and indexed by those ids
        (presumably dicts of HotBuffer / ColdBuffer objects -- confirm
        against the config parser).
    threshold : float
        Used-capacity fraction (0.6) that drives transfer decisions
    events : list of dict
        Events recorded since the start of the current ``run`` pass
    stored_times : list of int
        Simulation times at which an ingest finished
    waiting_observation_list : list
        Observations whose ingest has finished

    Methods
    -------
    run, check_buffer_capacity, ingest_data_stream, move_hot_to_cold,
    move_cold_to_hot, next_observation_for_processing,
    mark_observation_finished, buffer_storage_summary, is_empty, to_df
    """

    def __init__(self, env, cluster, planner, config):
        """
        Parameters
        ----------
        env : simpy.Environment
            The environment object for the Simulation
        cluster : topsim.core.cluster.Cluster
            Cluster (Actor) object for the simulation
        planner :
            Planner object, kept as ``self.planner``
        config : topsim.core.config.Config
            Config object

        Raises
        ------
        OSError
            Re-raised unchanged if the buffer configuration cannot be read.
        """
        self.env = env
        self.cluster = cluster
        # The buffer layout is read from a file, so fail loudly if that
        # does not work.
        try:
            self.hot, self.cold = config.parse_buffer_config()
        except OSError:
            LOGGER.error("Error processing Buffer config file")
            raise
        self.planner = planner
        self.hot_count = len(self.hot)
        self.cold_count = len(self.cold)
        # Data still to be moved by the hot-to-cold transfer in flight.
        self._data_left_to_transfer = 0
        self.waiting_observation_list = []
        self.events = []
        self.threshold = 0.6
        self.stored_times = []

    def run(self):
        """
        Main process of the Buffer; loops for the whole simulation.

        Each pass clears ``self.events`` and then, for every hot buffer:

        * if its used fraction is above ``self.threshold``, the current
          time is not one recorded in ``self.stored_times``, the newest
          stored observation fits in the matching cold buffer and no
          transfer is in progress, a hot-to-cold transfer is started;
        * if its used fraction, discounting the data still being
          transferred out, is below ``self.threshold`` and a cold-stored
          observation is projected to fit, a cold-to-hot transfer is
          started (again only when no transfer is in progress).

        Yields
        ------
        A simpy.env.timeout() of duration topsim.common.globals.TIMESTEP
        """
        while True:
            self.events = []
            if self.env.now % 1000 == 0:
                LOGGER.debug(
                    "\nHotBuffer: %s \nColdBuffer: %s @ %d",
                    [self.hot[b].current_capacity for b in self.hot],
                    [self.cold[b].current_capacity for b in self.cold],
                    self.env.now
                )
            for b in self.hot:
                if self.check_buffer_over_data_threshold(b):
                    # Skip this buffer entirely on a timestep at which an
                    # ingest has just been stored.
                    if self.env.now in self.stored_times:
                        continue
                    if (self.has_observations_stored(b)
                            and self.cold[b].has_capacity_for(
                                self.hot[b].observations['stored'][-1]
                                .total_data_size)
                            and not self.transfer_in_progress(b)):
                        self.env.process(self.move_hot_to_cold(b))
                # If the used fraction left once the current transfer has
                # completed is below the threshold, see whether an
                # observation can be brought back from the cold buffer.
                projected_used = 1 - (
                    (self.hot[b].current_capacity
                     + self._data_left_to_transfer)
                    / self.hot[b].total_capacity
                )
                if projected_used < self.threshold:
                    cold_stored = self.cold[b].observations['stored']
                    # NOTE(review): the projection looks at the first
                    # stored observation, while the transfer pops the
                    # last one -- confirm which is intended.
                    if (cold_stored
                            and self.project_buffer_capacity(cold_stored[0], b)
                            and not self.transfer_in_progress(b)):
                        self.env.process(self.move_cold_to_hot(b))
            yield self.env.timeout(TIMESTEP)

    def transfer_in_progress(self, b):
        """
        Return the observation being transferred on buffer pair ``b``.

        Returns
        -------
        The hot buffer's 'transfer' entry if it is truthy, otherwise the
        cold buffer's 'transfer' entry (falsy when nothing is in flight).
        """
        return (self.hot[b].observations['transfer']
                or self.cold[b].observations['transfer'])

    def check_buffer_over_data_threshold(self, b):
        """
        True if the used fraction of hot buffer ``b`` exceeds
        ``self.threshold``.
        """
        hot = self.hot[b]
        used = hot.total_capacity - hot.current_capacity
        return used / hot.total_capacity > self.threshold

    def has_observations_stored(self, b):
        """
        True if hot buffer ``b`` holds more than one stored observation.
        """
        # NOTE(review): strictly greater than one, unlike
        # HotBuffer.has_stored_observations -- confirm this is intended.
        return len(self.hot[b].observations['stored']) > 1

    def project_buffer_capacity(self, obs, b):
        """
        True if hot buffer ``b`` would stay below ``self.threshold`` (as a
        used fraction) after additionally holding ``obs``.
        """
        hot = self.hot[b]
        projected = hot.total_capacity - hot.current_capacity
        projected += obs.total_data_size
        return projected / hot.total_capacity < self.threshold

    def check_buffer_capacity(self, observation):
        """
        Determines if there is capacity in both the Hot and Cold Buffers

        Parameters
        ----------
        observation : topsim.core.telescope.Observation
            The observation intended to be added to the buffer

        Returns
        -------
        True :
            If both buffers have capacity
        False :
            If at least one buffer does not have capacity

        Raises
        ------
        RuntimeError
            If the observation duration is below 1, or if the observation
            (ingest_data_rate * duration) is at least as large as the hot
            buffer's total capacity.

        TODO Ensure that we do not start observations if the size of the
        data + the total size of the TRANSFERRING observation data is
        greater than the total data of the cold buffer.

        NOTE This is necessary only if we want to move data from
        hot-to-cold. This avoids the possibility for an observation to
        go ahead when both hot buffer and cold buffer have capacity
        mid-transfer, but the cold buffer does not have capacity
        post-transfer.
        For example, if both buffers have capacity of 150, we ingest a
        125tb observation, we start to transfer that to the cold buffer,
        then we get a 50tb ingest when the HotBuffer is at 80 and the
        ColdBuffer is at 70 (mid-transfer); we will accept an ingest of 50
        because both separately are fine, but after ingest will not be.
        """
        b = observation.buffer_id
        if observation.duration < 1:
            raise RuntimeError(
                f"Observation duration has become less than 1 second.\n"
                f"Please check the observation plan, "
                f"or ensure that the conversion between observation duration "
                f"and simulation units do not cause a fractional timestep."
            )
        size = observation.ingest_data_rate * observation.duration
        if self.hot[b].total_capacity <= size:
            raise RuntimeError(
                f"Observation data size is equal or greater than HotBuffer capacity."
                f"Consider expanding capacity size."
                f"{observation.name}, Observation: {size} vs Hot Buffer: {self.hot[b].total_capacity:.1f}"
            )
        if (self.hot[b].current_capacity - size < 0
                or not self.cold[b].has_capacity_for(size)):
            return False
        return True

    def ingest_data_dump(self, data):
        """Placeholder; currently does nothing."""
        pass

    def has_observations_ready_for_processing(self):
        """
        Check whether any hot buffer has an observation ready to process.

        Returns
        -------
        True if at least one hot buffer has stored observations and is not
        over the data threshold; False otherwise.
        """
        return any(
            self.hot[b].has_stored_observations()
            and not self.check_buffer_over_data_threshold(b)
            for b in self.hot
        )

    def next_observation_for_processing(self):
        """
        Hand out the next observation for processing from the hot buffers.

        Only the first hot buffer (in iteration order) that reports stored
        observations is asked, so no other buffer has an observation taken
        out of its stored list as a side effect.

        Returns
        -------
        observation : topsim.core.telescope.Observation()
            next observation for processing, or None if no hot buffer has
            a stored observation
        """
        for b in self.hot:
            if self.hot[b].has_stored_observations():
                return self.hot[b].next_observation_for_processing()
        return None

    def mark_observation_finished(self, observation):
        """
        To be called by the Scheduler.

        Records a "removed" event and removes the observation from the
        hot buffer identified by ``observation.buffer_id``.

        Parameters
        ----------
        observation : topsim.core.Telescope.Observation
            The observation that has been processed on the scheduler.

        Returns
        -------
        The result of ``remove(observation)`` on that hot buffer.
        """
        b = observation.buffer_id
        self._add_event(observation, "buffer", "removed")
        return self.hot[b].remove(observation)

    def move_hot_to_cold(self, b):
        """
        Generator process that 'moves' one observation from hot buffer
        ``b`` to cold buffer ``b`` at a rate of ColdBuffer.max_data_rate
        per timestep.

        Parameters
        ----------
        b :
            Id of the hot/cold buffer pair

        Returns
        -------
        False (without yielding) if the cold buffer cannot hold the
        observation, in which case the observation is put back in the
        hot buffer's stored list; True once the transfer has completed.

        Raises
        ------
        RuntimeError
            If the hot buffer has no stored observations, or if the two
            buffers disagree on the amount of data left to transfer.
        """
        # TODO Support multiple observation transfers
        hot = self.hot[b]
        cold = self.cold[b]
        if not hot.observations["stored"]:
            raise RuntimeError(
                "No observations in Hot Buffer"
            )

        current_obs = hot.observation_for_transfer()
        data_left_to_transfer = current_obs.total_data_size
        if not cold.has_capacity_for(data_left_to_transfer):
            # We cannot actually transfer the observation due to size
            # constraints, so undo observation_for_transfer(). The
            # 'transfer' slot holds a single observation, so it is reset
            # rather than treated as a list.
            # TODO create an object method to update the hot buffer
            hot.observations['stored'].append(current_obs)
            hot.observations['transfer'] = None
            return False

        # Only claim the data once the transfer is really going ahead, so
        # an aborted attempt does not skew the projection in run().
        self._data_left_to_transfer = data_left_to_transfer
        _tqdm = False  # progress bar is disabled
        pbar = None
        if _tqdm:
            pbar = tqdm(
                total=current_obs.total_data_size,
                desc=f'Buffer: {current_obs.name}'
            )
        # TODO UPDATE EVENT INFORMATION ON WHICH TRANSFER DIRECTION
        self._add_event(current_obs, "transfer", "started")
        while True:
            if data_left_to_transfer <= 0:
                LOGGER.info(
                    "Buffer transfer completed at time %s", self.env.now
                )
                self._add_event(current_obs, "transfer", "stopped")
                break
            # Both ends are stepped with the same residual and must agree
            # on what remains afterwards.
            check = cold.receive_observation(
                current_obs,
                data_left_to_transfer
            )
            data_left_to_transfer = hot.transfer_observation(
                current_obs, cold.max_data_rate, data_left_to_transfer
            )
            if check != data_left_to_transfer:
                raise RuntimeError(
                    "Hot and Cold Buffer receiving data at a differen rate"
                )
            if pbar:
                pbar.update(n=cold.max_data_rate)
            self._data_left_to_transfer = data_left_to_transfer
            yield self.env.timeout(TIMESTEP)
        if pbar:
            pbar.close()
        return True

    def move_cold_to_hot(self, b):
        """
        Generator process that 'moves' one observation from cold buffer
        ``b`` back to hot buffer ``b``. Each timestep moves at most the
        smaller of the hot buffer's max_ingest_data_rate and the cold
        buffer's max_data_rate.

        Parameters
        ----------
        b :
            Id of the hot/cold buffer pair

        Returns
        -------
        False (without yielding) if the hot buffer cannot hold the
        observation, in which case the observation is put back in the
        cold buffer's stored list; True once the transfer has completed.

        Raises
        ------
        RuntimeError
            If the cold buffer has no stored observations, or if the two
            buffers disagree on the amount of data left to transfer.
        """
        # TODO Support multiple observation transfers
        hot = self.hot[b]
        cold = self.cold[b]
        if not cold.observations["stored"]:
            raise RuntimeError(
                "No observations in Cold Buffer"
            )

        current_obs = cold.observation_for_transfer()
        data_left_to_transfer = current_obs.total_data_size
        _tqdm = False  # progress bar is disabled
        pbar = None
        if _tqdm:
            pbar = tqdm(
                total=current_obs.total_data_size,
                desc=f'Buffer: {current_obs.name}'
            )
        if not hot.has_capacity_for(data_left_to_transfer):
            # We cannot actually transfer the observation due to size
            # constraints, so undo observation_for_transfer().
            # TODO create an object method to update the cold buffer
            cold.observations['stored'].append(current_obs)
            cold.observations['transfer'] = None
            return False

        self._add_event(current_obs, "transfer-to-hot", "started")
        while True:
            if data_left_to_transfer <= 0:
                LOGGER.info(
                    "Buffer transfer completed at time %s", self.env.now
                )
                self._add_event(current_obs, "transfer-to-hot", "stopped")
                break
            # Pick the slowest rate to transfer
            rate = min(hot.max_ingest_data_rate, cold.max_data_rate)
            check = hot.receive_observation(
                current_obs, data_left_to_transfer, rate
            )
            data_left_to_transfer = cold.transfer_observation(
                current_obs, rate, data_left_to_transfer
            )
            if check != data_left_to_transfer:
                raise RuntimeError(
                    "Hot and Cold Buffer receiving data at a differen rate"
                )
            if pbar:
                pbar.update(n=cold.max_data_rate)
            yield self.env.timeout(TIMESTEP)
        if pbar:
            pbar.close()
        return True

    def ingest_data_stream(self, observation):
        """
        Buffer ingests the data stream from the Ingest pipelines. The data
        is added to the 'hot' buffer every timestep; that is,
        observation.ingest_data_rate is a per-timestep value.

        While the observation is RUNNING, each step subtracts the ingest
        rate from the hot buffer and adds it to
        ``observation.total_data_size``. After ``observation.duration``
        steps the observation is appended to
        ``self.waiting_observation_list`` and to the hot buffer's stored
        list, and the current time is recorded in ``self.stored_times``.

        Parameters
        ----------
        observation : core.Telescope.Observation object
            The observation we are attempting to ingest

        Raises
        ------
        RuntimeError
            If the observation is still marked WAITING.
        """
        b = observation.buffer_id
        time_left = observation.duration - 1
        if observation.status is RunStatus.WAITING:
            raise RuntimeError(
                "Observation must be marked RUNNING before ingest begins!"
            )
        self._add_event(observation, "buffer", "added")
        while observation.status == RunStatus.RUNNING:
            self.hot[b].process_incoming_data_stream(
                observation.ingest_data_rate,
                self.env.now
            )
            observation.total_data_size += observation.ingest_data_rate
            if time_left > 0:
                time_left -= 1
            else:
                # Final timestep of the observation: hand it over as
                # stored data without yielding again.
                self.waiting_observation_list.append(observation)
                self.hot[b].observations["stored"].append(observation)
                curr_time = int(self.env.now)
                self.stored_times.append(curr_time)
                break
            yield self.env.timeout(TIMESTEP)

    def buffer_storage_summary(self):
        """
        Provide other actors information on the capacity and rate details
        of the respective buffers.

        This is a nicely packaged method that avoids us interacting with
        the values in the hot and cold buffers, which should not be
        accessed outside the Buffer class.

        Returns
        -------
        storage : dict
            Dictionary comprising hot and cold buffer storage information
            (current capacity and data rate of buffer 0 of each kind)

        Raises
        ------
        RuntimeError
            If there is not exactly one hot and one cold buffer.
        """
        if self.hot_count == 1 and self.cold_count == 1:
            return {
                'hotbuffer': {
                    'capacity': self.hot[0].current_capacity,
                    'data_rate': self.hot[0].max_ingest_data_rate
                },
                'coldbuffer': {
                    'capacity': self.cold[0].current_capacity,
                    'data_rate': self.cold[0].max_data_rate
                }
            }
        raise RuntimeError('Multi-buffer functionality not complete')

    def is_empty(self):
        """
        Determine if both buffers are cleared of all observational data

        Returns
        -------
        True if every hot and cold buffer's current capacity is the same
        as its total capacity.
        """
        for buf in self.hot:
            if self.hot[buf].total_capacity != self.hot[buf].current_capacity:
                return False
        for buf in self.cold:
            if self.cold[buf].total_capacity != self.cold[buf].current_capacity:
                return False
        return True

    def to_df(self):
        """
        Return a pandas.DataFrame that represents the state of the
        Buffer at the current timestep. Only buffer 0 of each kind is
        reported.

        Returns
        -------
        current_state : pandas.DataFrame()
            A one-row table with columns 'hot_buffer', 'cold_buffer'
            (current capacities) and 'stored' (number of observations
            stored across both).
        """
        current_state = pd.DataFrame()
        current_state['hot_buffer'] = [self.hot[0].current_capacity]
        current_state['cold_buffer'] = [self.cold[0].current_capacity]
        current_state['stored'] = [
            len(self.cold[0].observations['stored'])
            + len(self.hot[0].observations['stored'])
        ]
        return current_state

    def _add_event(self, observation, resource, event):
        """Append an event record for ``observation`` to ``self.events``."""
        self.events.append(
            {
                "time": int(self.env.now), "actor": "buffer",
                "observation": str(observation.name), "event": str(event),
                "resource": str(resource)
            }
        )
class HotBuffer:
    """
    HotBuffer represents the ingest-facing part of the Buffer. Observation
    data is intended to stay in the HotBuffer only temporarily, and ideally
    is moved to the ColdBuffer as soon as possible.

    Transition to the ColdBuffer is not instantaneous; rather it depends on
    the data rate supported by the ColdBuffer, which may be defined
    differently to the HotBuffer based on the Buffer config JSON.

    Attributes
    ----------
    total_capacity :
        Size of the buffer
    current_capacity :
        Free space remaining; starts equal to total_capacity
    max_ingest_data_rate :
        Largest per-timestep ingest accepted
    observations : dict
        'stored', 'scheduled' and 'finished' are lists of observations;
        'transfer' is the single observation in transfer, or None
    """

    def __init__(self, capacity, max_ingest_data_rate):
        self.total_capacity = capacity
        self.current_capacity = self.total_capacity
        self.max_ingest_data_rate = max_ingest_data_rate
        # Not used by this class; kept for any external readers.
        self.stored_observations = []
        self.observations = {
            "stored": [],
            "transfer": None,
            "scheduled": [],
            "finished": []
        }

    def has_capacity_for(self, observation_size):
        """
        Check if the HotBuffer has capacity (checks self.current_capacity).
        Preferred approach over accessing self.current_capacity.

        The total size of the observation currently in transfer (if any)
        is counted on top of the requested size.

        Parameters
        ----------
        observation_size : int
            The size of the observation

        Returns
        -------
        True
            If there is capacity
        False
            Otherwise.
        """
        size = observation_size
        in_transfer = self.observations['transfer']
        if in_transfer:
            size += in_transfer.total_data_size
        return self.current_capacity - size >= 0

    def has_waiting_observations(self):
        """
        Check if there are observations in the buffer to transfer to cold
        buffer.

        Returns
        -------
        True if the 'stored' list is non-empty.
        """
        return bool(self.observations['stored'])

    def process_incoming_data_stream(self, incoming_datarate, time):
        """
        During Ingest, the buffer will coordinate the incoming data from
        the observation. This checks the incoming rate against the maximum
        ingest rate and then consumes that much capacity.

        Parameters
        ----------
        incoming_datarate :
            The amount of data-per-timestep the telescope is producing
        time :
            Current simulation time (not used by the method)

        Returns
        -------
        The remaining capacity after the data has been added. No check is
        made that the remaining capacity is non-negative.

        Raises
        ------
        ValueError
            If int(incoming_datarate) exceeds max_ingest_data_rate.
        """
        if int(incoming_datarate) > self.max_ingest_data_rate:
            raise ValueError(
                'Incoming data rate {0} exceeds maximum.'.format(
                    incoming_datarate)
            )
        self.current_capacity -= incoming_datarate
        return self.current_capacity

    def transfer_observation(self, observation, transfer_rate, residual_data):
        """
        Release one timestep's worth of an outgoing observation.

        Parameters
        ----------
        observation : topsim.core.telescope.Observation
            The observation leaving this buffer
        transfer_rate :
            Data moved per timestep; a negative value moves the whole
            observation at once
        residual_data : int
            Data left to transfer before this step

        Returns
        -------
        residual_data : int
            The amount of data left to transfer
        """
        if self.observations['transfer'] is None:
            self.observations['transfer'] = observation
        # A negative rate means a 'real-time' simulation, in which the hot
        # and cold buffers are treated as one buffer.
        if transfer_rate < 0:  # 'Real Time': CB is an extension of HB
            self.current_capacity += observation.total_data_size
            residual_data -= observation.total_data_size
        elif residual_data < transfer_rate:
            self.current_capacity += residual_data
            residual_data = 0
        else:
            self.current_capacity += transfer_rate
            residual_data -= transfer_rate
        if residual_data == 0:
            self.observations['transfer'] = None
        return residual_data

    def receive_observation(self, observation, residual_data, data_rate):
        """
        Accept one timestep's worth of an observation that is being moved
        into the HotBuffer.

        Parameters
        ----------
        observation : topsim.core.telescope.Observation
            The observation to be stored
        residual_data : int
            How much data is left to transfer
        data_rate :
            Data received per timestep; a value that is not positive
            receives the whole observation at once

        Returns
        -------
        residual_data
            Value of residual_data decremented by what was received. Once
            it reaches 0 the observation is appended to 'stored' and the
            'transfer' slot is cleared.
        """
        self.observations['transfer'] = observation
        if data_rate > 0:
            if residual_data < data_rate:
                self.current_capacity -= residual_data
                residual_data = 0
            else:
                self.current_capacity -= data_rate
                residual_data -= data_rate
        else:
            self.current_capacity -= observation.total_data_size
            residual_data -= observation.total_data_size
        if residual_data == 0:
            self.observations['transfer'] = None
            # Logged rather than printed so library users control output.
            logging.getLogger(__name__).debug('Added to hotbuffer')
            self.observations['stored'].append(observation)
        return residual_data

    def observation_for_transfer(self):
        """
        Pop the most recently stored observation, place it in the
        'transfer' slot and return it.
        """
        self.observations['transfer'] = self.observations["stored"].pop()
        return self.observations['transfer']

    def has_stored_observations(self):
        """
        Hides the dictionary nature of observations stored in the Buffer.

        Returns
        -------
        True if there are observations in self.observations['stored']
        """
        return len(self.observations['stored']) > 0

    def next_observation_for_processing(self):
        """
        Move the most recently stored observation to the 'scheduled' list
        and return it.

        Returns
        -------
        topsim.core.telescope.Observation object, or None when nothing is
        stored (an already scheduled observation is never handed out
        again).
        """
        stored = self.observations['stored']
        if not stored:
            return None
        observation = stored.pop()
        self.observations['scheduled'].append(observation)
        return observation

    def remove(self, observation):
        """
        Removes a scheduled observation from the hot buffer, records it as
        finished and frees its capacity.

        Parameters
        ----------
        observation : topsim.core.telescope.Observation
            The observation that is to be removed

        Returns
        -------
        True if observation was scheduled and removed successfully
        False otherwise
        """
        if observation in self.observations['scheduled']:
            self.current_capacity += observation.total_data_size
            self.observations['finished'].append(observation)
            self.observations['scheduled'].remove(observation)
            return True
        return False
class ColdBuffer:
    """
    The ColdBuffer takes data from the hot buffer for use in workflow
    processing

    Attributes
    ----------
    total_capacity :
        Size of the buffer
    current_capacity :
        Free space remaining; starts equal to total_capacity
    max_data_rate :
        Data received per timestep; a value that is not positive means a
        whole observation is received at once
    observations : dict
        'stored' is a list of observations; 'transfer' is the single
        observation in transfer, or None
    """

    def __init__(self, capacity, max_data_rate):
        self.total_capacity = capacity
        self.current_capacity = self.total_capacity
        self.max_data_rate = max_data_rate
        # Retained for compatibility; no method of this class advances it.
        self.next_obs = 0
        self.observations = {
            'stored': [],
            'transfer': None
        }

    def has_capacity_for(self, observation_size):
        """
        Check if the ColdBuffer has capacity (checks self.current_capacity).
        Preferred approach over accessing self.current_capacity.

        The total size of the observation currently in transfer (if any)
        is counted on top of the requested size.

        Parameters
        ----------
        observation_size : int
            The size of the observation

        Returns
        -------
        True
            If there is capacity
        False
            Otherwise.
        """
        size = observation_size
        in_transfer = self.observations['transfer']
        if in_transfer:
            size += in_transfer.total_data_size
        return self.current_capacity - size >= 0

    def transfer_observation(self, observation, transfer_rate, residual_data):
        """
        Release one timestep's worth of an outgoing observation.

        Parameters
        ----------
        observation : topsim.core.telescope.Observation
            The observation leaving this buffer
        transfer_rate :
            Data moved per timestep; a negative value moves the whole
            observation at once
        residual_data : int
            Data left to transfer before this step

        Returns
        -------
        residual_data : int
            The amount of data left to transfer
        """
        if not self.observations['transfer']:
            self.observations['transfer'] = observation
        # A negative rate means a 'real-time' simulation, in which the hot
        # and cold buffers are treated as one buffer.
        if transfer_rate < 0:  # 'Real Time': CB is an extension of HB
            self.current_capacity += observation.total_data_size
            residual_data -= observation.total_data_size
        elif residual_data < transfer_rate:
            self.current_capacity += residual_data
            residual_data = 0
        else:
            self.current_capacity += transfer_rate
            residual_data -= transfer_rate
        if residual_data == 0:
            self.observations['transfer'] = None
        return residual_data

    def receive_observation(self, observation, residual_data):
        """
        For an observation that needs to be moved to ColdBuffer storage,
        we must 'receive' it.

        Parameters
        ----------
        observation : topsim.core.telescope.Observation
            The observation to be stored
        residual_data : int
            How much data is left to transfer

        Returns
        -------
        residual_data
            Decremented value of residual_data by the data_rate of
            ColdBuffer. Once it reaches 0 the observation is appended to
            'stored' and the 'transfer' slot is cleared.
        """
        self.observations['transfer'] = observation
        if self.max_data_rate > 0:
            if residual_data < self.max_data_rate:
                self.current_capacity -= residual_data
                residual_data = 0
            else:
                self.current_capacity -= self.max_data_rate
                residual_data -= self.max_data_rate
        else:
            self.current_capacity -= observation.total_data_size
            residual_data -= observation.total_data_size
        if residual_data == 0:
            self.observations['transfer'] = None
            self.observations['stored'].append(observation)
        return residual_data

    def has_stored_observations(self):
        """
        Hides the dictionary nature of observations stored in the Buffer.

        Returns
        -------
        True if there are observations in self.observations['stored']
        """
        return len(self.observations['stored']) > 0

    def observation_for_transfer(self):
        """
        Pop the most recently stored observation, place it in the
        'transfer' slot and return it.
        """
        self.observations['transfer'] = self.observations["stored"].pop()
        return self.observations['transfer']

    def next_observation_for_processing(self):
        """
        Produces the next observation without removal

        Returns
        -------
        The most recently stored topsim.core.telescope.Observation object,
        or None if nothing is stored.
        """
        # The previous cursor-based branch could only index past the end
        # of the list, so the newest stored observation is always the one
        # handed out.
        stored = self.observations['stored']
        if not stored:
            return None
        return stored[-1]

    def remove(self, observation):
        """
        Removes an observation from the cold buffer and updates the
        capacity accordingly

        Parameters
        ----------
        observation : topsim.core.telescope.Observation
            The observation that is to be removed

        Returns
        -------
        True if observation is stored and removed successfully
        False otherwise
        """
        if observation in self.observations['stored']:
            self.current_capacity += observation.total_data_size
            self.observations['stored'].remove(observation)
            return True
        return False