Source code for atomate2.turbomole.jobs.base
"""Definition of base turbomole job makers."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
import turbomoleio.output.files
from custodian import Custodian
from jobflow import Maker, Response, job
from atomate2.turbomole.custodian.jobs import TMJob
from atomate2.turbomole.schemas.task import TaskDocument
from atomate2.turbomole.sets.base import (
BaseTurbomoleInputGenerator,
)
logger = logging.getLogger(__name__)
__all__ = ["BaseTurbomoleMaker"]
[docs]
@dataclass
class BaseTurbomoleMaker(Maker):
"""Base Turbomole job maker."""
input_set_generator: BaseTurbomoleInputGenerator
tm_exec: str
name: str = "base turbomole job"
command_options: list = field(default_factory=list)
handlers: list = field(default_factory=list)
validators: list = field(default_factory=list)
output_cls_str: str = "ScfOutput"
def __post_init__(self):
"""Set the output_cls based on the output_cls_str."""
self.output_cls = getattr(turbomoleio.output.files, self.output_cls_str)
[docs]
@job
def make(self, prev_output=None):
"""Create the dscf job."""
self.input_set_generator.get_input_set(prev_output=prev_output)
command_options = self.get_command_options()
custodian_job = TMJob(executable=self.tm_exec, options=command_options)
custodian = Custodian(
handlers=self.handlers,
validators=self.validators,
jobs=[custodian_job],
)
custodian.run()
# Parse the outputs
if self.tm_exec == "jobex":
output_file = "job.last"
else:
output_file = custodian_job.output_file
doc = TaskDocument.from_directory(
".", output_file=output_file, output_cls=self.output_cls
)
return Response(output=doc)
[docs]
def get_command_options(self):
"""Get the options for the Turbomole executable."""
return self.command_options