Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.9.1.13 #150

Merged
merged 16 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.9.0.17
3.9.1.13
8 changes: 2 additions & 6 deletions pilot/control/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ def send_state(job: Any, args: Any, state: str, xml: str = "", metadata: str = "
return False


def get_job_status_from_server(job_id: int, url: str, port: str) -> (str, int, int):
def get_job_status_from_server(job_id: int, url: str, port: int) -> (str, int, int):
"""
Return the current status of job <jobId> from the dispatcher.

Expand All @@ -482,7 +482,7 @@ def get_job_status_from_server(job_id: int, url: str, port: str) -> (str, int, i
In the case of time-out, the dispatcher will be asked one more time after 10 s.

:param job_id: PanDA job id (int)
:param url: PanDA server URL (str
:param url: PanDA server URL (str)
:param port: PanDA server port (int)
:return: status (string; e.g. holding), attempt_nr (int), status_code (int).
"""
Expand Down Expand Up @@ -1512,10 +1512,6 @@ def get_dispatcher_dictionary(args: Any, taskid: str = "") -> dict:
if 'HARVESTER_WORKER_ID' in os.environ:
data['worker_id'] = os.environ.get('HARVESTER_WORKER_ID')

# instruction_sets = has_instruction_sets(['AVX', 'AVX2'])
# if instruction_sets:
# data['cpuConsumptionUnit'] = instruction_sets

return data


Expand Down
5 changes: 4 additions & 1 deletion pilot/control/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
last_minute_check = t_0

queuedata = get_queuedata_from_job(queues)
if not queuedata:
logger.warning('queuedata could not be extracted from queues')

push = args.harvester and args.harvester_submitmode.lower() == 'push'
try:
# overall loop counter (ignoring the fact that more than one job may be running)
Expand All @@ -103,7 +106,7 @@ def control(queues: namedtuple, traces: Any, args: object): # noqa: C901
break

# check if the OIDC token needs to be refreshed
if tokendownloadchecktime:
if tokendownloadchecktime and queuedata:
if int(time.time() - last_token_check) > tokendownloadchecktime:
last_token_check = time.time()
if 'no_token_renewal' in queuedata.catchall:
Expand Down
11 changes: 7 additions & 4 deletions pilot/user/atlas/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@

# from pilot.user.atlas.setup import get_file_system_root_path
from pilot.common.errorcodes import ErrorCodes
from pilot.util.container import execute
from pilot.util.container import (
execute,
execute_nothreads
)
from pilot.util.proxy import get_proxy

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -134,7 +137,7 @@ def verify_arcproxy(envsetup: str, limit: int, proxy_id: str = "pilot", test: bo
if not hasattr(verify_arcproxy, "cache"):
verify_arcproxy.cache = {}

if proxy_id in verify_arcproxy.cache: # if exist, then calculate result from current cache
if proxy_id in verify_arcproxy.cache: # if exists, then calculate result from current cache
validity_end_cert = verify_arcproxy.cache[proxy_id][0]
validity_end = verify_arcproxy.cache[proxy_id][1]
if validity_end < 0: # previous validity check failed, do not try to re-check
Expand Down Expand Up @@ -162,7 +165,7 @@ def verify_arcproxy(envsetup: str, limit: int, proxy_id: str = "pilot", test: bo
_exit_code, _, _ = execute(cmd, shell=True) # , usecontainer=True, copytool=True)

cmd = f"{envsetup}arcproxy -i validityEnd -i validityLeft -i vomsACvalidityEnd -i vomsACvalidityLeft"
_exit_code, stdout, stderr = execute(cmd, shell=True) # , usecontainer=True, copytool=True)
_exit_code, stdout, stderr = execute_nothreads(cmd, shell=True) # , usecontainer=True, copytool=True)
if stdout is not None:
if 'command not found' in stdout:
logger.warning(f"arcproxy is not available on this queue,"
Expand Down Expand Up @@ -243,7 +246,7 @@ def verify_vomsproxy(envsetup: str, limit: int) -> tuple[int, str]:
if os.environ.get('X509_USER_PROXY', '') != '':
cmd = f"{envsetup}voms-proxy-info -actimeleft --timeleft --file $X509_USER_PROXY"
logger.info(f'executing command: {cmd}')
_exit_code, stdout, stderr = execute(cmd, shell=True)
_exit_code, stdout, stderr = execute_nothreads(cmd, shell=True)
if stdout is not None:
if "command not found" in stdout:
logger.info("skipping voms proxy check since command is not available")
Expand Down
7 changes: 5 additions & 2 deletions pilot/user/atlas/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
NoSuchFile
)
from pilot.info.jobdata import JobData
from pilot.util.container import execute
from pilot.util.container import (
execute,
execute_nothreads
)
from pilot.util.filehandling import (
read_json,
copy,
Expand Down Expand Up @@ -830,7 +833,7 @@ def filter_output(stdout):

# CPU arch script has now been copied, time to execute it
# (reset irrelevant stderr)
ec, stdout, stderr = execute(cmd)
ec, stdout, stderr = execute_nothreads(cmd)
if ec == 0 and ('RHEL9 and clone support is relatively new' in stderr or
'RHEL8 and clones are not supported for users' in stderr):
stderr = ''
Expand Down
53 changes: 35 additions & 18 deletions pilot/util/auxiliary.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
#
# Authors:
# - Paul Nilsson, [email protected], 2017-23
# - Paul Nilsson, [email protected], 2017-24

"""Auxiliary functions."""

Expand All @@ -33,6 +33,7 @@
from numbers import Number
from time import sleep
from typing import Any
from uuid import uuid4

from pilot.util.constants import (
SUCCESS,
Expand All @@ -44,7 +45,10 @@
)
from pilot.common.errorcodes import ErrorCodes
from pilot.util.container import execute
from pilot.util.filehandling import dump
from pilot.util.filehandling import (
dump,
grep
)

zero_depth_bases = (str, bytes, Number, range, bytearray)
iteritems = 'items'
Expand Down Expand Up @@ -444,7 +448,7 @@ def get_memory_usage(pid: int) -> (int, str, str):
return execute(f'ps aux -q {pid}', timeout=60)


def extract_memory_usage_value(output: str) -> int:
def extract_memory_usage_value(output: str) -> str:
"""
Extract the memory usage value from the ps output (in kB).

Expand Down Expand Up @@ -482,34 +486,38 @@ def cut_output(txt: str, cutat: int = 1024, separator: str = '\n[...]\n') -> str
return txt


def has_instruction_set(instruction_set: str) -> bool:
def has_instruction_sets(instruction_sets: list) -> str:
"""
Determine whether a given CPU instruction set is available.

The function will use grep to search in /proc/cpuinfo (both in upper and lower case).
Example: instruction_sets = ['AVX', 'AVX2', 'SSE4_2', 'XXX'] -> "AVX|AVX2|SSE4_2"

:param instruction_set: instruction set (e.g. AVX2) (str)
:return: True if given instruction set is available, False otherwise (bool).
:param instruction_sets: instruction sets to look for (e.g. ['AVX', 'AVX2']) (list)
:return: string of pipe-separated instruction sets (str).
"""
status = False
cmd = fr"grep -o \'{instruction_set.lower()}[^ ]*\|{instruction_set.upper()}[^ ]*\' /proc/cpuinfo"
exit_code, stdout, stderr = execute(cmd)
if not exit_code and not stderr:
if instruction_set.lower() in stdout.split() or instruction_set.upper() in stdout.split():
status = True
ret = ""

return status
for instr in instruction_sets:
pattern = re.compile(fr'{instr.lower()}[^ ]*', re.IGNORECASE)
out = grep(patterns=[pattern], file_name="/proc/cpuinfo")

for stdout in out:
if instr.upper() not in ret and (instr.lower() in stdout.split() or instr.upper() in stdout.split()):
ret += f'|{instr.upper()}' if ret else instr.upper()

def has_instruction_sets(instruction_sets: str) -> bool:
return ret


def has_instruction_sets_old(instruction_sets: list) -> str:
"""
Determine whether a given list of CPU instruction sets is available.

The function will use grep to search in /proc/cpuinfo (both in upper and lower case).
Example: instruction_sets = ['AVX', 'AVX2', 'SSE4_2', 'XXX'] -> "AVX|AVX2|SSE4_2"

:param instruction_sets: instruction set (e.g. AVX2) (str)
:return: True if given instruction set is available, False otherwise (bool).
:param instruction_sets: instruction sets to look for (e.g. ['AVX', 'AVX2']) (list)
:return: string of pipe-separated instruction sets (str).
"""
ret = ""
pattern = ""
Expand Down Expand Up @@ -709,8 +717,8 @@ def get_host_name():
else:
try:
host = socket.gethostname()
except socket.herror as exc:
logger.warning(f'failed to get host name: {exc}')
except socket.herror as e:
logger.warning(f'failed to get host name: {e}')
host = 'localhost'
return host.split('.')[0]

Expand Down Expand Up @@ -821,3 +829,12 @@ def is_kubernetes_resource() -> bool:
return True
else:
return False


def uuidgen_t() -> str:
    """
    Generate a UUID string with the same textual layout as the output of "uuidgen -t".

    Only the layout (8-4-4-4-12 hexadecimal digits separated by dashes) matches; the value itself is
    produced by uuid4(), i.e. it is a random (version 4) UUID and not a time-based one.

    :return: a UUID in the format "00000000-0000-0000-0000-000000000000" (str).
    """
    return str(uuid4())
4 changes: 2 additions & 2 deletions pilot/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
# Pilot version
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '9' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '0' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '17' # build number should be reset to '1' for every new development cycle
REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '13' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
Expand Down
28 changes: 20 additions & 8 deletions pilot/util/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

"""Functions for executing commands."""

import errno
import os
import subprocess
import logging
Expand Down Expand Up @@ -113,12 +114,17 @@ def read_output(stream, queue):
while True:
try:
line = stream.readline()
except AttributeError:
# Handle the case where stream is None
break

if not line:
if not line:
break
except (AttributeError, ValueError):
# Handle the case where stream is None (AttributeError) or closed (ValueError)
break
except OSError as e:
if e.errno == errno.EBADF:
# Handle the case where the file descriptor is bad
break
else:
raise

queue.put(line)

Expand Down Expand Up @@ -146,8 +152,11 @@ def read_output(stream, queue):
exit_code = process.poll()

# Wait for the threads to finish reading
stdout_thread.join()
stderr_thread.join()
try:
stdout_thread.join()
stderr_thread.join()
except Exception as e:
logger.warning(f'exception caught in execute: {e}')

# Read the remaining output from the queues
while not stdout_queue.empty():
Expand Down Expand Up @@ -263,12 +272,15 @@ def read_output(pipe, output_list):
return exit_code, stdout, stderr


def execute_old(executable: Any, **kwargs: dict) -> Any:
def execute_nothreads(executable: Any, **kwargs: dict) -> Any:
"""
Execute the command with its options in the provided executable list using subprocess time-out handler.

The function also determines whether the command should be executed within a container.

This variant of execute() is not using threads to read stdout and stderr. This is required for some use-cases like
executing arcproxy where the stdout is time-ordered.

:param executable: command to be executed (str or list)
:param kwargs: kwargs (dict)
:return: exit code (int), stdout (str) and stderr (str) (or process if requested via returnproc argument).
Expand Down
26 changes: 26 additions & 0 deletions pilot/util/filehandling.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,32 @@ def grep(patterns: list, file_name: str) -> list:
"""
Search for the patterns in the given list in a file.

Example:
grep(["St9bad_alloc", "FATAL"], "athena_stdout.txt")
-> [list containing the lines below]
CaloTrkMuIdAlg2.sysExecute() ERROR St9bad_alloc
AthAlgSeq.sysExecute() FATAL Standard std::exception is caught

:param patterns: list of regexp patterns (list)
:param file_name: file name (str)
:return: list of matched lines in file (list).
"""
matched_lines = []
compiled_patterns = [re.compile(pattern) for pattern in patterns]

with open(file_name, 'r', encoding='utf-8') as _file:
matched_lines = [
line for line in _file
if any(compiled_pattern.search(line) for compiled_pattern in compiled_patterns)
]

return matched_lines


def grep_old(patterns: list, file_name: str) -> list:
"""
Search for the patterns in the given list in a file.

Example:
grep(["St9bad_alloc", "FATAL"], "athena_stdout.txt")
-> [list containing the lines below]
Expand Down
30 changes: 21 additions & 9 deletions pilot/util/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from signal import SIGKILL

from pilot.common.errorcodes import ErrorCodes
from pilot.common.exception import PilotException, MiddlewareImportFailure
from pilot.common.exception import PilotException, MiddlewareImportFailure #, FileHandlingFailure
from pilot.util.auxiliary import set_pilot_state #, show_memory_usage
from pilot.util.config import config
from pilot.util.constants import PILOT_PRE_PAYLOAD
Expand All @@ -40,7 +40,8 @@
remove_files,
get_local_file_size,
read_file,
zip_files
zip_files,
#write_file
)
from pilot.util.loopingjob import looping_job
from pilot.util.math import (
Expand Down Expand Up @@ -135,7 +136,7 @@ def job_monitor_tasks(job: JobData, mt: MonitoringTime, args: object) -> tuple[i
if exit_code != 0:
return exit_code, diagnostics

# display OOM process info
# display OOM process info (once)
display_oom_info(job.pid)

# should the pilot abort the payload?
Expand Down Expand Up @@ -204,20 +205,31 @@ def display_oom_info(payload_pid):

:param payload_pid: payload pid (int).
"""

#fname = f"/proc/{payload_pid}/oom_score_adj"
payload_score = get_score(payload_pid) if payload_pid else 'UNKNOWN'
pilot_score = get_score(os.getpid())
logger.info(f'oom_score(pilot) = {pilot_score}, oom_score(payload) = {payload_score}')
if isinstance(pilot_score, str) and pilot_score == 'UNKNOWN':
logger.warning(f'could not get oom_score for pilot process: {pilot_score}')
else:
#relative_payload_score = "1"

# write the payload oom_score to the oom_score_adj file
#try:
# write_file(path=fname, contents=relative_payload_score)
#except Exception as e: # FileHandlingFailure
# logger.warning(f'could not write oom_score to file: {e}')

#logger.info(f'oom_score(pilot) = {pilot_score}, oom_score(payload) = {payload_score} (attempted writing relative score 1 to {fname})')
logger.info(f'oom_score(pilot) = {pilot_score}, oom_score(payload) = {payload_score}')

def get_score(pid):

def get_score(pid) -> str:
"""
Get the OOM process score.

:param pid: process id (int).
:return: score (string).
:param pid: process id (int)
:return: score (str).
"""

try:
score = '%s' % read_file('/proc/%d/oom_score' % pid)
except Exception as error:
Expand Down
Loading
Loading