Source code for jobflow_remote.utils.examples

from random import randint
from typing import NoReturn

from jobflow import Flow, Response, job


@job
def add(a, b):
    """Job that adds two values.

    Parameters
    ----------
    a
        Left operand.
    b
        Right operand.

    Returns
    -------
    The result of ``a + b``.
    """
    result = a + b
    return result
@job
def add_many(*to_sum):
    """Job that sums all of its positional arguments.

    Parameters
    ----------
    *to_sum
        The values to add together.

    Returns
    -------
    The result of ``sum(to_sum)``; ``0`` when no arguments are given.
    """
    total = sum(to_sum)
    return total
@job
def sleep(s):
    """Job that blocks for a given duration and then returns it.

    Parameters
    ----------
    s
        Duration in seconds, passed unchanged to ``time.sleep``.

    Returns
    -------
    The same value ``s`` that was passed in.
    """
    # Imported locally and aliased so it does not clash with this job's name.
    from time import sleep as _pause

    _pause(s)
    return s
@job
def add_raise(a, b) -> NoReturn:
    """Job that always fails with a RuntimeError.

    Parameters
    ----------
    a
        Unused; present so the signature mirrors ``add``.
    b
        Unused; present so the signature mirrors ``add``.

    Raises
    ------
    RuntimeError
        Always, with a fixed message.
    """
    message = "An error for a and b"
    raise RuntimeError(message)
@job
def make_list(a, length=None):
    """Job that builds a list containing ``a`` repeated ``length`` times.

    Parameters
    ----------
    a
        The value to repeat.
    length
        Number of repetitions. When ``None`` (the default), a random
        length between 2 and 5 (both included) is drawn.

    Returns
    -------
    list
        ``[a] * length``.
    """
    # Test against None rather than truthiness so that an explicit
    # ``length=0`` gives an empty list instead of a random length.
    if length is None:
        length = randint(2, 5)
    return [a] * length
@job
def add_distributed(list_a):
    """Job that replaces itself with a Flow of ``add`` jobs.

    One ``add(val, 1)`` job is created for every element of ``list_a``.

    Parameters
    ----------
    list_a
        Iterable of values; each one is incremented by 1 in its own job.

    Returns
    -------
    Response
        A Response whose ``replace`` is a Flow containing the created jobs.
    """
    increment_jobs = []
    for item in list_a:
        increment_jobs.append(add(item, 1))
    return Response(replace=Flow(increment_jobs))
@job
def value(n):
    """Job that returns its input unchanged.

    Parameters
    ----------
    n
        Any value.

    Returns
    -------
    The same ``n`` that was passed in.
    """
    result = n
    return result
@job
def conditional_sum_replace(numbers, min_n=10):
    """Job that keeps replacing itself until the sum reaches ``min_n``.

    The numbers are summed. If the sum is not below ``min_n`` it is
    returned directly. Otherwise the job returns a replacement Flow made of
    a ``value`` job holding the current sum, a ``value`` job holding 5, and
    a new ``conditional_sum_replace`` job fed with the outputs of those two.

    Parameters
    ----------
    numbers
        Iterable of numbers to sum.
    min_n
        Threshold the sum has to reach for the recursion to stop.

    Returns
    -------
    The sum of ``numbers`` when it is not below ``min_n``; otherwise a
    Response whose ``replace`` is the Flow described above, with the Flow
    output set to the output of the new ``conditional_sum_replace`` job.
    """
    running_total = sum(numbers)

    # Threshold reached: nothing to replace, hand back the sum.
    if not running_total < min_n:
        return running_total

    current = value(running_total)
    increment = value(5)
    next_round = conditional_sum_replace(
        [current.output, increment.output], min_n=min_n
    )
    replacement = Flow([current, increment, next_round], output=next_round.output)
    return Response(replace=replacement)