Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

alt stage-out for unified queues #152

Merged
merged 28 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
528a6b0
Corrected a few type hints, removed unused code
Oct 18, 2024
3e04dc6
execute() now using thread synchronization to protect against polling…
Oct 18, 2024
17f1b0a
Using start_new_session instead of preexec_fn in subprocess.Popen(). …
Oct 21, 2024
f58d93c
Various tests for oom score, now out-commented
Oct 28, 2024
8556790
Added new option for turning off token renewal
Oct 28, 2024
831cfa4
Added new option for turning off token renewal
Oct 28, 2024
162dc16
Now locating correct payload pid for oom score
PalNilsson Oct 29, 2024
f488558
Testing oom write
Oct 30, 2024
b93f3cc
Removed useless arcproxy -i subject
PalNilsson Oct 31, 2024
b1c4549
Cleanup
PalNilsson Oct 31, 2024
0b5ff5f
Added PROXYTOOSHORT
PalNilsson Oct 31, 2024
dae81f3
Added PROXYTOOSHORT as pilot exit code 80
PalNilsson Oct 31, 2024
15037f9
Implementing and testing proxy too short
PalNilsson Oct 31, 2024
5ab7204
Removed test
PalNilsson Oct 31, 2024
d89faa3
Updated build number
PalNilsson Oct 31, 2024
5ece001
Removed proxy name check
Oct 31, 2024
3d499d8
Pylint updates
Nov 1, 2024
5a02856
Pylint updates
Nov 1, 2024
f495201
Pylint updates
Nov 1, 2024
fc76bce
Pylint updates
Nov 1, 2024
cdfc37f
Pylint updates
Nov 1, 2024
ca8683d
Pylint updates
Nov 1, 2024
e20a831
Pylint updates
Nov 1, 2024
f60beed
Pylint updates
Nov 4, 2024
cbdced9
IPv4 support in request2()
PalNilsson Nov 6, 2024
b9825c4
Added support for http_proxy
Nov 7, 2024
87123d3
enable altstageout for unified queues; nucleus use-case should be che…
anisyonk Nov 8, 2024
024cd70
modify altstage out logic (exclude nucleus)
anisyonk Nov 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions pilot/api/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# - Mario Lassnig, [email protected], 2017
# - Paul Nilsson, [email protected], 2017-2024
# - Tobias Wegner, [email protected], 2017-2018
# - Alexey Anisenkov, [email protected], 2018-2019
# - Alexey Anisenkov, [email protected], 2018-2024

"""API for data transfers."""

Expand Down Expand Up @@ -1072,14 +1072,15 @@ class StageOutClient(StagingClient):

mode = "stage-out"

def prepare_destinations(self, files: list, activities: list or str) -> list:
def prepare_destinations(self, files: list, activities: list or str, alt_exclude: list = []) -> list:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pylint will complain with "W0102: Dangerous default value [] as argument (dangerous-default-value)" here. I will change it to alt_exclude: list = None, and then add "if alt_exclude is None: alt_exclude = []"

"""
Resolve destination RSE (filespec.ddmendpoint) for each entry from `files` according to requested `activities`.

Apply Pilot-side logic to choose proper destination.

:param files: list of FileSpec objects to be processed (list)
:param activities: ordered list of activities to be used to resolve astorages (list or str)
:param alt_exclude: global list of destinations that should be excluded / not used for alternative stage-out
:return: updated fspec entries (list).
"""
if not self.infosys.queuedata: # infosys is not initialized: not able to fix destination if need, nothing to do
Expand Down Expand Up @@ -1108,11 +1109,26 @@ def prepare_destinations(self, files: list, activities: list or str) -> list:
raise PilotException(f"Failed to resolve destination: no associated storages defined for activity={activity} ({act})",
code=ErrorCodes.NOSTORAGE, state='NO_ASTORAGES_DEFINED')

# take the fist choice for now, extend the logic later if need
ddm = storages[0]
ddm_alt = storages[1] if len(storages) > 1 else None
def resolve_alt_destination(primary, exclude=None):
""" resolve alt destination as the next to primary entry not equal to `primary` and `exclude` """

cur = storages.index(primary) if primary in storages else 0
inext = (cur + 1) % len(storages) # cycle storages, take the first elem when reach end
exclude = set([primary] + list(exclude if exclude is not None else []))
alt = None
for attempt in range(len(exclude) or 1): # apply several tries to jump exclude entries (in case of dublicated data will stack)
inext = (cur + 1) % len(storages) # cycle storages, start from the beginning when reach end
if storages[inext] not in exclude:
alt = storages[inext]
break
cur += 1
return alt

