Skip to content

Commit

Permalink
Added Error handling and fixed up awaits for slurm and htconder submi…
Browse files Browse the repository at this point in the history
…t and check functions
  • Loading branch information
eacharles committed Sep 18, 2024
1 parent 7298f03 commit d006294
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 30 deletions.
38 changes: 26 additions & 12 deletions src/lsst/cmservice/common/htcondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Any

from .enums import StatusEnum
from .errors import CMHTCondorSubmitError
from .errors import CMHTCondorCheckError, CMHTCondorSubmitError

htcondor_status_map = {
1: StatusEnum.running,
Expand All @@ -17,7 +17,7 @@
}


async def write_htcondor_script(
def write_htcondor_script(
htcondor_script: str,
htcondor_log: str,
script_url: str,
Expand Down Expand Up @@ -66,7 +66,7 @@ async def write_htcondor_script(
return htcondor_log


async def submit_htcondor_job(
def submit_htcondor_job(
htcondor_script: str,
) -> None:
"""Submit a `Script` to htcondor
Expand All @@ -84,13 +84,18 @@ async def submit_htcondor_job(
htcondor_script,
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as sbatch:
assert sbatch.stdout
sbatch.wait()
if sbatch.returncode != 0:
assert sbatch.stderr
msg = sbatch.stderr.read().decode()
raise CMHTCondorSubmitError(f"Bad htcondor submit: {msg}")
except TypeError as msg:
raise CMHTCondorSubmitError(f"Bad htcondor submit: {msg}") from msg


async def check_htcondor_job(
def check_htcondor_job(
htcondor_id: str,
) -> StatusEnum:
"""Check the status of a `HTCondor` job
Expand All @@ -108,14 +113,23 @@ async def check_htcondor_job(
with subprocess.Popen(
["condor_q", "-userlog", htcondor_id, "-af", "JobStatus", "ExitCode"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as condor_q:
assert condor_q.stdout
lines = condor_q.stdout.read().decode().split("\n")
# condor_q puts an extra newline, so we use the second to the last line
tokens = lines[-2].split()
assert len(tokens) == 2
htcondor_status = int(tokens[0])
exit_code = tokens[1]
condor_q.wait()
if condor_q.returncode != 0:
assert condor_q.stderr
msg = condor_q.stderr.read().decode()
raise CMHTCondorCheckError(f"Bad htcondor check: {msg}")
try:
assert condor_q.stdout
lines = condor_q.stdout.read().decode().split("\n")
# condor_q puts an extra newline, we use second to the last line
tokens = lines[-2].split()
assert len(tokens) == 2
htcondor_status = int(tokens[0])
exit_code = tokens[1]
except Exception as msg:
raise CMHTCondorCheckError(f"Badly formatted htcondor check: {msg}")
status = htcondor_status_map[htcondor_status]
if status == StatusEnum.reviewable:
if int(exit_code) == 0:
Expand Down
36 changes: 25 additions & 11 deletions src/lsst/cmservice/common/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import subprocess

from .enums import StatusEnum
from .errors import CMSlurmSubmitError
from .errors import CMSlurmCheckError, CMSlurmSubmitError

slurm_status_map = {
"BOOT_FAIL": StatusEnum.failed,
Expand Down Expand Up @@ -33,7 +33,7 @@
}


async def submit_slurm_job(
def submit_slurm_job(
script_url: str,
log_url: str,
) -> str:
Expand Down Expand Up @@ -68,15 +68,21 @@ async def submit_slurm_job(
script_url,
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as sbatch:
sbatch.wait()
if sbatch.returncode != 0:
assert sbatch.stderr
msg = sbatch.stderr.read().decode()
raise CMSlurmSubmitError(f"Bad slurm submit: {msg}")
assert sbatch.stdout
line = sbatch.stdout.read().decode().strip()
return line.split("|")[0]
except TypeError as msg:
raise CMSlurmSubmitError(f"Bad slurm submit: {msg}") from msg


async def check_slurm_job(
def check_slurm_job(
slurm_id: str | None,
) -> StatusEnum | None:
"""Check the status of a `Slurm` job
Expand All @@ -94,12 +100,20 @@ async def check_slurm_job(
if slurm_id is None:
return None
with subprocess.Popen(["sacct", "--parsable", "-b", "-j", slurm_id], stdout=subprocess.PIPE) as sacct:
assert sacct.stdout
lines = sacct.stdout.read().decode().split("\n")
if len(lines) < 2:
return slurm_status_map["PENDING"]
tokens = lines[1].split("|")
if len(tokens) < 2:
return slurm_status_map["PENDING"]
slurm_status = tokens[1]
sacct.wait()
if sacct.returncode != 0:
assert sacct.stderr
msg = sacct.stderr.read().decode()
raise CMSlurmCheckError(f"Bad slurm check: {msg}")
try:
assert sacct.stdout
lines = sacct.stdout.read().decode().split("\n")
if len(lines) < 2:
return slurm_status_map["PENDING"]
tokens = lines[1].split("|")
if len(tokens) < 2:
return slurm_status_map["PENDING"]
slurm_status = tokens[1]
except Exception as msg:
raise CMSlurmCheckError(f"Badly formatted slurm check: {msg}")
return slurm_status_map[slurm_status]
2 changes: 1 addition & 1 deletion src/lsst/cmservice/handlers/elements.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async def _check_slurm_job( # pylint: disable=unused-argument
parent: ElementMixin,
**kwargs: Any,
) -> StatusEnum:
slurm_status = await check_slurm_job(slurm_id)
slurm_status = check_slurm_job(slurm_id)
if slurm_status is None:
slurm_status = StatusEnum.running
if slurm_status == StatusEnum.accepted:
Expand Down
35 changes: 29 additions & 6 deletions src/lsst/cmservice/handlers/script_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
CMBadExecutionMethodError,
CMBadStateTransitionError,
CMBashSubmitError,
CMHTCondorCheckError,
CMHTCondorSubmitError,
CMMissingNodeUrlError,
CMMissingScriptInputError,
CMSlurmCheckError,
CMSlurmSubmitError,
)
from ..common.htcondor import check_htcondor_job, submit_htcondor_job, write_htcondor_script
Expand Down Expand Up @@ -62,14 +64,23 @@ async def process(
except (
CMBadExecutionMethodError,
CMMissingNodeUrlError,
) as msg:
_new_error = await ScriptError.create_row(
session,
script_id=node.id,
source=ErrorSourceEnum.cmservice,
diagnostic_message=msg,
)
status = StatusEnum.failed
except (
CMHTCondorSubmitError,
CMSlurmSubmitError,
CMBashSubmitError,
) as msg:
_new_error = await ScriptError.create_row(
session,
script_id=node.id,
source=ErrorSourceEnum.cmservice,
source=ErrorSourceEnum.local_script,
diagnostic_message=msg,
)
status = StatusEnum.failed
Expand All @@ -84,6 +95,18 @@ async def process(
diagnostic_message=msg,
)
status = StatusEnum.failed
except (
CMHTCondorCheckError,
CMSlurmCheckError,
) as msg:
_new_error = await ScriptError.create_row(
session,
script_id=node.id,
source=ErrorSourceEnum.local_script,
diagnostic_message=msg,
)
status = StatusEnum.failed

if status == StatusEnum.reviewable:
status = await self.review(session, node, parent)
if status != orig_status:
Expand Down Expand Up @@ -351,7 +374,7 @@ async def _check_slurm_job( # pylint: disable=unused-argument
status : StatusEnum
The status of the processing
"""
status = await check_slurm_job(slurm_id)
status = check_slurm_job(slurm_id)
print(f"Getting status for {script.fullname} {status}")
if status is None:
status = StatusEnum.running
Expand Down Expand Up @@ -387,7 +410,7 @@ async def _check_htcondor_job( # pylint: disable=unused-argument
status : StatusEnum
The status of the processing
"""
status = await check_htcondor_job(htcondor_id)
status = check_htcondor_job(htcondor_id)
print(f"Getting status for {script.fullname} {status}")
if status != script.status:
await script.update_values(session, status=status)
Expand Down Expand Up @@ -446,7 +469,7 @@ async def launch(
raise CMMissingNodeUrlError(f"script_url is not set for {script}")
if not script.log_url:
raise CMMissingNodeUrlError(f"log_url is not set for {script}")
job_id = await submit_slurm_job(script.script_url, script.log_url)
job_id = submit_slurm_job(script.script_url, script.log_url)
status = StatusEnum.running
await script.update_values(session, stamp_url=job_id, status=status)
elif script_method == ScriptMethodEnum.htcondor: # pragma: no cover
Expand All @@ -457,13 +480,13 @@ async def launch(
job_id_base = os.path.abspath(os.path.splitext(script.script_url)[0])
htcondor_script = f"{job_id_base}.sub"
htcondor_log = f"{job_id_base}.condorlog"
await write_htcondor_script(
write_htcondor_script(
htcondor_script,
htcondor_log,
os.path.abspath(script.script_url),
os.path.abspath(script.log_url),
)
await submit_htcondor_job(htcondor_script)
submit_htcondor_job(htcondor_script)
status = StatusEnum.running
await script.update_values(session, stamp_url=htcondor_log, status=status)
else:
Expand Down

0 comments on commit d006294

Please sign in to comment.