Source code for jobflow_remote.testing
"""A series of toy workflows that can be used for testing."""
from dataclasses import dataclass
from enum import Enum
from typing import Callable, NoReturn, Optional, Union
from jobflow import Job, JobConfig, Maker, OnMissing, Response, job
[docs]
@job
def add(a, b):
    """Return the sum of ``a`` and ``b``.

    The sum is only returned; this function does not write any file itself.
    """
    total = a + b
    return total
[docs]
@job
def always_fails() -> NoReturn:
    """Unconditionally raise a ``RuntimeError``; never returns normally."""
    message = "This job failed."
    raise RuntimeError(message)
[docs]
@job
def write_file(n) -> None:
    """Write ``str(n)`` to ``results.txt``, replacing any existing content.

    The relative path is resolved against the current working directory.
    """
    with open("results.txt", mode="w") as handle:
        # Convert after opening so the file is created/truncated first.
        text = str(n)
        handle.write(text)
[docs]
@job
def arithmetic(
    a: Union[float, list[float]],
    b: Union[float, list[float]],
    op: Optional[Callable] = None,
) -> Optional[float]:
    """Apply the binary callable ``op`` to ``a`` and ``b``.

    Returns ``op(a, b)`` when ``op`` is truthy; otherwise returns ``None``
    without touching ``a`` or ``b``.
    """
    return op(a, b) if op else None
[docs]
@job
def check_env_var() -> str:
    """Return the value of the ``TESTING_ENV_VAR`` environment variable.

    Falls back to the string ``"unset"`` when the variable is not defined.
    """
    import os

    return os.getenv("TESTING_ENV_VAR", "unset")
[docs]
@job(big_data="data")
def add_big(a: float, b: float):
    """Add two numbers and return the sum together with a large list of copies.

    The returned ``Response`` wraps a dict holding the sum under ``"result"``
    and a 5000-element list filled with that sum under ``"data"``. ``"data"``
    is the key named by the ``big_data`` argument of the ``@job`` decorator.

    NOTE(review): presumably ``big_data`` is the name of an additional store
    that has to be defined for the run -- confirm against the store config.
    """
    result = a + b
    big_array = [result] * 5_000
    # Reuse the sum computed above instead of evaluating ``a + b`` again.
    return Response({"data": big_array, "result": result})
[docs]
@job(undefined_store="data")
def add_big_undefined_store(a: float, b: float):
    """Add two numbers and return the sum plus a 5000-element list of copies.

    The returned ``Response`` wraps a dict with the list under ``"data"`` and
    the sum under ``"result"``. ``"data"`` is the key named by the
    ``undefined_store`` argument of the ``@job`` decorator.

    NOTE(review): presumably no store called ``undefined_store`` is configured,
    so storing ``"data"`` is expected to fail -- confirm against the tests.
    """
    total = a + b
    inflated = [total] * 5_000
    return Response({"data": inflated, "result": total})
[docs]
@job
def add_sleep(a, b):
    """Sleep for ``b`` seconds, then return ``a + b``."""
    from time import sleep

    # The pause happens before the addition is evaluated.
    sleep(b)
    return a + b
[docs]
@job
def create_detour(detour_job: Job):
    """Return a ``Response`` whose detour is a ``Flow`` wrapping ``detour_job``."""
    from jobflow import Flow

    detour_flow = Flow(detour_job)
    return Response(detour=detour_flow)
[docs]
@job
def self_replace(n: int):
    """Replace this job with ``self_replace(n - 1)`` while ``n`` is positive.

    Once ``n`` is no longer greater than zero, ``n`` itself is returned and
    no further replacement is requested.
    """
    if not n > 0:
        return n
    follow_up = self_replace(n - 1)
    return Response(replace=follow_up)
[docs]
@job
def current_jobdoc():
    """Return ``CURRENT_JOBDOC.job_doc`` from ``jobflow_remote.jobs.run``.

    The import is performed at call time, inside the function body.
    """
    from jobflow_remote.jobs.run import CURRENT_JOBDOC

    job_doc = CURRENT_JOBDOC.job_doc
    return job_doc
[docs]
@job(
    config=JobConfig(
        resolve_references=False,
        on_missing_references=OnMissing.NONE,
    )
)
def no_resolve(ref):
    """Return ``ref`` exactly as it was received.

    The job is configured with ``resolve_references=False`` and
    ``on_missing_references=OnMissing.NONE``, so the input reference is not
    resolved before being handed back.
    """
    return ref
[docs]
class TestEnum(Enum):
    """Two-member, string-valued enum (``A`` and ``B``)."""

    # The ``Test`` prefix matches pytest's default class-collection pattern.
    # Opting out avoids a PytestCollectionWarning when this enum is imported
    # into a test module. Dunder names are not turned into enum members, so
    # the set of members is unchanged.
    __test__ = False

    A = "A"
    B = "B"
[docs]
# Maker dataclass with an Enum-typed field; its `make` job checks that field's type.
@dataclass
class EnumMaker(Maker):
# Enum-valued field; defaults to TestEnum.A.
e: TestEnum = TestEnum.A
# Default value of the `name` field.
# NOTE(review): presumably the name jobflow assigns to jobs created by this Maker -- confirm.
name: str = "enum maker"
[docs]
@job
def make(self):
# Raises AssertionError when `e` is not a TestEnum instance.
# NOTE(review): presumably guards against the enum being degraded (e.g. to a
# plain string) by a serialization round trip -- confirm against the tests.
assert isinstance(self.e, TestEnum)