diff --git a/docs/reference/backends.rst b/docs/reference/backends.rst index e77edb3f..be91e9d4 100644 --- a/docs/reference/backends.rst +++ b/docs/reference/backends.rst @@ -8,7 +8,7 @@ backend that suits your needs here, it's easy to write your own backend `. The source code for the built-in backends is a great source of inspiration. -By default, *gwf* comes with the `local`, `slurm`, and `sge` backends. +By default, *gwf* comes with the `local`, `slurm`, `sge`, and `lsf` backends. Local ----- @@ -31,3 +31,10 @@ Sun Grid Engine (SGE) .. automodule:: gwf.backends.sge + + +IBM Spectrum Load Sharing Facility (LSF) +---------------------------------------- + +.. automodule:: + gwf.backends.lsf diff --git a/pyproject.toml b/pyproject.toml index a8590c35..5f9ce45d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,7 @@ workers = "gwf.plugins.workers:workers" local = "gwf.backends.local:setup" sge = "gwf.backends.sge:setup" slurm = "gwf.backends.slurm:setup" +lsf = "gwf.backends.lsf:setup" [project.urls] Home = "https://gwf.app" diff --git a/src/gwf/backends/lsf.py b/src/gwf/backends/lsf.py new file mode 100644 index 00000000..e8c7025a --- /dev/null +++ b/src/gwf/backends/lsf.py @@ -0,0 +1,146 @@ +"""Backend for IBM Load Sharing Facility (LSF). + +To use this backend, you must activate the `lsf` backend. This backend requires +the commands `bsub` and `bjobs`. + +**Backend options:** + +None. + +**Target options:** + +* **cores (int):** + Number of cores allocated to this target (default: 1). +* **memory (str):** + Memory allocated to this target (default: 4GB). +* **queue (str):** + Queue to submit the target to (default: normal). + for different purposes or priorities. + +""" + +import logging +import os.path +import re + +import attrs + +from ..utils import ensure_trailing_newline +from .base import TrackingBackend, BackendStatus +from .utils import call, has_exe + +logger = logging.getLogger(__name__) + +TARGET_DEFAULTS = { + "queue": "normal", + "memory": "4GB", + "cores": 1 +} + +BJOB_HEADER = '''#BSUB -M {memory} +#BSUB -R "select[mem>{memory}] rusage[mem={memory}] span[hosts=1]" +#BSUB -n {cores} +#BSUB -q {queue} +#BSUB -oo {std_out} +#BSUB -eo {std_err} +#BSUB -J {job_name}''' + +BJOB_STATES = { + "PEND": BackendStatus.SUBMITTED, + "WAIT": BackendStatus.SUBMITTED, + "RUN": BackendStatus.RUNNING, + "ZOMBI": BackendStatus.RUNNING, + "DONE": BackendStatus.COMPLETED, + "EXIT": BackendStatus.FAILED, + "PSUSP": BackendStatus.FAILED, + "USUSP": BackendStatus.FAILED, + "SSUSP": BackendStatus.FAILED, + "UNKWN": BackendStatus.UNKNOWN +} + + +@attrs.define +class LSFOps: + working_dir: str = attrs.field() + target_defaults: dict = attrs.field() + + def cancel_job(self, job_id): + logger.debug(f"Cancelling job { job_id }") + call("bkill", job_id) + + def submit_target(self, target, dependencies): + script = self.compile_script(target) + with open(os.path.join(self.working_dir, ".gwf", "logs", target.name + ".sh"), 'w') as f: + f.write(script) + args = [] + if dependencies: + args.append("-w") + args.append(" && ".join([f"done({job_id})" for job_id in dependencies])) + logger.debug(f"Submitting job { target.name } to LSF") + stdout = call("bsub", *args, input=script).strip() + job_id = re.search(r"Job <(\d+)>", stdout)[1] + return job_id + + def get_job_states(self, tracked_jobs): + logger.debug("Getting job states from LSF") + if not tracked_jobs: + return {} + job_states = {job_id: BackendStatus.UNKNOWN for job_id in tracked_jobs} + for job_id in tracked_jobs: + cmd = [ + 'bjobs', + '-noheader', + '-o', 'stat', + job_id + ] + ret = call(*cmd).strip() + if ret == '': + continue + state = BJOB_STATES[ret] + job_states[job_id] = state + return job_states + + def compile_script(self, target): + target_options = target.options + target_options["std_err"] = os.path.join( + self.working_dir, ".gwf", "logs", target.name + ".stderr" + ) + target_options["std_out"] = os.path.join( + self.working_dir, ".gwf", "logs", target.name + ".stdout" + ) + header = BJOB_HEADER + for name, value in target_options.items(): + header = header.replace(f"{{{name}}}", str(value)) + header = header.replace("{job_name}", target.name) + out = [] + out.append("#!/bin/bash") + out.append(header) + out.append("") + out.append("# Generated by: gwf") + out.append("") + out.append("cd {}".format(target.working_dir)) + out.append("set -e") + out.append("") + out.append(ensure_trailing_newline(target.spec)) + + return "\n".join(out) + + def close(self): + pass + + +def create_backend(working_dir): + return TrackingBackend( + working_dir, + name="lsf", + ops=LSFOps(working_dir, target_defaults=TARGET_DEFAULTS) + ) + + +def priority(): + if has_exe("bsub"): + return 100 + return -100 + + +setup = (create_backend, priority())