# Source code for topsim.core.planner

import sys
import logging
import copy

import networkx as nx

from enum import Enum

LOGGER = logging.getLogger(__name__)


class WorkflowStatus(int, Enum):
    """
    Status enums for a WorkflowPlan object.

    The members mix in ``int``, so each one compares equal to its integer
    value (for example ``WorkflowStatus.FINISHED == 5``).
    """
    UNSCHEDULED = 1
    SCHEDULED = 2
    ON_TIME = 3
    DELAYED = 4
    FINISHED = 5  # the value WorkflowPlan.is_finished() compares against


class Planner:
    """
    The Planner is our interface with static scheduling algorithms.

    It provides an interface to other libraries; the planning model that is
    used is the one passed to ``__init__``. Currently, the SHADOW library is
    the only library that the Planner is aligned with; this may change in
    the future.

    Parameters
    ----------
    env : simpy.Environment
        Simulation environment object. Only ``env.now`` is read by this
        class.
    cluster : ~topsim.core.cluster.Cluster
        The cluster of the simulation; necessary to pass to static
        scheduling algorithms.
    model : topsim.algorithm.planner.Planner
        The implemented planning model that is run by this actor. It must
        provide a ``generate_plan`` method (see :meth:`run`).
    use_task_data
        Forwarded unchanged to ``model.generate_plan``.
    use_edge_data
        Forwarded unchanged to ``model.generate_plan``.
    delay_model : topsim.core.delay.DelayModel, optional
        The delay model object, to assign to each WorkflowPlan task. It is
        stored on the instance but not used by this class directly.
    """

    def __init__(self, env, cluster, model, use_task_data, use_edge_data,
                 delay_model=None):
        # simpy.Environment object for the simulation
        self.env = env
        self.cluster = cluster
        self.model = model
        self.delay_model = delay_model
        self.use_task_data = use_task_data
        self.use_edge_data = use_edge_data

    def run(self, observation, buffer, max_ingest):
        """
        Generate a plan for ``observation`` by delegating to the model.

        Parameters
        ----------
        observation
            The observation for which we are generating a plan (by forming
            a schedule using the predefined static scheduling algorithm).
        buffer
            Passed through to ``model.generate_plan``.
        max_ingest
            Passed through to ``model.generate_plan``.

        Returns
        -------
        Whatever ``model.generate_plan`` returns; this is expected to be a
        WorkflowPlan (TODO confirm against the model implementations).
        """
        # Argument order matters: the model receives the current simulation
        # time first, then cluster, buffer, observation, max_ingest and the
        # two data-usage flags, all positionally.
        return self.model.generate_plan(
            self.env.now,
            self.cluster,
            buffer,
            observation,
            max_ingest,
            self.use_task_data,
            self.use_edge_data,
        )
# yield self.env.timeout(0,plan)
class WorkflowPlan:
    """
    WorkflowPlans are used within the Planner, Scheduler, and Cluster.

    They are higher-level than the shadow library representation, as they
    are a storage component of scheduled tasks, rather than directly
    representing the DAG nature of the workflow. This is why the tasks are
    stored in queues.

    Plans order and compare by their ``priority`` attribute, which starts
    as ``None``; ordering comparisons (``<``, ``>``) raise ``TypeError``
    until both plans have been given comparable priorities.

    Parameters
    ----------
    id
        Identifier of the plan.
    est
        Stored as ``self.est`` (presumably the earliest start time - TODO
        confirm).
    eft
        Stored as ``self.eft`` (presumably the earliest finish time - TODO
        confirm).
    tasks
        The tasks that make up the plan.
    exec_order
        Stored as ``self.exec_order``.
    status
        Initial status of the plan; ``is_finished`` compares it against
        ``WorkflowStatus.FINISHED``.
    max_ingest
        Accepted for interface compatibility; it is not stored on the
        instance.
    graph : optional
        Object exposing ``successors(task_id)`` and
        ``predecessors(task_id)`` (e.g. a ``networkx.DiGraph``). Required
        by ``get_task_successors`` and ``get_task_predecessors``.
    """

    def __init__(self, id, est, eft, tasks, exec_order, status, max_ingest,
                 graph=None):
        self.id = id
        self.est = est
        self.eft = eft
        self.tasks = tasks
        self.finished_tasks = []
        self.exec_order = exec_order
        self.status = status
        self.graph = graph
        self.min_resources = None
        self.max_resources = None
        self.priority = None

    # NOTE: defining __eq__ without __hash__ makes instances unhashable
    # (Python sets __hash__ to None), so plans cannot be used as dict keys
    # or set members.
    def __lt__(self, other):
        return self.priority < other.priority

    def __eq__(self, other):
        return self.priority == other.priority

    def __gt__(self, other):
        return self.priority > other.priority

    def set_workflow_status(self, status):
        """
        Update workflow status.

        Parameters
        ----------
        status
            The new status value to store on the plan.
        """
        self.status = status

    def is_finished(self):
        """
        Check if the workflow has been marked as finished.

        Returns
        -------
        bool
            True when ``self.status`` equals ``WorkflowStatus.FINISHED``.
        """
        return self.status == WorkflowStatus.FINISHED

    def get_task_successors(self, task_id):
        """Return ``self.graph.successors(task_id)``."""
        return self.graph.successors(task_id)

    def get_task_predecessors(self, task_id):
        """Return ``self.graph.predecessors(task_id)``."""
        # Previously this queried successors(), so callers received the
        # tasks that follow task_id rather than the ones it depends on.
        return self.graph.predecessors(task_id)

    def get_data_cost(self, task_u, task_v):
        """Placeholder: not implemented yet, always returns None."""
        pass