Source code for jobflow_remote.jobs.report

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING

from jobflow_remote.jobs.data import JobInfo, projection_job_info
from jobflow_remote.jobs.state import FlowState, JobState

if TYPE_CHECKING:
    from jobflow_remote import JobController


[docs] @dataclass class JobTrends: """ Trends of job states over time. """ interval: str dates: list[str] completed: list[int] failed: list[int] remote_error: list[int] timezone: str @property def num_intervals(self) -> int: return len(self.dates)
[docs] @dataclass class JobsReport: """ A report of the job states. """ state_counts: dict[JobState, int] = field(default_factory=dict) trends: JobTrends | None = None longest_running: list[JobInfo] = field(default_factory=list) worker_utilization: dict[str, int] = field(default_factory=dict) @property def running(self) -> int: """Returns the count of running jobs.""" return self.state_counts.get(JobState.RUNNING, 0) @property def completed(self) -> int: """Returns the count of completed jobs.""" return self.state_counts.get(JobState.COMPLETED, 0) @property def error(self) -> int: """Returns the sum of failed, remote error, and paused jobs (i.e., error states).""" return sum( self.state_counts.get(state, 0) for state in [JobState.FAILED, JobState.REMOTE_ERROR] ) @property def active(self) -> int: """Returns the sum of failed, remote error, and paused jobs (i.e., error states).""" return sum( self.state_counts.get(state, 0) for state in [ JobState.CHECKED_OUT, JobState.UPLOADED, JobState.UPLOADED, JobState.SUBMITTED, JobState.RUNNING, JobState.TERMINATED, JobState.DOWNLOADED, JobState.BATCH_SUBMITTED, JobState.BATCH_RUNNING, ] )
[docs] @classmethod def generate_report( cls, job_controller: JobController, interval: str = "days", num_intervals: int | None = None, timezone: str = "UTC", ) -> JobsReport: """ Generates a report of the job states. Parameters ---------- job_controller The JobController instance to generate the report from. interval The interval of the trends for the report. num_intervals The number of intervals to consider. timezone The timezone to use for the report. Returns ------- JobsReport A report of the job states. """ now = datetime.utcnow() state_counts = job_controller.count_jobs_states(list(JobState)) # Job trends over time states_trends = [JobState.COMPLETED, JobState.FAILED, JobState.REMOTE_ERROR] trends_dict = job_controller.get_trends( states=states_trends, interval=interval, num_intervals=num_intervals, interval_timezone=timezone, ) trends_dates = sorted(trends_dict) trends = JobTrends( interval=interval, dates=trends_dates, completed=[trends_dict[d][JobState.COMPLETED] for d in trends_dates], failed=[trends_dict[d][JobState.FAILED] for d in trends_dates], remote_error=[trends_dict[d][JobState.REMOTE_ERROR] for d in trends_dates], timezone=timezone, ) # Longest running jobs (Top 5) projection_longest: dict = {k: 1 for k in projection_job_info} projection_longest["duration"] = {"$subtract": [now, "$start_time"]} pipeline_longest = [ {"$match": {"state": "RUNNING"}}, {"$project": projection_longest}, {"$sort": {"duration": -1}}, {"$limit": 5}, ] longest_running_result = job_controller.jobs.aggregate(pipeline_longest) longest_running = [ JobInfo.from_query_output(doc) for doc in longest_running_result ] # Worker utilization (number of jobs assigned to each worker) pipeline_worker_utilization = [ {"$group": {"_id": "$worker", "count": {"$sum": 1}}}, {"$sort": {"count": -1}}, ] worker_utilization_result = job_controller.jobs.aggregate( pipeline_worker_utilization ) worker_utilization = { doc["_id"]: doc["count"] for doc in worker_utilization_result } return cls( state_counts=state_counts, trends=trends, longest_running=longest_running, worker_utilization=worker_utilization, )
[docs] @dataclass class FlowTrends: """ Trends of flow states over time. """ interval: str dates: list[str] completed: list[int] failed: list[int] timezone: str @property def num_intervals(self) -> int: return len(self.dates)
[docs] @dataclass class FlowsReport: """ A report of the flow states. """ state_counts: dict[FlowState, int] trends: FlowTrends @property def running(self) -> int: """Returns the count of running flows.""" return self.state_counts.get(FlowState.RUNNING, 0) @property def completed(self) -> int: """Returns the count of completed flows.""" return self.state_counts.get(FlowState.COMPLETED, 0) @property def error(self) -> int: """Returns the count of failed flows.""" return self.state_counts.get(FlowState.FAILED, 0)
[docs] @classmethod def generate_report( cls, job_controller: JobController, interval: str = "days", num_intervals: int = None, timezone: str = "UTC", ): """ Generates a report of the flow states. Parameters ---------- job_controller The JobController instance to generate the report from. interval The interval of the trends for the report. num_intervals The number of intervals to consider. timezone The timezone to use for the report. Returns ------- FlowsReport A report of the flow states. """ state_counts = job_controller.count_flows_states(list(FlowState)) trends_dict = job_controller.get_trends( states=[FlowState.COMPLETED, FlowState.FAILED], interval=interval, num_intervals=num_intervals, interval_timezone=timezone, ) trends_dates = sorted(trends_dict) trends = FlowTrends( interval=interval, dates=trends_dates, completed=[trends_dict[d][FlowState.COMPLETED] for d in trends_dates], failed=[trends_dict[d][FlowState.FAILED] for d in trends_dates], timezone=timezone, ) # Create report instance return cls(state_counts=state_counts, trends=trends)