# Source code for jobflow_remote.jobs.state
from __future__ import annotations
from enum import Enum
# [docs]
class JobState(Enum):
    """
    States of a Job.

    The value of every member is the same string as its name.
    """

    WAITING = "WAITING"
    READY = "READY"
    CHECKED_OUT = "CHECKED_OUT"  # TODO should it be RESERVED?
    UPLOADED = "UPLOADED"
    SUBMITTED = "SUBMITTED"
    RUNNING = "RUNNING"
    TERMINATED = "TERMINATED"
    DOWNLOADED = "DOWNLOADED"
    REMOTE_ERROR = "REMOTE_ERROR"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    PAUSED = "PAUSED"
    STOPPED = "STOPPED"
    USER_STOPPED = "USER_STOPPED"
    BATCH_SUBMITTED = "BATCH_SUBMITTED"
    BATCH_RUNNING = "BATCH_RUNNING"

    @property
    def short_value(self) -> str:
        """
        Abbreviated code of this state.

        Returns
        -------
        str
            The entry of the module-level ``short_state_mapping`` for this
            state.
        """
        # The table is defined after the class, so it is resolved at call time.
        abbreviation = short_state_mapping[self]
        return abbreviation
# Abbreviated code for each JobState; this is the table read by
# ``JobState.short_value``.
short_state_mapping: dict[JobState, str] = {
    JobState.WAITING: "W",
    JobState.READY: "R",
    JobState.CHECKED_OUT: "CE",
    JobState.UPLOADED: "U",
    JobState.SUBMITTED: "SU",
    JobState.RUNNING: "RU",
    JobState.TERMINATED: "T",
    JobState.DOWNLOADED: "D",
    JobState.REMOTE_ERROR: "RERR",
    JobState.COMPLETED: "C",
    JobState.FAILED: "F",
    JobState.PAUSED: "P",
    JobState.STOPPED: "ST",
    JobState.USER_STOPPED: "CA",
    JobState.BATCH_SUBMITTED: "BS",
    JobState.BATCH_RUNNING: "BR",
}
# Job states listed as pausable, and their string values.
PAUSABLE_STATES = [JobState.READY, JobState.WAITING]
PAUSABLE_STATES_V = [state.value for state in PAUSABLE_STATES]

# Job states grouped as "running", and their string values.
RUNNING_STATES = [
    JobState.CHECKED_OUT,
    JobState.UPLOADED,
    JobState.SUBMITTED,
    JobState.RUNNING,
    JobState.TERMINATED,
    JobState.DOWNLOADED,
]
RUNNING_STATES_V = [state.value for state in RUNNING_STATES]

# The resettable states are the very same list objects as the running ones
# (aliases, not copies).
RESETTABLE_STATES = RUNNING_STATES
RESETTABLE_STATES_V = RUNNING_STATES_V

# Every Job state except the two batch states, in declaration order.
DELETABLE_STATES = [
    state
    for state in JobState
    if state not in [JobState.BATCH_SUBMITTED, JobState.BATCH_RUNNING]
]
# [docs]
class FlowState(Enum):
    """
    States of a Flow.

    The value of every member is the same string as its name.
    """

    WAITING = "WAITING"
    READY = "READY"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    PAUSED = "PAUSED"
    STOPPED = "STOPPED"

    @classmethod
    def from_jobs_states(
        cls, jobs_states: list[JobState], leaf_states: list[JobState]
    ) -> FlowState:
        """
        Derive the state of a Flow from the states of its Jobs.

        The checks are applied in a fixed order and the first one that
        matches determines the result; the states of the leaf Jobs are only
        used to decide whether the Flow is completed.

        Parameters
        ----------
        jobs_states
            List of JobStates of all the Jobs in the Flow.
        leaf_states
            List of JobStates of the leaf Jobs in the Flow.

        Returns
        -------
        FlowState
            The state of the Flow. An empty ``jobs_states`` yields
            ``WAITING``, since the first check is vacuously true.
        """
        # Every Job is still waiting.
        if all(state == JobState.WAITING for state in jobs_states):
            return cls.WAITING

        # Every Job is either waiting or ready.
        not_started = (JobState.WAITING, JobState.READY)
        if all(state in not_started for state in jobs_states):
            return cls.READY

        # Completion is decided on the leaf Jobs alone: an intermediate Job
        # may have failed while its children allow missing references.
        if all(leaf == JobState.COMPLETED for leaf in leaf_states):
            return cls.COMPLETED

        # REMOTE_ERROR is deliberately absent from this table and does not
        # lead to a failed Flow. Two main reasons:
        # 1) it might be a temporary problem and not a final failure of the
        #    Flow
        # 2) changing the state of the Flow would require locking the Flow
        #    when applying the change in the remote state.
        precedence = (
            (cls.FAILED, (JobState.FAILED,)),
            (cls.STOPPED, (JobState.STOPPED, JobState.USER_STOPPED)),
            (cls.PAUSED, (JobState.PAUSED,)),
        )
        for flow_state, triggers in precedence:
            if any(state in triggers for state in jobs_states):
                return flow_state

        # None of the conditions above applied.
        return cls.RUNNING