# default destination
ddm = storages[0] # take the fist choice for now, extend the logic later if need
ddm_alt = resolve_alt_destination(ddm, exclude=alt_exclude)

self.logger.info(f"[prepare_destinations][{activity}]: allowed (local) destinations: {storages}")
self.logger.info(f"[prepare_destinations][{activity}]: allowed (local) destinations: {storages}, alt_exclude={alt_exclude}")
self.logger.info(f"[prepare_destinations][{activity}]: resolved default destination: ddm={ddm}, ddm_alt={ddm_alt}")

for e in files:
Expand All @@ -1125,15 +1141,14 @@ def prepare_destinations(self, files: list, activities: list or str) -> list:
# pass
elif e.ddmendpoint not in storages: # fspec.ddmendpoint is not in associated storages => use it as (non local) alternative destination
self.logger.info("[prepare_destinations][%s]: Requested fspec.ddmendpoint=%s is not in the list of allowed (local) destinations"
" .. will consider default ddm=%s for transfer and tag %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint)
e.ddmendpoint_alt = e.ddmendpoint # verify me
e.ddmendpoint = ddm # check/verify nucleus case
else: # set corresponding ddmendpoint_alt if exist (next entry in available storages list)
cur = storages.index(e.ddmendpoint)
ddm_next = storages[cur + 1] if (cur + 1) < len(storages) else storages[0] # cycle storages, take the first elem when reach end
e.ddmendpoint_alt = ddm_next if e.ddmendpoint != ddm_next else None
self.logger.info("[prepare_destinations][%s]: set ddmendpoint_alt=%s for fspec.ddmendpoint=%s",
activity, e.ddmendpoint_alt, e.ddmendpoint)
" .. will consider default ddm=%s as primary and set %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint)
e.ddmendpoint_alt = e.ddmendpoint if e.ddmendpoint not in alt_exclude else None
e.ddmendpoint = ddm # use default destination, check/verify nucleus case
else: # set corresponding ddmendpoint_alt if exist (next entry in cycled storages list)
e.ddmendpoint_alt = resolve_alt_destination(e.ddmendpoint, exclude=alt_exclude)

self.logger.info("[prepare_destinations][%s]: use ddmendpoint_alt=%s for fspec.ddmendpoint=%s",
activity, e.ddmendpoint_alt, e.ddmendpoint)

return files

Expand Down
3 changes: 2 additions & 1 deletion pilot/control/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,8 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title:
#if not is_unified:
# client.prepare_destinations(xdata, activity) ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)

client.prepare_destinations(xdata, activity) ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)
## FIX ME LATER: split activities: for `astorages` and `copytools` (to unify with ES workflow)
client.prepare_destinations(xdata, activity, alt_exclude=list(filter(None, [job.nucleus])))

altstageout = job.allow_altstageout()
client.transfer(xdata, activity, raise_exception=not altstageout, **kwargs)
Expand Down
5 changes: 3 additions & 2 deletions pilot/info/jobdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
#
# Authors:
# - Alexey Anisenkov, [email protected], 2018-19
# - Alexey Anisenkov, [email protected], 2018-24
# - Paul Nilsson, [email protected], 2018-24
# - Wen Guan, [email protected], 2018

Expand Down Expand Up @@ -177,6 +177,7 @@ class JobData(BaseData):
noexecstrcnv = None # server instruction to the pilot if it should take payload setup from job parameters
swrelease = "" # software release string
writetofile = "" #
nucleus = ""

# cmtconfig encoded info
alrbuserplatform = "" # ALRB_USER_PLATFORM encoded in platform/cmtconfig value
Expand All @@ -195,7 +196,7 @@ class JobData(BaseData):
'swrelease', 'zipmap', 'imagename', 'imagename_jobdef', 'accessmode', 'transfertype',
'datasetin', ## TO BE DEPRECATED: moved to FileSpec (job.indata)
'infilesguids', 'memorymonitor', 'allownooutput', 'pandasecrets', 'prodproxy', 'alrbuserplatform',
'debug_command', 'dask_scheduler_ip', 'jupyter_session_ip', 'altstageout'],
'debug_command', 'dask_scheduler_ip', 'jupyter_session_ip', 'altstageout', 'nucleus'],
list: ['piloterrorcodes', 'piloterrordiags', 'workdirsizes', 'zombies', 'corecounts', 'subprocesses',
'logdata', 'outdata', 'indata'],
dict: ['status', 'fileinfo', 'metadata', 'utilities', 'overwrite_queuedata', 'sizes', 'preprocess',
Expand Down
Loading