diff --git a/PILOTVERSION b/PILOTVERSION index 2df33a99..c214fbc5 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.7.7.3 \ No newline at end of file +3.7.8.21 \ No newline at end of file diff --git a/pilot.py b/pilot.py index 3bb549ac..79facd91 100755 --- a/pilot.py +++ b/pilot.py @@ -31,6 +31,7 @@ import time from os import getcwd, chdir, environ from os.path import exists, join +from re import match from shutil import rmtree from typing import Any @@ -243,6 +244,25 @@ def str2bool(var: str) -> bool: return ret +def validate_resource_type(value: str) -> str: + """ + Validate the resource type. + + :param value: resource type (str) + :return: resource type (str) + :raises: argparse.ArgumentTypeError if the resource type is invalid. + """ + # Define the allowed patterns ("" is the argparse default for --resource-type and must validate) + allowed_patterns = ["", "SCORE", "MCORE", "SCORE_*", "MCORE_*"] + if value in allowed_patterns: + return value + # Check for pattern matching + for pattern in allowed_patterns: + if pattern.endswith('*') and match(f"^{pattern[:-1]}", value): + return value + raise argparse.ArgumentTypeError(f"Invalid resource type: {value}") + + def get_args() -> Any: """ Return the args from the arg parser. 
@@ -548,9 +568,8 @@ def get_args() -> Any: "--resource-type", dest="resource_type", default="", - type=str, - choices=["SCORE", "MCORE", "SCORE_HIMEM", "MCORE_HIMEM"], - help="Resource type; MCORE, SCORE, SCORE_HIMEM or MCORE_HIMEM", + type=validate_resource_type, + help="Resource type; MCORE, SCORE or patterns like SCORE_* and MCORE_*", ) arg_parser.add_argument( "--use-https", diff --git a/pilot/control/job.py b/pilot/control/job.py index c996fad5..952ee7b3 100644 --- a/pilot/control/job.py +++ b/pilot/control/job.py @@ -788,9 +788,14 @@ def get_data_structure(job: Any, state: str, args: Any, xml: str = "", metadata: extra = {'readbyterate': readfrac} else: logger.debug('read_bytes info not yet available') + # extract and remove any GPU info from data since it will be reported with job metrics add_gpu_info(data, extra) + # add the lsetup time if set + if job.lsetuptime: + extra['lsetup_time'] = job.lsetuptime + job_metrics = get_job_metrics(job, extra=extra) if job_metrics: data['jobMetrics'] = job_metrics diff --git a/pilot/eventservice/esprocess/esprocessfinegrainedproc.py b/pilot/eventservice/esprocess/esprocessfinegrainedproc.py index 1fdd47bf..80592099 100644 --- a/pilot/eventservice/esprocess/esprocessfinegrainedproc.py +++ b/pilot/eventservice/esprocess/esprocessfinegrainedproc.py @@ -19,44 +19,38 @@ # - Wen Guan, wen.guan@cern.ch, 2023 # - Paul Nilsson, paul.nilsson@cern.ch, 2024 +"""Class with functions for finegrained ES processing.""" + import io import logging import os import time import threading import traceback +from typing import Any, IO from pilot.common.exception import PilotException, MessageFailure, SetupFailure, RunPayloadFailure, UnknownException - logger = logging.getLogger(__name__) -""" -Main process to handle event service. -It makes use of two hooks get_event_ranges_hook and handle_out_message_hook to communicate with other processes when -it's running. The process will handle the logic of Event service independently. 
-""" - class ESProcessFineGrainedProc(threading.Thread): """ Main EventService Process. """ - def __init__(self, payload, waiting_time=30 * 60): + def __init__(self, payload: dict, waiting_time: int = 30 * 60): """ Init ESProcessFineGrainedProc. :param payload: a dict of {'executable': , 'output_file': , 'error_file': } + :param waiting_time: waiting time in seconds for the process to finish (int). """ threading.Thread.__init__(self, name='esprocessFineGrainedProc') self.__payload = payload - self.__process = None - self.get_event_ranges_hook = None self.handle_out_message_hook = None - self.__monitor_log_time = None self.is_no_more_events = False self.__no_more_event_time = None @@ -65,68 +59,78 @@ def __init__(self, payload, waiting_time=30 * 60): self.__stop_time = 180 self.pid = None self.__is_payload_started = False - self.__ret_code = None self.setName("ESProcessFineGrainedProc") self.corecount = 1 - self.event_ranges_cache = [] - def is_payload_started(self): + def is_payload_started(self) -> bool: + """ + Return boolean to indicate whether the payload has started. + + :return: is payload started? (bool). + """ return self.__is_payload_started - def stop(self, delay=1800): + def stop(self, delay: int = 1800): + """Set stop event.""" if not self.__stop.is_set(): self.__stop.set() self.__stop_set_time = time.time() self.__stop_delay = delay - def get_job_id(self): + def get_job_id(self) -> str: + """ + Return job id. + + :return: job id (str). + """ if 'job' in self.__payload and self.__payload['job'] and self.__payload['job'].jobid: return self.__payload['job'].jobid + return '' - def get_corecount(self): + def get_corecount(self) -> int: + """ + Return core count. + + :return: core count (int). 
+ """ if 'job' in self.__payload and self.__payload['job'] and self.__payload['job'].corecount: core_count = int(self.__payload['job'].corecount) return core_count + return 1 - def get_file(self, workdir, file_label='output_file', file_name='ES_payload_output.txt'): + def get_file(self, workdir: str, file_label: str = 'output_file', file_name: str = 'ES_payload_output.txt') -> IO[str]: """ Return the requested file. - :param file_label: - :param workdir: - :return: + :param workdir: work directory (str) + :param file_label: file label (str) + :param file_name: file name (str) + :return: file descriptor (IO[str]). """ - - try: - file_type = file # Python 2 - except NameError: - file_type = io.IOBase # Python 3 + file_type = io.IOBase if file_label in self.__payload: if isinstance(self.__payload[file_label], file_type): _file_fd = self.__payload[file_label] else: _file = self.__payload[file_label] if '/' in self.__payload[file_label] else os.path.join(workdir, self.__payload[file_label]) - _file_fd = open(_file, 'w') + _file_fd = open(_file, 'w', encoding='utf-8') else: - _file = os.path.join(workdir, file_name) - _file_fd = open(_file, 'w') + _file_fd = open(os.path.join(workdir, file_name), 'w', encoding='utf-8') return _file_fd - def get_workdir(self): + def get_workdir(self) -> str: """ Return the workdir. - If the workdir is set but is not a directory, return None. - :return: workdir (string or None). + :return: workdir (str) :raises SetupFailure: in case workdir is not a directory. """ - workdir = '' if 'workdir' in self.__payload: workdir = self.__payload['workdir'] @@ -134,60 +138,60 @@ def get_workdir(self): os.makedirs(workdir) elif not os.path.isdir(workdir): raise SetupFailure('workdir exists but is not a directory') + return workdir - def get_executable(self, workdir): + def get_executable(self, workdir: str) -> str: """ Return the executable string. - :param workdir: work directory (string). - :return: executable (string). 
+ :param workdir: work directory (str) + :return: executable (str). """ - executable = self.__payload['executable'] - executable = self.get_payload_executable(executable) - return 'cd %s; %s' % (workdir, executable) + # self.get_payload_executable() is not defined in this class + # executable = self.get_payload_executable(self.__payload['executable']) + executable = 'undefined' + return f'cd {workdir}; {executable}' - def set_get_event_ranges_hook(self, hook): + def set_get_event_ranges_hook(self, hook: Any): """ - set get_event_ranges hook. + Set get_event_ranges hook. - :param hook: a hook method to get event ranges. + :param hook: a hook method to get event ranges (Any). """ - self.get_event_ranges_hook = hook - def get_get_event_ranges_hook(self): + def get_get_event_ranges_hook(self) -> Any: """ - get get_event_ranges hook. + Return get_event_ranges hook. - :returns: The hook method to get event ranges. + :returns: The hook method to get event ranges (Any). """ - return self.get_event_ranges_hook - def set_handle_out_message_hook(self, hook): + def set_handle_out_message_hook(self, hook: Any): """ - set handle_out_message hook. + Set handle_out_message hook. - :param hook: a hook method to handle payload output and error messages. + :param hook: a hook method to handle payload output and error messages (Any). """ - self.handle_out_message_hook = hook - def get_handle_out_message_hook(self): + def get_handle_out_message_hook(self) -> Any: """ - get handle_out_message hook. + Return handle_out_message hook. - :returns: The hook method to handle payload output and error messages. + :returns: The hook method to handle payload output and error messages (Any). """ - return self.handle_out_message_hook def init(self): """ - initialize message thread and payload process. - """ + Initialize message thread and payload process. + (Incomplete implementation). + :raises: Exception: when an Exception is caught. 
+ """ try: pass except Exception as e: @@ -200,121 +204,119 @@ def monitor(self): """ Monitor whether a process is dead. - raises: MessageFailure: when the message thread is dead or exited. - RunPayloadFailure: when the payload process is dead or exited. + (Incomplete implementation). """ - pass + return - def has_running_children(self): + def has_running_children(self) -> bool: """ Check whether it has running children - :return: True if there are alive children, otherwise False + (Incomplete implementation). + + :return: True if there are alive children, otherwise False (bool). """ return False - def is_payload_running(self): + def is_payload_running(self) -> bool: """ Check whether the payload is still running - :return: True if the payload is running, otherwise False + (Incomplete implementation). + + :return: True if the payload is running, otherwise False (bool). """ return False - def get_event_ranges(self, num_ranges=None, queue_factor=1): + def get_event_ranges(self, num_ranges: int = None, queue_factor: int = 1): """ Calling get_event_ranges hook to get event ranges. - :param num_ranges: number of event ranges to get. - + :param num_ranges: number of event ranges to get (int) + :param queue_factor: queue factor (int) :raises: SetupFailure: If get_event_ranges_hook is not set. MessageFailure: when failed to get event ranges. """ if not num_ranges: num_ranges = self.corecount - logger.debug('getting event ranges(num_ranges=%s)' % num_ranges) + logger.debug(f'getting event ranges(num_ranges={num_ranges})') if not self.get_event_ranges_hook: raise SetupFailure("get_event_ranges_hook is not set") try: - logger.debug('calling get_event_ranges hook(%s) to get event ranges.' 
% self.get_event_ranges_hook) + logger.debug(f'calling get_event_ranges hook({self.get_event_ranges_hook}) to get event ranges.') event_ranges = self.get_event_ranges_hook(num_ranges, queue_factor=queue_factor) - logger.debug('got event ranges: %s' % event_ranges) + logger.debug(f'got event ranges: {event_ranges}') return event_ranges except Exception as e: - raise MessageFailure("Failed to get event ranges: %s" % e) + raise MessageFailure(f"Failed to get event ranges: {e}") from e - def parse_out_message(self, message): + def parse_out_message(self, message: Any) -> Any: """ Parse output or error messages from payload. - :param message: The message string received from payload. - - :returns: a dict {'id': , 'status': , 'output': , 'cpu': , 'wall': , 'message': } + :param message: The message string received from payload (Any) + :returns: a dict {'id': , 'status': , 'output': , 'cpu': , 'wall': , 'message': } (Any) :raises: PilotExecption: when a PilotException is caught. UnknownException: when other unknown exception is caught. """ - - logger.debug('parsing message: %s' % message) + logger.debug(f'parsing message: {message}') return message - def handle_out_message(self, message): + def handle_out_message(self, message: Any): """ Handle output or error messages from payload. Messages from payload will be parsed and the handle_out_message hook is called. - :param message: The message string received from payload. - + :param message: The message string received from payload (Any) :raises: SetupFailure: when handle_out_message_hook is not set. RunPayloadFailure: when failed to handle an output or error message. 
""" - - logger.debug('handling out message: %s' % message) + logger.debug(f'handling out message: {message}') if not self.handle_out_message_hook: raise SetupFailure("handle_out_message_hook is not set") try: message_status = self.parse_out_message(message) - logger.debug('parsed out message: %s' % message_status) - logger.debug('calling handle_out_message hook(%s) to handle parsed message.' % self.handle_out_message_hook) + logger.debug(f'parsed out message: {message_status}') + logger.debug(f'calling handle_out_message hook({self.handle_out_message_hook}) to handle parsed message.') self.handle_out_message_hook(message_status) except Exception as e: - raise RunPayloadFailure("Failed to handle out message: %s" % e) + raise RunPayloadFailure(f"Failed to handle out message: {e}") from e - def poll(self): + def poll(self) -> Any: """ poll whether the process is still running. - :returns: None: still running. - 0: finished successfully. - others: failed. + :returns: None: still running + 0: finished successfully + others: failed (Any). """ return self.__ret_code - def terminate(self, time_to_wait=1): + def terminate(self, time_to_wait: int = 1): """ Terminate running threads and processes. - :param time_to_wait: integer, seconds to wait to force kill the payload process. - + :param time_to_wait: integer, seconds to wait to force kill the payload process (int) :raises: PilotExecption: when a PilotException is caught. UnknownException: when other unknown exception is caught. 
""" - logger.info('terminate running threads and processes.') + logger.info('terminate running threads and processes') + if time_to_wait: + pass # to get rid of pylint warning try: self.stop() except Exception as e: - logger.error('Exception caught when terminating ESProcessFineGrainedProc: %s' % e) + logger.error(f'Exception caught when terminating ESProcessFineGrainedProc: {e}') self.__ret_code = -1 - raise UnknownException(e) + raise UnknownException(e) from e def kill(self): """ Terminate running threads and processes. - :param time_to_wait: integer, seconds to wait to force kill the payload process. - :raises: PilotException: when a PilotException is caught. UnknownException: when other unknown exception is caught. """ @@ -322,25 +324,24 @@ def kill(self): try: self.stop() except Exception as e: - logger.error('Exception caught when terminating ESProcessFineGrainedProc: %s' % e) - raise UnknownException(e) + logger.error(f'Exception caught when terminating ESProcessFineGrainedProc: {e}') + raise UnknownException(e) from e def clean(self): - """ - Clean left resources - """ + """Clean left resources""" self.stop() def run(self): """ - Main run loops: monitor message thread and payload process. - handle messages from payload and response messages with injecting new event ranges or process outputs. + Main run loops. + + 1. monitor message thread and payload process. + 2. handle messages from payload and response messages with injecting new event ranges or process outputs. :raises: PilotExecption: when a PilotException is caught. UnknownException: when other unknown exception is caught. 
""" - - logger.info('start esprocess with thread ident: %s' % (self.ident)) + logger.info(f'start esprocess with thread ident: {self.ident}') logger.debug('initializing') self.init() logger.debug('initialization finished.') @@ -351,11 +352,11 @@ def run(self): self.monitor() time.sleep(0.01) except PilotException as e: - logger.error('PilotException caught in the main loop: %s, %s' % (e.get_detail(), traceback.format_exc())) + logger.error(f'PilotException caught in the main loop: {e.get_detail()}, {traceback.format_exc()}') # TODO: define output message exception. If caught 3 output message exception, terminate self.stop() except Exception as e: - logger.error('Exception caught in the main loop: %s, %s' % (e, traceback.format_exc())) + logger.error(f'exception caught in the main loop: {e}, {traceback.format_exc()}') # TODO: catch and raise exceptions # if catching dead process exception, terminate. self.stop() diff --git a/pilot/eventservice/workexecutor/plugins/baseexecutor.py b/pilot/eventservice/workexecutor/plugins/baseexecutor.py index 892d9635..15e26a90 100644 --- a/pilot/eventservice/workexecutor/plugins/baseexecutor.py +++ b/pilot/eventservice/workexecutor/plugins/baseexecutor.py @@ -18,15 +18,16 @@ # # Authors: # - Wen Guan, wen.guan@cern.ch, 2018 -# - Paul Nilsson, paul.nilsson@cern.ch, 2019-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2019-24 +import logging import os import threading from pilot.common.pluginfactory import PluginFactory from pilot.control.job import create_job from pilot.eventservice.communicationmanager.communicationmanager import CommunicationManager -import logging + logger = logging.getLogger(__name__) """ @@ -36,114 +37,182 @@ class BaseExecutor(threading.Thread, PluginFactory): - def __init__(self, **kwargs): - super(BaseExecutor, self).__init__() - self.setName("BaseExecutor") + def __init__(self, **kwargs: dict): + """ + Init function for BaseExecutor. + + :param kwargs: keyword arguments (dict). 
+ """ + super().__init__() + self.name = "BaseExecutor" self.queue = None self.payload = None - self.args = None - for key in kwargs: - setattr(self, key, kwargs[key]) - + for key, value in kwargs.items(): + setattr(self, key, value) self.__stop = threading.Event() - self.__event_ranges = [] self.__is_set_payload = False self.__is_retrieve_payload = False - self.communication_manager = None - self.proc = None - self.current_dir = os.getcwd() - def get_pid(self): + def get_pid(self) -> int or None: + """ + Return the process ID. + + :return: process ID (int or None). + """ return self.proc.pid if self.proc else None def __del__(self): + """ + Destructor for the BaseExecutor class. + + This method is called when the BaseExecutor object is about to be destroyed. + It ensures that the stop method is called and the communication manager is stopped, if it exists. + """ self.stop() if self.communication_manager: self.communication_manager.stop() - def is_payload_started(self): + def is_payload_started(self) -> bool: + """ + Return a boolean indicating whether the payload has started. + + (Not properly implemented). + + :return: True if the payload has started, False otherwise (bool). + """ return False def start(self): - super(BaseExecutor, self).start() + """Start the BaseExecutor.""" + super().start() self.communication_manager = CommunicationManager() self.communication_manager.start() def stop(self): + """Stop the BaseExecutor.""" if not self.is_stop(): self.__stop.set() if self.communication_manager: self.communication_manager.stop() + + logger.info(f"changing current dir from {os.getcwd()} to {self.current_dir}") os.chdir(self.current_dir) - logger.info("change current dir from %s to %s" % (os.getcwd(), self.current_dir)) def is_stop(self): + """ + Return a boolean indicating whether the BaseExecutor should be stopped. + + :return: True if the BaseExecutor should be stopped, False otherwise (bool). 
+ """ return self.__stop.is_set() def stop_communicator(self): + """Stop the communication manager.""" logger.info("Stopping communication manager") if self.communication_manager: while self.communication_manager.is_alive(): if not self.communication_manager.is_stop(): self.communication_manager.stop() + logger.info("Communication manager stopped") - def set_payload(self, payload): + def set_payload(self, payload: dict): + """ + Set the payload. + + :param payload: payload (dict). + """ self.payload = payload self.__is_set_payload = True job = self.get_job() if job and job.workdir: - current_dir = os.getcwd() + logger.info(f"changing current dir from {os.getcwd()} to {job.workdir}") os.chdir(job.workdir) - logger.info("change current dir from %s to %s" % (current_dir, job.workdir)) - def is_set_payload(self): + def is_set_payload(self) -> bool: + """ + Return a boolean indicating whether the payload has been set. + + :return: True if the payload has been set, False otherwise (bool). + """ return self.__is_set_payload def set_retrieve_payload(self): + """Set the retrieve payload flag.""" self.__is_retrieve_payload = True - def is_retrieve_payload(self): + def is_retrieve_payload(self) -> bool: + """ + Return the retrieve payload flag. + + :return: True if the retrieve payload flag is set, False otherwise (bool). + """ return self.__is_retrieve_payload - def retrieve_payload(self): - logger.info("Retrieving payload: %s" % self.args) + def retrieve_payload(self) -> dict or None: + """ + Retrieve the payload. + + :return: payload (dict) or None. 
+ """ + logger.info(f"retrieving payload: {self.args}") jobs = self.communication_manager.get_jobs(njobs=1, args=self.args) - logger.info("Received jobs: %s" % jobs) + logger.info(f"received jobs: {jobs}") if jobs: job = create_job(jobs[0], queuename=self.queue) # get the payload command from the user specific code pilot_user = os.environ.get('PILOT_USER', 'atlas').lower() - user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) # Python 2/3 + user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0) cmd = user.get_payload_command(job) - logger.info("payload execution command: %s" % cmd) + logger.info(f"payload execution command: {cmd}") payload = {'executable': cmd, 'workdir': job.workdir, 'job': job} - logger.info("Retrieved payload: %s" % payload) + logger.info(f"retrieved payload: {payload}") return payload + return None - def get_payload(self): + def get_payload(self) -> dict or None: + """ + Return the payload. + + :return: payload (dict or None). + """ if self.__is_set_payload: return self.payload + return None + def get_job(self): + """ + Return the job. + + :return: job (dict or None). + """ return self.payload['job'] if self.payload and 'job' in list(self.payload.keys()) else None # Python 2/3 - def get_event_ranges(self, num_event_ranges=1, queue_factor=2): + def get_event_ranges(self, num_event_ranges: int = 1, queue_factor: int = 2) -> list: + """ + Get event ranges from the communication manager. + + :param num_event_ranges: number of event ranges (int) + :param queue_factor: queue factor (int) + :return: event ranges (list). 
+ """ if os.environ.get('PILOT_ES_EXECUTOR_TYPE', 'generic') == 'raythena': old_queue_factor = queue_factor queue_factor = 1 - logger.info("raythena - Changing queue_factor from %s to %s" % (old_queue_factor, queue_factor)) - logger.info("Getting event ranges: (num_ranges: %s) (queue_factor: %s)" % (num_event_ranges, queue_factor)) + logger.info(f"raythena - Changing queue_factor from {old_queue_factor} to {queue_factor}") + + logger.info(f"getting event ranges: (num_ranges: {num_event_ranges}) (queue_factor: {queue_factor})") if len(self.__event_ranges) < num_event_ranges: ret = self.communication_manager.get_event_ranges(num_event_ranges=num_event_ranges * queue_factor, job=self.get_job()) for event_range in ret: @@ -154,23 +223,36 @@ def get_event_ranges(self, num_event_ranges=1, queue_factor=2): if len(self.__event_ranges) > 0: event_range = self.__event_ranges.pop(0) ret.append(event_range) - logger.info("Received event ranges(num:%s): %s" % (len(ret), ret)) + logger.info(f"received event ranges(num:{len(ret)}): {ret}") + return ret - def update_events(self, messages): - logger.info("Updating event ranges: %s" % messages) + def update_events(self, messages: list) -> bool: + """ + Update event ranges. + + :param messages: messages (list) + :return: True if the event ranges were updated successfully, False otherwise (bool). + """ + logger.info(f"updating event ranges: {messages}") ret = self.communication_manager.update_events(messages) - logger.info("Updated event ranges status: %s" % ret) + logger.info(f"updated event ranges status: {ret}") + return ret - def update_jobs(self, jobs): - logger.info("Updating jobs: %s" % jobs) + def update_jobs(self, jobs: list) -> bool: + """ + Update jobs. + + :param jobs: jobs (list) + :return: True if the jobs were updated successfully, False otherwise (bool). 
+ """ + logger.info(f"updating jobs: {jobs}") ret = self.communication_manager.update_jobs(jobs) - logger.info("Updated jobs status: %s" % ret) + logger.info(f"updated jobs status: {ret}") + return ret def run(self): - """ - Main run process - """ + """Main run process.""" raise NotImplementedError() diff --git a/pilot/eventservice/workexecutor/plugins/finegrainedprocexecutor.py b/pilot/eventservice/workexecutor/plugins/finegrainedprocexecutor.py index b78b904a..401d1aae 100644 --- a/pilot/eventservice/workexecutor/plugins/finegrainedprocexecutor.py +++ b/pilot/eventservice/workexecutor/plugins/finegrainedprocexecutor.py @@ -17,20 +17,20 @@ # under the License. # # Authors: -# - Wen Guan, wen.guan@cern.ch, 2023 - 2024 +# - Wen Guan, wen.guan@cern.ch, 2023-24 +# - Paul Nilsson, paul.nilsson@cern.ch, 2024 import json +import logging import os import time import traceback +from typing import Any from pilot.common.errorcodes import ErrorCodes - from .baseexecutor import BaseExecutor -import logging logger = logging.getLogger(__name__) - errors = ErrorCodes() """ @@ -39,44 +39,61 @@ class FineGrainedProcExecutor(BaseExecutor): - def __init__(self, **kwargs): - super(FineGrainedProcExecutor, self).__init__(**kwargs) - self.setName("FineGrainedProcExecutor") + def __init__(self, **kwargs: dict): + """ + Init function for FineGrainedProcExecutor. + :param kwargs: keyword arguments (dict). + """ + super().__init__(**kwargs) + self.name = "FineGrainedProcExecutor" self.__queued_out_messages = [] self.__stageout_failures = 0 self.__max_allowed_stageout_failures = 20 self.__last_stageout_time = None self.__all_out_messages = [] - self.proc = None self.exit_code = None - def is_payload_started(self): + def is_payload_started(self) -> bool: + """ + Check if payload is started. + + :return: True if payload is started, False otherwise. 
+ """ return self.proc.is_payload_started() if self.proc else False - def get_pid(self): + def get_pid(self) -> int or None: + """ + Return the process ID. + + :return: process ID (int or None). + """ return self.proc.pid if self.proc else None - def get_exit_code(self): + def get_exit_code(self) -> int or None: + """ + Get exit code of the process. + + :return: exit code of the process (int or None). + """ return self.exit_code - def update_finished_event_ranges(self, out_messagess, output_file, fsize, checksum, storage_id): + def update_finished_event_ranges(self, out_messages: list, output_file: str, fsize: int, checksum: str, storage_id: Any): """ Update finished event ranges - :param out_messages: messages from AthenaMP. - :param output_file: output file name. - :param fsize: file size. - :param adler32: checksum (adler32) of the file. - :param storage_id: the id of the storage. + :param out_messages: messages from AthenaMP (list) + :param output_file: output file name (str) + :param fsize: file size (int) + :param checksum: checksum (adler32) of the file (str) + :param storage_id: the id of the storage (Any). """ - - if len(out_messagess) == 0: + if len(out_messages) == 0: return event_ranges = [] - for out_msg in out_messagess: + for out_msg in out_messages: event_ranges.append({"eventRangeID": out_msg['id'], "eventStatus": 'finished'}) event_range_status = {"zipFile": {"numEvents": len(event_ranges), "objstoreID": storage_id, @@ -92,46 +109,44 @@ def update_finished_event_ranges(self, out_messagess, output_file, fsize, checks job = self.get_job() job.nevents += len(event_ranges) - def update_failed_event_ranges(self, out_messagess): + def update_failed_event_ranges(self, out_messages: list): """ - Update failed event ranges + Update failed event ranges. - :param out_messages: messages from AthenaMP. + :param out_messages: messages from AthenaMP (list). 
""" - - if len(out_messagess) == 0: + if len(out_messages) == 0: return event_ranges = [] - for message in out_messagess: - status = message['status'] if message['status'] in ['failed', 'fatal'] else 'failed' + for message in out_messages: + status = message['status'] if message['status'] in {'failed', 'fatal'} else 'failed' # ToBeFixed errorCode event_ranges.append({"errorCode": errors.UNKNOWNPAYLOADFAILURE, "eventRangeID": message['id'], "eventStatus": status}) event_range_message = {'version': 0, 'eventRanges': json.dumps(event_ranges)} self.update_events(event_range_message) - def update_terminated_event_ranges(self, out_messagess): + def update_terminated_event_ranges(self, out_messages: list): """ Update terminated event ranges - :param out_messages: messages from AthenaMP. + :param out_messages: messages from AthenaMP (list). """ - - if len(out_messagess) == 0: + if len(out_messages) == 0: return event_ranges = [] finished_events = 0 - for message in out_messagess: - if message['status'] in ['failed', 'fatal', 'finished', 'running', 'transferring']: + for message in out_messages: + if message['status'] in {'failed', 'fatal', 'finished', 'running', 'transferring'}: status = message['status'] - if message['status'] in ['finished']: + if message['status'] in {'finished'}: finished_events += 1 else: - logger.warn("status is unknown for messages, set it running: %s" % str(message)) + logger.warning(f"status is unknown for messages, set it running: {message}") status = 'running' error_code = message.get("error_code", None) - if status in ["failed", "fatal"] and error_code is None: + if status in {"failed", "fatal"} and error_code is None: error_code = errors.UNKNOWNPAYLOADFAILURE error_diag = message.get("error_diag") @@ -143,30 +158,31 @@ def update_terminated_event_ranges(self, out_messagess): job = self.get_job() job.nevents += finished_events - def handle_out_message(self, message): + def handle_out_message(self, message: dict): """ Handle ES output or error 
messages hook function for tests. - :param message: a dict of parsed message. - For 'finished' event ranges, it's {'id': , 'status': 'finished', 'output': , 'cpu': , + Example of message: + For 'finished' event ranges, it's {'id': , 'status': 'finished', 'output': , 'cpu': , 'wall': , 'message': }. - Fro 'failed' event ranges, it's {'id': , 'status': 'failed', 'message': }. - """ + For 'failed' event ranges, it's {'id': , 'status': 'failed', 'message': }. + :param message: a dict of parsed message (dict). + """ logger.info(f"handling out message: {message}") - self.__all_out_messages.append(message) - self.__queued_out_messages.append(message) - def stageout_es(self, force=False): + def stageout_es(self, force: bool = False): """ Stage out event service outputs. + When pilot fails to stage out a file, the file will be added back to the queue for staging out next period. - """ + :param force: force to stage out (bool). + """ job = self.get_job() - if len(self.__queued_out_messages): + if self.__queued_out_messages: if force or self.__last_stageout_time is None or (time.time() > self.__last_stageout_time + job.infosys.queuedata.es_stageout_gap): out_messages = [] @@ -178,18 +194,15 @@ def stageout_es(self, force=False): self.update_terminated_event_ranges(out_messages) def clean(self): - """ - Clean temp produced files - """ - + """Clean temp produced files.""" for msg in self.__all_out_messages: - if msg['status'] in ['failed', 'fatal']: + if msg['status'] in {'failed', 'fatal'}: pass elif 'output' in msg: try: logger.info(f"removing ES pre-merge file: {msg['output']}") os.remove(msg['output']) - except Exception as exc: + except OSError as exc: logger.error(f"failed to remove file({msg['output']}): {exc}") self.__queued_out_messages = [] self.__stageout_failures = 0 @@ -213,7 +226,7 @@ def get_esprocess_finegrainedproc(self, payload): proc = esprocessfinegrainedproc.ESProcessFineGrainedProc(payload) return proc except Exception as ex: - logger.warn("use specific 
ESProcessFineGrainedProc does not exist. Using the pilot.eventservice.esprocess.esprocessfinegrainedproc: " + str(ex)) + logger.warning(f"use specific ESProcessFineGrainedProc does not exist. Using the pilot.eventservice.esprocess.esprocessfinegrainedproc: {ex}") from pilot.eventservice.esprocess.esprocessfinegrainedproc import ESProcessFineGrainedProc proc = ESProcessFineGrainedProc(payload) return proc @@ -224,7 +237,7 @@ def run(self): """ try: - logger.info("starting ES FineGrainedProcExecutor with thread identifier: %s" % (self.ident)) + logger.info(f"starting ES FineGrainedProcExecutor with thread identifier: {self.ident}") if self.is_set_payload(): payload = self.get_payload() elif self.is_retrieve_payload(): @@ -275,7 +288,7 @@ def run(self): self.stageout_es(force=True) self.clean() self.exit_code = proc.poll() - logger.info("ESProcess exit_code: %s" % self.exit_code) + logger.info(f"ESProcess exit_code: {self.exit_code}") except Exception as exc: logger.error(f'execute payload failed: {exc}, {traceback.format_exc()}') diff --git a/pilot/eventservice/workexecutor/plugins/genericexecutor.py b/pilot/eventservice/workexecutor/plugins/genericexecutor.py index 17aa1528..f4981c49 100644 --- a/pilot/eventservice/workexecutor/plugins/genericexecutor.py +++ b/pilot/eventservice/workexecutor/plugins/genericexecutor.py @@ -50,14 +50,14 @@ class GenericExecutor(BaseExecutor): """Generic executor class.""" - def __init__(self, **kwargs): + def __init__(self, **kwargs: dict): """ Initialize generic executor. :param kwargs: kwargs dictionary (dict). 
""" - super(GenericExecutor, self).__init__(**kwargs) - self.setName("GenericExecutor") + super().__init__(**kwargs) + self.name = "GenericExecutor" self.__queued_out_messages = [] self.__stageout_failures = 0 self.__max_allowed_stageout_failures = 20 @@ -90,7 +90,7 @@ def get_exit_code(self): """ return self.exit_code - def update_finished_event_ranges(self, out_messagess: Any, output_file: str, fsize: int, checksum: str, + def update_finished_event_ranges(self, out_messages: Any, output_file: str, fsize: int, checksum: str, storage_id: Any) -> None: """ Update finished event ranges. @@ -98,14 +98,14 @@ def update_finished_event_ranges(self, out_messagess: Any, output_file: str, fsi :param out_messages: messages from AthenaMP (Any) :param output_file: output file name (str) :param fsize: file size (int) - :param adler32: checksum (adler32) of the file (str) + :param checksum: checksum (adler32) of the file (str) :param storage_id: the id of the storage (Any). """ - if len(out_messagess) == 0: + if len(out_messages) == 0: return event_ranges = [] - for out_msg in out_messagess: + for out_msg in out_messages: event_ranges.append({"eventRangeID": out_msg['id'], "eventStatus": 'finished'}) event_range_status = {"zipFile": {"numEvents": len(event_ranges), "objstoreID": storage_id, @@ -132,13 +132,13 @@ def update_failed_event_ranges(self, out_messages: Any) -> None: event_ranges = [] for message in out_messages: - status = message['status'] if message['status'] in ['failed', 'fatal'] else 'failed' + status = message['status'] if message['status'] in {'failed', 'fatal'} else 'failed' # ToBeFixed errorCode event_ranges.append({"errorCode": errors.UNKNOWNPAYLOADFAILURE, "eventRangeID": message['id'], "eventStatus": status}) event_range_message = {'version': 0, 'eventRanges': json.dumps(event_ranges)} self.update_events(event_range_message) - def handle_out_message(self, message: Any): + def handle_out_message(self, message: dict): """ Handle ES output or error messages 
hook function for tests. @@ -147,13 +147,13 @@ def handle_out_message(self, message: Any): 'wall': , 'message': }. For 'failed' event ranges, it's {'id': , 'status': 'failed', 'message': }. - :param message: a dict of parsed message (Any). + :param message: a dict of parsed message (dict). """ logger.info(f"handling out message: {message}") self.__all_out_messages.append(message) - if message['status'] in ['failed', 'fatal']: + if message['status'] in {'failed', 'fatal'}: self.update_failed_event_ranges([message]) else: self.__queued_out_messages.append(message) @@ -213,7 +213,7 @@ def stageout_es_real(self, output_file: str) -> (str, Any, int, str): # noqa: C } file_spec = FileSpec(filetype='output', **file_data) xdata = [file_spec] - kwargs = dict(workdir=job.workdir, cwd=job.workdir, usecontainer=False, job=job) + kwargs = {'workdir': job.workdir, 'cwd': job.workdir, 'usecontainer': False, 'job': job} try_failover = False activity = ['es_events', 'pw'] ## FIX ME LATER: replace `pw` with `write_lan` once AGIS is updated (acopytools) @@ -234,7 +234,7 @@ def stageout_es_real(self, output_file: str) -> (str, Any, int, str): # noqa: C try: if _error: pass - except Exception as exc: + except NameError as exc: logger.error(f'found no error object - stage-out must have failed: {exc}') _error = StageOutFailure("stage-out failed") @@ -293,40 +293,40 @@ def stageout_es(self, force: bool = False): :param force: force to stage out (bool). 
""" job = self.get_job() - if len(self.__queued_out_messages): + if self.__queued_out_messages: if force or self.__last_stageout_time is None or (time.time() > self.__last_stageout_time + job.infosys.queuedata.es_stageout_gap): - out_messagess, output_file = self.tarzip_output_es() - logger.info(f"tar/zip event ranges: {out_messagess}, output_file: {output_file}") + out_messages, output_file = self.tarzip_output_es() + logger.info(f"tar/zip event ranges: {out_messages}, output_file: {output_file}") - if out_messagess: + if out_messages: self.__last_stageout_time = time.time() try: logger.info(f"staging output file: {output_file}") storage, storage_id, fsize, checksum = self.stageout_es_real(output_file) logger.info(f"staged output file ({output_file}) to storage: {storage} storage_id: {storage_id}") - self.update_finished_event_ranges(out_messagess, output_file, fsize, checksum, storage_id) + self.update_finished_event_ranges(out_messages, output_file, fsize, checksum, storage_id) except Exception as exc: logger.error(f"failed to stage out file({output_file}): {exc}, {traceback.format_exc()}") if force: - self.update_failed_event_ranges(out_messagess) + self.update_failed_event_ranges(out_messages) else: logger.info("failed to stage out, adding messages back to the queued messages") - self.__queued_out_messages += out_messagess + self.__queued_out_messages += out_messages self.__stageout_failures += 1 def clean(self): """Clean temp produced files.""" for msg in self.__all_out_messages: - if msg['status'] in ['failed', 'fatal']: + if msg['status'] in {'failed', 'fatal'}: pass elif 'output' in msg: try: logger.info(f"removing ES pre-merge file: {msg['output']}") os.remove(msg['output']) - except Exception as exc: + except OSError as exc: logger.error(f"failed to remove file({msg['output']}): {exc}") self.__queued_out_messages = [] self.__stageout_failures = 0 diff --git a/pilot/eventservice/workexecutor/plugins/hpoexecutor.py 
b/pilot/eventservice/workexecutor/plugins/hpoexecutor.py index 534de1ae..f76442cb 100644 --- a/pilot/eventservice/workexecutor/plugins/hpoexecutor.py +++ b/pilot/eventservice/workexecutor/plugins/hpoexecutor.py @@ -29,7 +29,6 @@ from typing import Any from pilot.common.errorcodes import ErrorCodes -from pilot.common.exception import FileHandlingFailure from pilot.eventservice.esprocess.esprocess import ESProcess from pilot.info.filespec import FileSpec from pilot.util.config import config @@ -49,8 +48,8 @@ def __init__(self, **kwargs): :param kwargs: kwargs dictionary (dict). """ - super(HPOExecutor, self).__init__(**kwargs) - self.setName("HPOExecutor") + super().__init__(**kwargs) + self.name = "HPOExecutor" self.__queued_out_messages = [] self.__last_stageout_time = None self.__all_out_messages = [] @@ -90,7 +89,7 @@ def create_file_spec(self, pfn: str) -> FileSpec: """ try: checksum = calculate_checksum(pfn, algorithm=config.File.checksum_type) - except (FileHandlingFailure, NotImplementedError, Exception) as exc: + except Exception as exc: logger.warning(f'caught exception: {exc}') checksum = '' # fail later filesize = os.path.getsize(pfn) @@ -138,7 +137,7 @@ def update_failed_event_ranges(self, out_messages: Any) -> None: event_ranges = [] for message in out_messages: - status = message['status'] if message['status'] in ['failed', 'fatal'] else 'failed' + status = message['status'] if message['status'] in {'failed', 'fatal'} else 'failed' # ToBeFixed errorCode event_ranges.append({"errorCode": errors.UNKNOWNPAYLOADFAILURE, "eventRangeID": message['id'], "eventStatus": status}) event_range_message = {'version': 0, 'eventRanges': json.dumps(event_ranges)} @@ -158,7 +157,7 @@ def handle_out_message(self, message: dict): self.__all_out_messages.append(message) - if message['status'] in ['failed', 'fatal']: + if message['status'] in {'failed', 'fatal'}: self.update_failed_event_ranges([message]) else: self.__queued_out_messages.append(message) @@ -170,7 +169,7 @@ 
def stageout_es(self, force: bool = False): :param force: force stage out (bool). """ job = self.get_job() - if len(self.__queued_out_messages): + if self.__queued_out_messages: if force or self.__last_stageout_time is None or (time.time() > self.__last_stageout_time + job.infosys.queuedata.es_stageout_gap): out_messages = [] while len(self.__queued_out_messages) > 0: diff --git a/pilot/eventservice/workexecutor/plugins/raythenaexecutor.py b/pilot/eventservice/workexecutor/plugins/raythenaexecutor.py index 53fd52f1..1b33c8fc 100644 --- a/pilot/eventservice/workexecutor/plugins/raythenaexecutor.py +++ b/pilot/eventservice/workexecutor/plugins/raythenaexecutor.py @@ -30,11 +30,17 @@ from typing import Any from pilot.common.errorcodes import ErrorCodes -from pilot.common.exception import FileHandlingFailure +from pilot.common.exception import ( + FileHandlingFailure, + NoSuchFile +) from pilot.eventservice.esprocess.esprocess import ESProcess from pilot.info.filespec import FileSpec from pilot.util.config import config -from pilot.util.filehandling import calculate_checksum, move +from pilot.util.filehandling import ( + calculate_checksum, + move +) from .baseexecutor import BaseExecutor logger = logging.getLogger(__name__) @@ -50,8 +56,8 @@ def __init__(self, **kwargs): :param kwargs: kwargs dictionary (dict). 
""" - super(RaythenaExecutor, self).__init__(**kwargs) - self.setName("RaythenaExecutor") + super().__init__(**kwargs) + self.name = "RaythenaExecutor" self.__queued_out_messages = [] self.__last_stageout_time = None self.__all_out_messages = [] @@ -91,7 +97,7 @@ def create_file_spec(self, pfn: str) -> FileSpec: """ try: checksum = calculate_checksum(pfn, algorithm=config.File.checksum_type) - except (FileHandlingFailure, NotImplementedError, Exception) as exc: + except Exception as exc: logger.warning(f'caught exception: {exc}') checksum = '' # fail later @@ -115,7 +121,7 @@ def move_output(self, pfn: str): if outputdir: try: move(pfn, outputdir) - except Exception as exc: + except (NoSuchFile, FileHandlingFailure) as exc: logger.warning(f'failed to move output: {exc}') def update_finished_event_ranges(self, out_messages: Any) -> None: @@ -159,7 +165,7 @@ def update_failed_event_ranges(self, out_messages: Any) -> None: event_ranges = [] for message in out_messages: - status = message['status'] if message['status'] in ['failed', 'fatal'] else 'failed' + status = message['status'] if message['status'] in {'failed', 'fatal'} else 'failed' # ToBeFixed errorCode event_ranges.append({"errorCode": errors.UNKNOWNPAYLOADFAILURE, "eventRangeID": message['id'], "eventStatus": status}) event_range_message = {'version': 0, 'eventRanges': json.dumps(event_ranges)} @@ -180,7 +186,7 @@ def handle_out_message(self, message: dict): self.__all_out_messages.append(message) - if message['status'] in ['failed', 'fatal']: + if message['status'] in {'failed', 'fatal'}: self.update_failed_event_ranges([message]) else: if 'output' in message: @@ -194,7 +200,7 @@ def stageout_es(self, force: bool = False): :param force: force stage out (bool). 
""" job = self.get_job() - if len(self.__queued_out_messages): + if self.__queued_out_messages: if force or self.__last_stageout_time is None or (time.time() > self.__last_stageout_time + job.infosys.queuedata.es_stageout_gap): out_messages = [] while len(self.__queued_out_messages) > 0: @@ -226,9 +232,9 @@ def run(self): payload = self.retrieve_payload() else: logger.error("payload is not set but is_retrieve_payload is also not set. No payloads.") + payload = None logger.info(f"payload: {payload}") - logger.info("starting ESProcess") proc = ESProcess(payload, waiting_time=999999) self.proc = proc diff --git a/pilot/eventservice/workexecutor/workexecutor.py b/pilot/eventservice/workexecutor/workexecutor.py index 945bf21f..9b26fa2b 100644 --- a/pilot/eventservice/workexecutor/workexecutor.py +++ b/pilot/eventservice/workexecutor/workexecutor.py @@ -18,7 +18,7 @@ # # Authors: # - Wen Guan, wen.guan@cern.ch, 2018 -# - Paul Nilsson, paul.nilsson@cern.ch, 2019-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2019-24 """Base executor - Main class to manage the event service work.""" @@ -41,7 +41,7 @@ def __init__(self, args: Any = None): :param args: args dictionary (Any). """ - super(WorkExecutor, self).__init__() + super().__init__() self.payload = None self.plugin = None self.is_retrieve_payload = False @@ -82,30 +82,22 @@ def get_plugin_confs(self) -> dict: :return: plugin configurations (dict). 
""" - plugin_confs = {} - if self.args and 'executor_type' in list(self.args.keys()): # Python 2/3 - if self.args['executor_type'] == 'hpo': - plugin_confs = {'class': 'pilot.eventservice.workexecutor.plugins.hpoexecutor.HPOExecutor'} - elif self.args['executor_type'] == 'raythena': - plugin_confs = {'class': 'pilot.eventservice.workexecutor.plugins.raythenaexecutor.RaythenaExecutor'} - elif self.args['executor_type'] == 'generic': - plugin_confs = {'class': 'pilot.eventservice.workexecutor.plugins.genericexecutor.GenericExecutor'} - elif self.args['executor_type'] == 'base': - plugin_confs = {'class': 'pilot.eventservice.workexecutor.plugins.baseexecutor.BaseExecutor'} - elif self.args['executor_type'] == 'nl': # network-less - plugin_confs = {'class': 'pilot.eventservice.workexecutor.plugins.nlexecutor.NLExecutor'} - elif self.args['executor_type'] == 'boinc': - plugin_confs = {'class': 'pilot.eventservice.workexecutor.plugins.boincexecutor.BOINCExecutor'} - elif self.args['executor_type'] == 'hammercloud': # hammercloud test: refine normal simul to ES - plugin_confs = {'class': 'pilot.eventservice.workexecutor.plugins.hammercloudexecutor.HammerCloudExecutor'} - elif self.args['executor_type'] == 'mpi': # network-less - plugin_confs = {'class': 'pilot.eventservice.workexecutor.plugins.mpiexecutor.MPIExecutor'} - elif self.args['executor_type'] == 'fineGrainedProc': - plugin_confs = {'class': 'pilot.eventservice.workexecutor.plugins.finegrainedprocexecutor.FineGrainedProcExecutor'} - else: - plugin_confs = {'class': 'pilot.eventservice.workexecutor.plugins.genericexecutor.GenericExecutor'} - - plugin_confs['args'] = self.args + executor_type_to_class = { + 'hpo': 'pilot.eventservice.workexecutor.plugins.hpoexecutor.HPOExecutor', + 'raythena': 'pilot.eventservice.workexecutor.plugins.raythenaexecutor.RaythenaExecutor', + 'generic': 'pilot.eventservice.workexecutor.plugins.genericexecutor.GenericExecutor', + 'base': 
'pilot.eventservice.workexecutor.plugins.baseexecutor.BaseExecutor', + 'nl': 'pilot.eventservice.workexecutor.plugins.nlexecutor.NLExecutor', + 'boinc': 'pilot.eventservice.workexecutor.plugins.boincexecutor.BOINCExecutor', + 'hammercloud': 'pilot.eventservice.workexecutor.plugins.hammercloudexecutor.HammerCloudExecutor', + 'mpi': 'pilot.eventservice.workexecutor.plugins.mpiexecutor.MPIExecutor', + 'fineGrainedProc': 'pilot.eventservice.workexecutor.plugins.finegrainedprocexecutor.FineGrainedProcExecutor' + } + + executor_type = self.args.get('executor_type', 'generic') + class_name = executor_type_to_class.get(executor_type, + 'pilot.eventservice.workexecutor.plugins.genericexecutor.GenericExecutor') + plugin_confs = {'class': class_name, 'args': self.args} return plugin_confs @@ -127,8 +119,8 @@ def start(self): else: if not self.get_payload(): raise exception.SetupFailure("Payload is not assigned.") - else: - self.plugin.set_payload(self.get_payload()) + + self.plugin.set_payload(self.get_payload()) logger.info(f"Starting plugin: {self.plugin}") self.plugin.start() diff --git a/pilot/info/__init__.py b/pilot/info/__init__.py index 661993e1..a31ddc9d 100644 --- a/pilot/info/__init__.py +++ b/pilot/info/__init__.py @@ -17,7 +17,7 @@ # # Authors: # - Alexey Anisenkov, anisyonk@cern.ch, 2018 -# - Paul Nilsson, paul.nilsson@cern.ch, 2019-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2019-24 """ Pilot Information component. 
@@ -36,11 +36,11 @@ from collections import namedtuple from typing import Any +from pilot.common.exception import PilotException from .infoservice import InfoService from .jobinfo import JobInfoProvider # noqa from .jobdata import JobData # noqa from .filespec import FileSpec # noqa -from pilot.common.exception import PilotException logger = logging.getLogger(__name__) diff --git a/pilot/info/basedata.py b/pilot/info/basedata.py index 519b0a1e..43a9edcc 100644 --- a/pilot/info/basedata.py +++ b/pilot/info/basedata.py @@ -17,7 +17,7 @@ # # Authors: # - Alexey Anisenkov, anisyonk@cern.ch, 2018 -# - Paul Nilsson, paul.nilsson@cern.ch, 2019-2024 +# - Paul Nilsson, paul.nilsson@cern.ch, 2019-24 """ Base data class. @@ -141,7 +141,7 @@ def clean_numeric(self, raw: Any, ktype: Any, kname: Any = None, defval: int = 0 try: return ktype(raw) - except Exception: + except (ValueError, TypeError): if raw is not None: logger.warning(f'failed to convert data for key={kname}, raw={raw} to type={ktype}, defval={defval}') return defval @@ -166,7 +166,7 @@ def clean_string(self, raw: Any, ktype: Any, kname: Any = None, defval: str = "" raw = raw.strip() try: return ktype(raw) - except Exception: + except (ValueError, TypeError): logger.warning(f'failed to convert data for key={kname}, raw={raw} to type={ktype}') return defval @@ -215,7 +215,7 @@ def clean_dictdata(self, raw: Any, ktype: Any, kname: Any = None, defval: Any = return defval try: return ktype(raw) - except Exception: + except (ValueError, TypeError): logger.warning(f'failed to convert data for key={kname}, raw={raw} to type={ktype}') return defval @@ -239,7 +239,7 @@ def clean_listdata(self, raw: Any, ktype: Any, kname: Any = None, defval: Any = raw = raw.split(',') try: return ktype(raw) - except Exception: + except (ValueError, TypeError): logger.warning(f'failed to convert data for key={kname}, raw={raw} to type={ktype}') return defval diff --git a/pilot/info/dataloader.py b/pilot/info/dataloader.py index 
530b8480..bcdf232b 100644 --- a/pilot/info/dataloader.py +++ b/pilot/info/dataloader.py @@ -17,7 +17,7 @@ # # Authors: # - Alexey Anisenkov, anisyonk@cern.ch, 2018 -# - Paul Nilsson, paul.nilsson@cern.ch, 2019-2024 +# - Paul Nilsson, paul.nilsson@cern.ch, 2019-24 """ Base loader class to retrieve data from Ext sources (file, url). diff --git a/pilot/info/filespec.py b/pilot/info/filespec.py index 0f10973c..d7e8e906 100644 --- a/pilot/info/filespec.py +++ b/pilot/info/filespec.py @@ -187,7 +187,7 @@ def is_directaccess(self, ensure_replica: bool = True, allowed_replica_schemas: if ensure_replica: allowed_replica_schemas = allowed_replica_schemas or ['root', 'dcache', 'dcap', 'file', 'https'] - if not self.turl or not any([self.turl.startswith(f'{allowed}://') for allowed in allowed_replica_schemas]): + if not self.turl or not any(self.turl.startswith(f'{allowed}://') for allowed in allowed_replica_schemas): _is_directaccess = False return _is_directaccess diff --git a/pilot/info/infoservice.py b/pilot/info/infoservice.py index 5162f6fd..1dc4d6e5 100644 --- a/pilot/info/infoservice.py +++ b/pilot/info/infoservice.py @@ -78,6 +78,7 @@ def inner(self, *args: Any, **kwargs: dict) -> Any: :raises PilotException: in case of error. """ if getattr(self, key, None) is None: + # pylint: disable=no-member raise PilotException(f"failed to call {func.__name__}(): InfoService instance is not initialized. " f"Call init() first!") @@ -173,6 +174,7 @@ def _resolve_data(cls, fname: Any, providers: Any = None, args: list = None, kwa if not merge: return r ret = merge_dict_data(ret or {}, r or {}) + # pylint: disable=broad-except except Exception as exc: logger.warning(f"failed to resolve data ({fcall.__name__}) from provider={provider} .. 
skipped, error={exc}") logger.warning(traceback.format_exc()) diff --git a/pilot/info/jobdata.py b/pilot/info/jobdata.py index b61c09bb..a760341d 100644 --- a/pilot/info/jobdata.py +++ b/pilot/info/jobdata.py @@ -17,7 +17,7 @@ # # Authors: # - Alexey Anisenkov, anisyonk@cern.ch, 2018-19 -# - Paul Nilsson, paul.nilsson@cern.ch, 2018-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2018-24 # - Wen Guan, wen.guan@cern.ch, 2018 """ @@ -125,6 +125,7 @@ class JobData(BaseData): subprocesses = [] # list of PIDs for payload subprocesses prodproxy = "" # to keep track of production proxy on unified queues completed = False # True when job has finished or failed, used by https::send_update() + lsetuptime = 0 # payload setup time (lsetup) # time variable used for on-the-fly cpu consumption time measurements done by job monitoring t0 = None # payload startup time diff --git a/pilot/scripts/open_file.sh b/pilot/scripts/open_file.sh index c213a824..dbfb6a6e 100644 --- a/pilot/scripts/open_file.sh +++ b/pilot/scripts/open_file.sh @@ -6,5 +6,7 @@ lsetup 'root pilot-default' echo LSETUP_COMPLETED date python3 REPLACE_ME_FOR_CMD +export PYTHON_EC=$? echo "Script execution completed." -exit $? +echo PYTHON_COMPLETED $PYTHON_EC +exit $PYTHON_EC diff --git a/pilot/user/atlas/common.py b/pilot/user/atlas/common.py index a09fbfc9..006148f0 100644 --- a/pilot/user/atlas/common.py +++ b/pilot/user/atlas/common.py @@ -17,7 +17,7 @@ # under the License. # # Authors: -# - Paul Nilsson, paul.nilsson@cern.ch, 2017-2024 +# - Paul Nilsson, paul.nilsson@cern.ch, 2017-24 # - Wen Guan, wen.guan@cern.ch, 2018 """Common functions for ATLAS.""" @@ -180,19 +180,20 @@ def validate(job: Any) -> bool: return status -def open_remote_files(indata: list, workdir: str, nthreads: int) -> (int, str, list): # noqa: C901 +def open_remote_files(indata: list, workdir: str, nthreads: int) -> (int, str, list, int): # noqa: C901 """ Verify that direct i/o files can be opened. 
:param indata: list of FileSpec (list) :param workdir: working directory (str) :param nthreads: number of concurrent file open threads (int) - :return: exit code (int), diagnostics (str), not opened files (list) + :return: exit code (int), diagnostics (str), not opened files (list), lsetup time (int). :raises PilotException: in case of pilot error. """ exitcode = 0 diagnostics = "" not_opened = [] + lsetup_time = 0 # extract direct i/o files from indata (string of comma-separated turls) turls = extract_turls(indata) @@ -223,7 +224,7 @@ def open_remote_files(indata: list, workdir: str, nthreads: int) -> (int, str, l ) logger.warning(diagnostics) logger.warning(f'tested both path={dir1} and path={dir2} (none exists)') - return exitcode, diagnostics, not_opened + return exitcode, diagnostics, not_opened, lsetup_time try: copy(full_script_path, final_script_path) @@ -231,7 +232,7 @@ def open_remote_files(indata: list, workdir: str, nthreads: int) -> (int, str, l # do not set ec since this will be a pilot issue rather than site issue diagnostics = f'cannot perform file open test - pilot source copy failed: {exc}' logger.warning(diagnostics) - return exitcode, diagnostics, not_opened + return exitcode, diagnostics, not_opened, lsetup_time # correct the path when containers have been used if "open_remote_file.py" in script: @@ -246,7 +247,7 @@ def open_remote_files(indata: list, workdir: str, nthreads: int) -> (int, str, l diagnostics = (f'cannot perform file open test - failed to read script content from path ' f'{final_paths["open_file.sh"]}') logger.warning(diagnostics) - return exitcode, diagnostics, not_opened + return exitcode, diagnostics, not_opened, lsetup_time logger.debug(f'creating file open command from path: {final_paths["open_remote_file.py"]}') _cmd = get_file_open_command(final_paths['open_remote_file.py'], turls, nthreads) @@ -254,7 +255,7 @@ def open_remote_files(indata: list, workdir: str, nthreads: int) -> (int, str, l diagnostics = (f'cannot perform 
file open test - failed to create file open command from path ' f'{final_paths["open_remote_file.py"]}') logger.warning(diagnostics) - return exitcode, diagnostics, not_opened + return exitcode, diagnostics, not_opened, lsetup_time timeout = get_timeout_for_remoteio(indata) cmd = create_root_container_command(workdir, _cmd, script_content) @@ -265,12 +266,12 @@ def open_remote_files(indata: list, workdir: str, nthreads: int) -> (int, str, l except FileHandlingFailure as exc: diagnostics = f'failed to write file: {exc}' logger.warning(diagnostics) - return 11, diagnostics, not_opened + return 11, diagnostics, not_opened, lsetup_time # if execute_remote_file_open() returns exit code 1, it means general error. # exit code 2 means that lsetup timed out, while 3 means that the python script (actual file open) timed out try: - exitcode, stdout = execute_remote_file_open(path, timeout) + exitcode, stdout, lsetup_time = execute_remote_file_open(path, timeout) except PilotException as exc: logger.warning(f'caught pilot exception: {exc}') exitcode = 11 @@ -279,14 +280,19 @@ def open_remote_files(indata: list, workdir: str, nthreads: int) -> (int, str, l # if config.Pilot.remotefileverification_log: # fpath = os.path.join(workdir, config.Pilot.remotefileverification_log) # write_file(fpath, stdout + stderr, mute=False) + logger.info(f'remote file open finished with ec={exitcode}') + if lsetup_time > 0: + logger.info(f"lsetup completed after {lsetup_time} seconds") + else: + logger.info("lsetup did not finish correctly") # error handling if exitcode: # first check for apptainer errors _exitcode = errors.resolve_transform_error(exitcode, stdout) if _exitcode != exitcode: # a better error code was found (COMMANDTIMEDOUT error will be passed through) - return _exitcode, stdout, not_opened + return _exitcode, stdout, not_opened, lsetup_time # note: if the remote files could still be opened the reported error should not be REMOTEFILEOPENTIMEDOUT _exitcode, diagnostics, not_opened = 
parse_remotefileverification_dictionary(workdir) @@ -308,7 +314,7 @@ def open_remote_files(indata: list, workdir: str, nthreads: int) -> (int, str, l if exitcode: logger.warning(f'remote file open exit code: {exitcode}') - return exitcode, diagnostics, not_opened + return exitcode, diagnostics, not_opened, lsetup_time def get_timeout_for_remoteio(indata: list) -> int: @@ -511,10 +517,14 @@ def get_payload_command(job: Any) -> str: try: logger.debug('executing open_remote_files()') - exitcode, diagnostics, not_opened_turls = open_remote_files(job.indata, job.workdir, get_nthreads(catchall)) + exitcode, diagnostics, not_opened_turls, lsetup_time = open_remote_files(job.indata, job.workdir, get_nthreads(catchall)) except Exception as exc: logger.warning(f'caught std exception: {exc}') else: + # store the lsetup time for later reporting with job metrics + if lsetup_time: + job.lsetuptime = lsetup_time + # read back the base trace report path = os.path.join(job.workdir, config.Pilot.base_trace_report) if not os.path.exists(path): @@ -1985,7 +1995,8 @@ def get_redundants() -> list: "docs/", "/venv/", "/pilot3", - "%1"] + "%1", + "open_remote_file_cmd.sh"] return dir_list diff --git a/pilot/user/atlas/container.py b/pilot/user/atlas/container.py index e74b1ceb..f6ada08d 100644 --- a/pilot/user/atlas/container.py +++ b/pilot/user/atlas/container.py @@ -805,14 +805,14 @@ def create_root_container_command(workdir: str, cmd: str, script: str) -> str: return command -def execute_remote_file_open(path: str, python_script_timeout: int) -> (int, str): +def execute_remote_file_open(path: str, python_script_timeout: int) -> (int, str, int): # noqa: C901 """ Execute the remote file open script. :param path: path to container script (str) :param workdir: workdir (str) :param python_script_timeout: timeout (int) - :return: exit code (int), stdout (str). + :return: exit code (int), stdout (str), lsetup time (int). 
""" lsetup_timeout = 600 # Timeout for 'lsetup' step exit_code = 1 @@ -824,66 +824,100 @@ def execute_remote_file_open(path: str, python_script_timeout: int) -> (int, str fcntl.fcntl(process.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) # Set non-blocking except OSError as e: logger.warning(f"error starting subprocess: {e}") - return exit_code + return exit_code, "", 0 - start_time = time.time() # Track execution start time - lsetup_completed = False # Flag to track completion of 'lsetup' + # Split the path at the last dot + filename, old_suffix = path.rsplit(".", 1) - while True: - # Check for timeout (once per second) - if time.time() - start_time > lsetup_timeout and not lsetup_completed: - logger.warning("timeout for 'lsetup' exceeded - killing script") - exit_code = 2 # 'lsetup' timeout - process.kill() - break + # Create the new path with the desired suffix + new_path = f"{filename}.stdout" - # Try to read output without blocking (might return None) - try: - output = process.stdout.readline() # Read bytes directly - if output is not None: # Check if any output is available (not None) - output = output.decode().strip() - logger.info(f'remote file open: {output}') - - # Check for LSETUP_COMPLETED message - if output == "LSETUP_COMPLETED": - logger.info('lsetup has completed (resetting start time)') - lsetup_completed = True - start_time = time.time() # Reset start time for 'python3' timeout - - stdout += output + "\n" - except BlockingIOError: - time.sleep(0.1) # No output available yet, continue the loop - continue - except (OSError, ValueError): # Catch potential errors from process.stdout - # print(f"Error reading from subprocess output: {e}") - # # Handle the error (e.g., log it, retry, exit) - # break - # logger.warning(f"error reading from subprocess output: {e}") - time.sleep(0.1) - continue - - # Timeout for python script after LSETUP_COMPLETED - if lsetup_completed and ((time.time() - start_time) > python_script_timeout): - logger.warning(f"timeout 
for 'python3' subscript exceeded - killing script " - f"({time.time()} - {start_time} > {python_script_timeout})") - exit_code = 3 # python script timeout - process.kill() - break - - # Check if script has completed normally - return_code = process.poll() - if return_code is not None: - logger.info(f"script execution completed with return code: {return_code}") - exit_code = return_code - break - - time.sleep(0.5) + start_time = time.time() # Track execution start time + lsetup_start_time = start_time + lsetup_completed = False # Flag to track completion of 'lsetup' process + python_completed = False # Flag to track completion of 'python3' process + lsetup_completed_at = None + + with open(new_path, "w", encoding='utf-8') as file: + while True: + # Check for timeout (once per second) + if time.time() - start_time > lsetup_timeout and not lsetup_completed: + logger.warning("timeout for 'lsetup' exceeded - killing script") + exit_code = 2 # 'lsetup' timeout + process.kill() + break + + # Try to read output without blocking (might return None) + try: + output = process.stdout.readline() # Read bytes directly + if output is not None: # Check if any output is available (not None) + output = output.decode().strip() + if output: + file.write(output + "\n") + # logger.info(f'remote file open: {output}') + + # Check for LSETUP_COMPLETED message + if output == "LSETUP_COMPLETED": + logger.info('lsetup has completed (resetting start time)') + lsetup_completed = True + lsetup_completed_at = time.time() + start_time = time.time() # Reset start time for 'python3' timeout + + # Check for PYTHON_COMPLETED message + if "PYTHON_COMPLETED" in output: + python_completed = True + match = re.search(r"\d+$", output) + if match: + exit_code = int(match.group()) + logger.info(f"python remote open command has completed with exit code {exit_code}") + else: + logger.info("python remote open command has completed without any exit code") + + stdout += output + "\n" + except BlockingIOError: +
time.sleep(0.1) # No output available yet, continue the loop + continue + except (OSError, ValueError): # Catch potential errors from process.stdout + # print(f"Error reading from subprocess output: {e}") + # # Handle the error (e.g., log it, retry, exit) + # break + time.sleep(0.1) + continue + + # Timeout for python script after LSETUP_COMPLETED + if lsetup_completed and ((time.time() - start_time) > python_script_timeout): + logger.warning(f"timeout for 'python3' subscript exceeded - killing script " + f"({time.time()} - {start_time} > {python_script_timeout})") + exit_code = 3 # python script timeout + process.kill() + break + + if python_completed: + logger.info('aborting since python command has finished') + return_code = process.poll() + if return_code: + logger.warning(f"script execution completed with return code: {return_code}") + # exit_code = return_code + break + + # Check if script has completed normally + return_code = process.poll() + if return_code is not None: + pass + # logger.info(f"script execution completed with return code: {return_code}") + # exit_code = return_code + # break + + time.sleep(0.5) # Ensure process is terminated if process.poll() is None: process.terminate() - return exit_code, stdout + # Check if 'lsetup' was completed + lsetup_time = int(lsetup_completed_at - lsetup_start_time) if lsetup_completed_at else 0 + + return exit_code, stdout, lsetup_time def fix_asetup(asetup): @@ -900,7 +934,7 @@ def fix_asetup(asetup): return asetup -def create_middleware_container_command(job, cmd, label='stagein', proxy=True): +def create_middleware_container_command(job, cmd, label='stage-in', proxy=True): """ Create the container command for stage-in/out or other middleware. 
@@ -962,7 +996,11 @@ def create_middleware_container_command(job, cmd, label='stagein', proxy=True): if label == 'setup': # set the platform info command += f'export thePlatform="{job.platform}";' - command += f'source ${{ATLAS_LOCAL_ROOT_BASE}}/user/atlasLocalSetup.sh -c {middleware_container}' + command += f'source ${{ATLAS_LOCAL_ROOT_BASE}}/user/atlasLocalSetup.sh -c ' # noqa: F541 + if middleware_container: + command += f'{middleware_container}' + elif label == 'stage-in' or label == 'stage-out': + command += 'el9 ' if label == 'setup': command += f' -s /srv/{script_name} -r /srv/{container_script_name}' else: @@ -1016,7 +1054,7 @@ def get_middleware_container_script(middleware_container: str, cmd: str, asetup: _asetup = get_asetup(asetup=False) _asetup = fix_asetup(_asetup) content += _asetup - if label == 'stagein' or label == 'stageout': + if label == 'stage-in' or label == 'stage-out': content += sitename + 'lsetup rucio davix xrootd; ' content += f'python3 {cmd} ' else: diff --git a/pilot/user/atlas/utilities.py b/pilot/user/atlas/utilities.py index ba47ba9d..d82abb3d 100644 --- a/pilot/user/atlas/utilities.py +++ b/pilot/user/atlas/utilities.py @@ -171,7 +171,8 @@ def get_proper_pid(pid, pgrp, jobid, command="", transformation="", outdata="", if _pid: logger.debug(f'discovered pid={_pid} for job id {jobid}') cmd = get_command_by_pid(_pid) - logger.debug(f'command for pid {_pid}: {cmd}') + if cmd: + logger.debug(f'command for pid {_pid}: {cmd}') break logger.warning(f'payload pid has not yet been identified (#{i + 1}/#{imax})') diff --git a/pilot/util/constants.py b/pilot/util/constants.py index f73f723d..5a44eda4 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -27,8 +27,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '7' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '7' # revision number should be reset to '0' for every new version 
release, increased for small updates -BUILD = '3' # build number should be reset to '1' for every new development cycle +REVISION = '8' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '21' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/default.cfg b/pilot/util/default.cfg index 63f715f9..fb88488e 100644 --- a/pilot/util/default.cfg +++ b/pilot/util/default.cfg @@ -271,9 +271,11 @@ middleware_stageout_stderr: stageout_stderr.txt # Name of middleware image # This image is used if middleware is not found locally on the worker node. Middleware is expected to be present # in the container image -middleware_container: /cvmfs/unpacked.cern.ch/registry.hub.docker.com/atlas/rucio-clients:default +# If middleware container is requested via PQ.container_type, but middleware_container is empty, then ALRB will be used +# Outdated: middleware_container: /cvmfs/unpacked.cern.ch/registry.hub.docker.com/atlas/rucio-clients:default +middleware_container: # On HPC (ALRB will locate the image) -middleware_container_no_path: atlas/rucio-clients:default +middleware_container_no_path: el9 ################################ # Harvester parameters diff --git a/pilot/util/filehandling.py b/pilot/util/filehandling.py index 4a0deb2c..e4cb8b43 100644 --- a/pilot/util/filehandling.py +++ b/pilot/util/filehandling.py @@ -555,6 +555,7 @@ def move(path1: str, path2: str): :param path1: source path (str) :param path2: destination path (str). + :raises PilotException: FileHandlingFailure, NoSuchFile. 
""" if not os.path.exists(path1): diagnostic = f'file copy failure: path does not exist: {path1}' diff --git a/pilot/util/psutils.py b/pilot/util/psutils.py index c8968f5d..f9b606e2 100644 --- a/pilot/util/psutils.py +++ b/pilot/util/psutils.py @@ -261,6 +261,9 @@ def get_command_by_pid(pid: int) -> str or None: process = psutil.Process(pid) command = " ".join(process.cmdline()) return command + except NameError: + logger.warning('psutil module not available - aborting') + return None except psutil.NoSuchProcess: - print(f"process with PID {pid} not found") + logger.warning(f"process with PID {pid} not found") return None diff --git a/pilot/util/realtimelogger.py b/pilot/util/realtimelogger.py index d7514545..3f432a44 100644 --- a/pilot/util/realtimelogger.py +++ b/pilot/util/realtimelogger.py @@ -147,7 +147,7 @@ def __init__(self, args: Any, info_dic: dict, workdir: str, secrets: str, level: client = google.cloud.logging.Client() _handler = CloudLoggingHandler(client, name=name) api_logger = logging.getLogger('google.cloud.logging_v2') - api_logger.setLevel(logger.INFO) + api_logger.setLevel(logging.INFO) elif logtype == "fluent": _handler = fluent_handler.FluentHandler(name, host=server, port=port) elif logtype == "logstash":