Source code for jobflow_remote.testing
"""A series of toy workflows that can be used for testing."""
from typing import Callable, NoReturn, Optional, Union
from jobflow import Job, Response, job
[docs]
@job
def add(a, b):
    """Add two values and return the sum.

    Args:
        a: Left operand of the addition.
        b: Right operand of the addition.

    Returns:
        The result of ``a + b``.
    """
    total = a + b
    return total
[docs]
@job
def always_fails() -> NoReturn:
    """Fail unconditionally.

    Raises:
        RuntimeError: Always, with the message ``"This job failed."``.
    """
    message = "This job failed."
    raise RuntimeError(message)
[docs]
@job
def write_file(n) -> None:
    """Write the string form of ``n`` to ``results.txt``.

    The relative path is opened in write mode, so any previous content of
    the file is replaced.

    Args:
        n: Value to write; it is converted with ``str`` first.
    """
    with open("results.txt", mode="w") as handle:
        # Convert only after the file is open, so the file is truncated
        # even if the conversion fails.
        text = str(n)
        handle.write(text)
[docs]
@job
def arithmetic(
    a: Union[float, list[float]],
    b: Union[float, list[float]],
    op: Optional[Callable] = None,
) -> Optional[float]:
    """Apply the binary callable ``op`` to ``a`` and ``b``.

    Args:
        a: First argument handed to ``op`` unchanged.
        b: Second argument handed to ``op`` unchanged.
        op: Callable invoked as ``op(a, b)``. If it is omitted (or is
            otherwise falsy) nothing is computed.

    Returns:
        Whatever ``op(a, b)`` returns, or ``None`` when ``op`` is falsy.
    """
    # Guard clause: without an operation there is nothing to evaluate.
    if not op:
        return None
    return op(a, b)
[docs]
@job
def check_env_var() -> str:
    """Report the value of the ``TESTING_ENV_VAR`` environment variable.

    Returns:
        The variable's value, or the string ``"unset"`` when the variable
        is not defined in the environment.
    """
    import os

    fallback = "unset"
    return os.getenv("TESTING_ENV_VAR", fallback)
[docs]
@job(big_data="data")
def add_big(a: float, b: float):
    """Add two numbers and return the sum together with a large list of it.

    The returned ``Response`` wraps a dictionary holding the sum under
    ``"result"`` and a 5,000-element list of that sum under ``"data"``.

    NOTE(review): the decorator is called with ``big_data="data"``;
    presumably this asks jobflow to place the ``"data"`` entry in an
    additional store named ``big_data`` -- confirm against the jobflow
    documentation.

    Args:
        a: First addend.
        b: Second addend.

    Returns:
        A ``Response`` built from ``{"data": [...], "result": a + b}``.
    """
    result = a + b
    big_array = [result] * 5_000
    # Reuse the sum computed once above instead of adding a second time.
    return Response({"data": big_array, "result": result})
[docs]
@job(undefined_store="data")
def add_big_undefined_store(a: float, b: float):
    """Add two numbers and return the sum together with a large list of it.

    The returned ``Response`` wraps a dictionary holding the sum under
    ``"result"`` and a 5,000-element list of that sum under ``"data"``.

    NOTE(review): the decorator is called with ``undefined_store="data"``;
    presumably this targets a store name that is intentionally not
    configured, so that storing ``"data"`` is attempted against an
    undefined store -- confirm against the callers/tests.

    Args:
        a: First addend.
        b: Second addend.

    Returns:
        A ``Response`` built from ``{"data": [...], "result": a + b}``.
    """
    total = a + b
    inflated = [total] * 5_000
    output = {"data": inflated, "result": total}
    return Response(output)
[docs]
@job
def add_sleep(a, b):
    """Pause for ``b`` seconds, then return ``a + b``.

    Args:
        a: Left operand of the addition.
        b: Right operand of the addition; also the sleep duration in
            seconds.

    Returns:
        The result of ``a + b``, computed after the sleep has finished.
    """
    from time import sleep

    sleep(b)
    total = a + b
    return total
[docs]
@job
def create_detour(detour_job: Job):
    """Wrap ``detour_job`` in a ``Flow`` and return it as a detour.

    Args:
        detour_job: The job to place inside the new ``Flow``.

    Returns:
        A ``Response`` whose ``detour`` argument is ``Flow(detour_job)``.
    """
    from jobflow import Flow

    detour_flow = Flow(detour_job)
    return Response(detour=detour_flow)
[docs]
@job
def self_replace(n: int):
    """Replace this job with another ``self_replace`` call until ``n`` hits zero.

    Args:
        n: Counter; while it is positive, a replacement created with
            ``n - 1`` is returned.

    Returns:
        A ``Response`` whose ``replace`` argument is ``self_replace(n - 1)``
        when ``n > 0``; otherwise ``n`` itself.
    """
    if n > 0:
        # The replacement is built from the decorated function, one step
        # closer to the terminating case.
        successor = self_replace(n - 1)
        return Response(replace=successor)
    return n