diff --git a/PILOTVERSION b/PILOTVERSION index bbafcf0b..e3b1f212 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.9.0.17 \ No newline at end of file +3.9.1.13 \ No newline at end of file diff --git a/pilot/control/job.py b/pilot/control/job.py index d9b4d1fa..506b0ffc 100644 --- a/pilot/control/job.py +++ b/pilot/control/job.py @@ -470,7 +470,7 @@ def send_state(job: Any, args: Any, state: str, xml: str = "", metadata: str = " return False -def get_job_status_from_server(job_id: int, url: str, port: str) -> (str, int, int): +def get_job_status_from_server(job_id: int, url: str, port: int) -> (str, int, int): """ Return the current status of job from the dispatcher. @@ -482,7 +482,7 @@ def get_job_status_from_server(job_id: int, url: str, port: str) -> (str, int, i In the case of time-out, the dispatcher will be asked one more time after 10 s. :param job_id: PanDA job id (int) - :param url: PanDA server URL (str + :param url: PanDA server URL (int) :param port: PanDA server port (str) :return: status (string; e.g. holding), attempt_nr (int), status_code (int). """ @@ -1512,10 +1512,6 @@ def get_dispatcher_dictionary(args: Any, taskid: str = "") -> dict: if 'HARVESTER_WORKER_ID' in os.environ: data['worker_id'] = os.environ.get('HARVESTER_WORKER_ID') -# instruction_sets = has_instruction_sets(['AVX', 'AVX2']) -# if instruction_sets: -# data['cpuConsumptionUnit'] = instruction_sets - return data diff --git a/pilot/control/monitor.py b/pilot/control/monitor.py index f2110c18..9c92c9ec 100644 --- a/pilot/control/monitor.py +++ b/pilot/control/monitor.py @@ -89,6 +89,9 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901 last_minute_check = t_0 queuedata = get_queuedata_from_job(queues) + if not queuedata: + logger.warning('queuedata could not be extracted from queues') + push = args.harvester and args.harvester_submitmode.lower() == 'push' try: # overall loop counter (ignoring the fact that more than one job may be running) @@ -103,7 +106,7 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901 break # check if the OIDC token needs to be refreshed - if tokendownloadchecktime: + if tokendownloadchecktime and queuedata: if int(time.time() - last_token_check) > tokendownloadchecktime: last_token_check = time.time() if 'no_token_renewal' in queuedata.catchall: diff --git a/pilot/user/atlas/proxy.py b/pilot/user/atlas/proxy.py index 011dd5af..b6defea9 100644 --- a/pilot/user/atlas/proxy.py +++ b/pilot/user/atlas/proxy.py @@ -31,7 +31,10 @@ # from pilot.user.atlas.setup import get_file_system_root_path from pilot.common.errorcodes import ErrorCodes -from pilot.util.container import execute +from pilot.util.container import ( + execute, + execute_nothreads +) from pilot.util.proxy import get_proxy logger = logging.getLogger(__name__) @@ -134,7 +137,7 @@ def verify_arcproxy(envsetup: str, limit: int, proxy_id: str = "pilot", test: bo if not hasattr(verify_arcproxy, "cache"): verify_arcproxy.cache = {} - if proxy_id in verify_arcproxy.cache: # if exist, then calculate result from current cache + if proxy_id in verify_arcproxy.cache: # if exists, then calculate result from current cache validity_end_cert = verify_arcproxy.cache[proxy_id][0] validity_end = verify_arcproxy.cache[proxy_id][1] if validity_end < 0: # previous validity check failed, do not try to re-check @@ -162,7 +165,7 @@ def verify_arcproxy(envsetup: str, limit: int, proxy_id: str = "pilot", test: bo _exit_code, _, _ = execute(cmd, shell=True) # , usecontainer=True, copytool=True) cmd = f"{envsetup}arcproxy -i validityEnd -i validityLeft -i vomsACvalidityEnd -i vomsACvalidityLeft" - _exit_code, stdout, stderr = execute(cmd, shell=True) # , usecontainer=True, copytool=True) + _exit_code, stdout, stderr = execute_nothreads(cmd, shell=True) # , usecontainer=True, copytool=True) if stdout is not None: if 'command not found' in stdout: logger.warning(f"arcproxy is not available on this queue," @@ -243,7 +246,7 @@ def verify_vomsproxy(envsetup: str, limit: int) -> tuple[int, str]: if os.environ.get('X509_USER_PROXY', '') != '': cmd = f"{envsetup}voms-proxy-info -actimeleft --timeleft --file $X509_USER_PROXY" logger.info(f'executing command: {cmd}') - _exit_code, stdout, stderr = execute(cmd, shell=True) + _exit_code, stdout, stderr = execute_nothreads(cmd, shell=True) if stdout is not None: if "command not found" in stdout: logger.info("skipping voms proxy check since command is not available") diff --git a/pilot/user/atlas/utilities.py b/pilot/user/atlas/utilities.py index 27d91384..3b087d01 100644 --- a/pilot/user/atlas/utilities.py +++ b/pilot/user/atlas/utilities.py @@ -32,7 +32,10 @@ NoSuchFile ) from pilot.info.jobdata import JobData -from pilot.util.container import execute +from pilot.util.container import ( + execute, + execute_nothreads +) from pilot.util.filehandling import ( read_json, copy, @@ -830,7 +833,7 @@ def filter_output(stdout): # CPU arch script has now been copied, time to execute it # (reset irrelevant stderr) - ec, stdout, stderr = execute(cmd) + ec, stdout, stderr = execute_nothreads(cmd) if ec == 0 and ('RHEL9 and clone support is relatively new' in stderr or 'RHEL8 and clones are not supported for users' in stderr): stderr = '' diff --git a/pilot/util/auxiliary.py b/pilot/util/auxiliary.py index 4ab06b2c..7c5e3146 100644 --- a/pilot/util/auxiliary.py +++ b/pilot/util/auxiliary.py @@ -17,7 +17,7 @@ # under the License. # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2017-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2017-24 """Auxiliary functions.""" @@ -33,6 +33,7 @@ from numbers import Number from time import sleep from typing import Any +from uuid import uuid4 from pilot.util.constants import ( SUCCESS, @@ -44,7 +45,10 @@ ) from pilot.common.errorcodes import ErrorCodes from pilot.util.container import execute -from pilot.util.filehandling import dump +from pilot.util.filehandling import ( + dump, + grep +) zero_depth_bases = (str, bytes, Number, range, bytearray) iteritems = 'items' @@ -444,7 +448,7 @@ def get_memory_usage(pid: int) -> (int, str, str): return execute(f'ps aux -q {pid}', timeout=60) -def extract_memory_usage_value(output: str) -> int: +def extract_memory_usage_value(output: str) -> str: """ Extract the memory usage value from the ps output (in kB). @@ -482,34 +486,38 @@ def cut_output(txt: str, cutat: int = 1024, separator: str = '\n[...]\n') -> str return txt -def has_instruction_set(instruction_set: str) -> bool: +def has_instruction_sets(instruction_sets: list) -> str: """ Determine whether a given CPU instruction set is available. The function will use grep to search in /proc/cpuinfo (both in upper and lower case). + Example: instruction_sets = ['AVX', 'AVX2', 'SSE4_2', 'XXX'] -> "AVX|AVX2|SSE4_2" - :param instruction_set: instruction set (e.g. AVX2) (str) - :return: True if given instruction set is available, False otherwise (bool). + :param instruction_sets: instruction set (e.g. AVX2) (list) + :return: string of pipe-separated instruction sets (str). """ - status = False - cmd = fr"grep -o \'{instruction_set.lower()}[^ ]*\|{instruction_set.upper()}[^ ]*\' /proc/cpuinfo" - exit_code, stdout, stderr = execute(cmd) - if not exit_code and not stderr: - if instruction_set.lower() in stdout.split() or instruction_set.upper() in stdout.split(): - status = True + ret = "" - return status + for instr in instruction_sets: + pattern = re.compile(fr'{instr.lower()}[^ ]*', re.IGNORECASE) + out = grep(patterns=[pattern], file_name="/proc/cpuinfo") + for stdout in out: + if instr.upper() not in ret and (instr.lower() in stdout.split() or instr.upper() in stdout.split()): + ret += f'|{instr.upper()}' if ret else instr.upper() -def has_instruction_sets(instruction_sets: str) -> bool: + return ret + + +def has_instruction_sets_old(instruction_sets: list) -> str: """ Determine whether a given list of CPU instruction sets is available. The function will use grep to search in /proc/cpuinfo (both in upper and lower case). Example: instruction_sets = ['AVX', 'AVX2', 'SSE4_2', 'XXX'] -> "AVX|AVX2|SSE4_2" - :param instruction_sets: instruction set (e.g. AVX2) (str) - :return: True if given instruction set is available, False otherwise (bool). + :param instruction_sets: instruction set (e.g. AVX2) (list) + :return: string of pipe-separated instruction sets (str). """ ret = "" pattern = "" @@ -709,8 +717,8 @@ def get_host_name(): else: try: host = socket.gethostname() - except socket.herror as exc: - logger.warning(f'failed to get host name: {exc}') + except socket.herror as e: + logger.warning(f'failed to get host name: {e}') host = 'localhost' return host.split('.')[0] @@ -821,3 +829,12 @@ def is_kubernetes_resource() -> bool: return True else: return False + + +def uuidgen_t() -> str: + """ + Generate a UUID string in the same format as "uuidgen -t". + + :return: A UUID in the format "00000000-0000-0000-0000-000000000000" (str). + """ + return str(uuid4()) diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 98d66af2..e90371e1 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -27,8 +27,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '0' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '17' # build number should be reset to '1' for every new development cycle +REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '13' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/container.py b/pilot/util/container.py index 74c109b9..e5837d14 100644 --- a/pilot/util/container.py +++ b/pilot/util/container.py @@ -21,6 +21,7 @@ """Functions for executing commands.""" +import errno import os import subprocess import logging @@ -113,12 +114,17 @@ def read_output(stream, queue): while True: try: line = stream.readline() - except AttributeError: - # Handle the case where stream is None - break - - if not line: + if not line: + break + except (AttributeError, ValueError): + # Handle the case where stream is None (AttributeError) or closed (ValueError) break + except OSError as e: + if e.errno == errno.EBADF: + # Handle the case where the file descriptor is bad + break + else: + raise queue.put(line) @@ -146,8 +152,11 @@ def read_output(stream, queue): exit_code = process.poll() # Wait for the threads to finish reading - stdout_thread.join() - stderr_thread.join() + try: + stdout_thread.join() + stderr_thread.join() + except Exception as e: + logger.warning(f'exception caught in execute: {e}') # Read the remaining output from the queues while not stdout_queue.empty(): @@ -263,12 +272,15 @@ def read_output(pipe, output_list): return exit_code, stdout, stderr -def execute_old(executable: Any, **kwargs: dict) -> Any: +def execute_nothreads(executable: Any, **kwargs: dict) -> Any: """ Execute the command with its options in the provided executable list using subprocess time-out handler. The function also determines whether the command should be executed within a container. + This variant of execute() is not using threads to read stdout and stderr. This is required for some use-cases like + executing arcproxy where the stdout is time-ordered. + :param executable: command to be executed (str or list) :param kwargs: kwargs (dict) :return: exit code (int), stdout (str) and stderr (str) (or process if requested via returnproc argument). diff --git a/pilot/util/filehandling.py b/pilot/util/filehandling.py index b4d63b48..316cbed0 100644 --- a/pilot/util/filehandling.py +++ b/pilot/util/filehandling.py @@ -252,6 +252,32 @@ def grep(patterns: list, file_name: str) -> list: """ Search for the patterns in the given list in a file. + Example: + grep(["St9bad_alloc", "FATAL"], "athena_stdout.txt") + -> [list containing the lines below] + CaloTrkMuIdAlg2.sysExecute() ERROR St9bad_alloc + AthAlgSeq.sysExecute() FATAL Standard std::exception is caught + + :param patterns: list of regexp patterns (list) + :param file_name: file name (str) + :return: list of matched lines in file (list). + """ + matched_lines = [] + compiled_patterns = [re.compile(pattern) for pattern in patterns] + + with open(file_name, 'r', encoding='utf-8') as _file: + matched_lines = [ + line for line in _file + if any(compiled_pattern.search(line) for compiled_pattern in compiled_patterns) + ] + + return matched_lines + + +def grep_old(patterns: list, file_name: str) -> list: + """ + Search for the patterns in the given list in a file. + Example: grep(["St9bad_alloc", "FATAL"], "athena_stdout.txt") -> [list containing the lines below] diff --git a/pilot/util/monitoring.py b/pilot/util/monitoring.py index 3cc621bd..9b7e1d14 100644 --- a/pilot/util/monitoring.py +++ b/pilot/util/monitoring.py @@ -30,7 +30,7 @@ from signal import SIGKILL from pilot.common.errorcodes import ErrorCodes -from pilot.common.exception import PilotException, MiddlewareImportFailure +from pilot.common.exception import PilotException, MiddlewareImportFailure #, FileHandlingFailure from pilot.util.auxiliary import set_pilot_state #, show_memory_usage from pilot.util.config import config from pilot.util.constants import PILOT_PRE_PAYLOAD @@ -40,7 +40,8 @@ remove_files, get_local_file_size, read_file, - zip_files + zip_files, + #write_file ) from pilot.util.loopingjob import looping_job from pilot.util.math import ( @@ -135,7 +136,7 @@ def job_monitor_tasks(job: JobData, mt: MonitoringTime, args: object) -> tuple[i if exit_code != 0: return exit_code, diagnostics - # display OOM process info + # display OOM process info (once) display_oom_info(job.pid) # should the pilot abort the payload? @@ -204,20 +205,31 @@ def display_oom_info(payload_pid): :param payload_pid: payload pid (int). """ - + #fname = f"/proc/{payload_pid}/oom_score_adj" payload_score = get_score(payload_pid) if payload_pid else 'UNKNOWN' pilot_score = get_score(os.getpid()) - logger.info(f'oom_score(pilot) = {pilot_score}, oom_score(payload) = {payload_score}') + if isinstance(pilot_score, str) and pilot_score == 'UNKNOWN': + logger.warning(f'could not get oom_score for pilot process: {pilot_score}') + else: + #relative_payload_score = "1" + + # write the payload oom_score to the oom_score_adj file + #try: + # write_file(path=fname, contents=relative_payload_score) + #except Exception as e: # FileHandlingFailure + # logger.warning(f'could not write oom_score to file: {e}') + #logger.info(f'oom_score(pilot) = {pilot_score}, oom_score(payload) = {payload_score} (attempted writing relative score 1 to {fname})') + logger.info(f'oom_score(pilot) = {pilot_score}, oom_score(payload) = {payload_score}') -def get_score(pid): + +def get_score(pid) -> str: """ Get the OOM process score. - :param pid: process id (int). - :return: score (string). + :param pid: process id (int) + :return: score (str). """ - try: score = '%s' % read_file('/proc/%d/oom_score' % pid) except Exception as error: diff --git a/pilot/util/proxy.py b/pilot/util/proxy.py index 685d8991..b78196e8 100644 --- a/pilot/util/proxy.py +++ b/pilot/util/proxy.py @@ -26,7 +26,10 @@ from pilot.common.exception import FileHandlingFailure from pilot.util import https from pilot.util.config import config -from pilot.util.container import execute +from pilot.util.container import ( + execute, + execute_nothreads +) from pilot.util.filehandling import write_file logger = logging.getLogger(__name__) @@ -78,7 +81,7 @@ def vomsproxyinfo(options: str = '-all', mute: bool = False, path: str = '') -> executable = f'voms-proxy-info {options}' if path: executable += f' --file={path}' - exit_code, stdout, stderr = execute(executable) + exit_code, stdout, stderr = execute_nothreads(executable) if not mute: logger.info(stdout + stderr) diff --git a/pilot/util/tracereport.py b/pilot/util/tracereport.py index 9c73c6be..d6b25ca5 100644 --- a/pilot/util/tracereport.py +++ b/pilot/util/tracereport.py @@ -18,21 +18,25 @@ # Authors: # - Alexey Anisenkov, alexey.anisenkov@cern.ch, 2017 # - Pavlo Svirin, pavlo.svirin@cern.ch, 2018 -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-24 import hashlib import os import socket import time + from sys import exc_info from json import dumps, loads from os import environ, getuid from pilot.common.exception import FileHandlingFailure -from pilot.util.auxiliary import correct_none_types +from pilot.util.auxiliary import ( + correct_none_types, + uuidgen_t +) from pilot.util.config import config from pilot.util.constants import get_pilot_version, get_rucio_client_version -from pilot.util.container import execute, execute2 +from pilot.util.container import execute2 from pilot.util.filehandling import append_to_file, write_file from pilot.util.https import request2 @@ -130,10 +134,8 @@ def init(self, job): s = 'ppilot_%s' % job.jobdefinitionid self['uuid'] = hashlib.md5(s.encode('utf-8')).hexdigest() # hash_pilotid, Python 2/3 else: - #self['uuid'] = commands.getoutput('uuidgen -t 2> /dev/null').replace('-', '') # all LFNs of one request have the same uuid - cmd = 'uuidgen -t 2> /dev/null' - exit_code, stdout, stderr = execute(cmd, timeout=10) - self['uuid'] = stdout.replace('-', '') + _uuid = uuidgen_t() # 'uuidgen -t 2> /dev/null' + self['uuid'] = _uuid.replace('-', '') def get_value(self, key): """