Source code for jobflow_remote.utils.examples
from random import randint
from typing import NoReturn
from jobflow import Flow, Response, job
[docs]
@job
def add(a, b):
"""A Job adding to numbers."""
return a + b
[docs]
@job
def add_many(*to_sum):
"""A job adding numbers."""
return sum(to_sum)
[docs]
@job
def sleep(s):
"""A Job sleeping."""
import time
time.sleep(s)
return s
[docs]
@job
def add_raise(a, b) -> NoReturn:
"""A Job raising a RuntimeError."""
raise RuntimeError("An error for a and b")
[docs]
@job
def make_list(a, length=None):
"""A Job generating a list of numbers."""
if not length:
length = randint(2, 5)
return [a] * length
[docs]
@job
def add_distributed(list_a):
"""A Job generating a new Flow to add a list of numbers."""
jobs = [add(val, 1) for val in list_a]
flow = Flow(jobs)
return Response(replace=flow)
[docs]
@job
def value(n):
"""A Job returning the input value."""
return n
[docs]
@job
def conditional_sum_replace(numbers, min_n=10):
"""A Job creating a replace and adding number until a value is reached."""
tot = sum(numbers)
if tot < min_n:
j1 = value(tot)
j2 = value(5)
c = conditional_sum_replace([j1.output, j2.output], min_n=min_n)
return Response(replace=Flow([j1, j2, c], output=c.output))
return tot