Skip to content

Commit

Permalink
modify alt stage-out logic (exclude nucleus)
Browse files Browse the repository at this point in the history
  • Loading branch information
anisyonk committed Nov 14, 2024
1 parent 87123d3 commit 024cd70
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 18 deletions.
45 changes: 30 additions & 15 deletions pilot/api/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# - Mario Lassnig, [email protected], 2017
# - Paul Nilsson, [email protected], 2017-2024
# - Tobias Wegner, [email protected], 2017-2018
# - Alexey Anisenkov, [email protected], 2018-2019
# - Alexey Anisenkov, [email protected], 2018-2024

"""API for data transfers."""

Expand Down Expand Up @@ -1072,14 +1072,15 @@ class StageOutClient(StagingClient):

mode = "stage-out"

def prepare_destinations(self, files: list, activities: list or str) -> list:
def prepare_destinations(self, files: list, activities: list or str, alt_exclude: list = []) -> list:
"""
Resolve destination RSE (filespec.ddmendpoint) for each entry from `files` according to requested `activities`.
Apply Pilot-side logic to choose proper destination.
:param files: list of FileSpec objects to be processed (list)
:param activities: ordered list of activities to be used to resolve astorages (list or str)
:param alt_exclude: global list of destinations that should be excluded / not used for alternative stage-out (list)
:return: updated fspec entries (list).
"""
if not self.infosys.queuedata: # infosys is not initialized: not able to fix destination if need, nothing to do
Expand Down Expand Up @@ -1108,11 +1109,26 @@ def prepare_destinations(self, files: list, activities: list or str) -> list:
raise PilotException(f"Failed to resolve destination: no associated storages defined for activity={activity} ({act})",
code=ErrorCodes.NOSTORAGE, state='NO_ASTORAGES_DEFINED')

# take the fist choice for now, extend the logic later if need
ddm = storages[0]
ddm_alt = storages[1] if len(storages) > 1 else None
def resolve_alt_destination(primary, exclude=None, candidates=None):
    """
    Resolve the alternative destination: the next entry after `primary` in the
    cycled storages list, skipping `primary` itself and any entry in `exclude`.

    :param primary: primary destination the alternative is resolved against (str)
    :param exclude: optional list of destinations not usable as an alternative (list)
    :param candidates: optional explicit list of storages to cycle through;
        defaults to the enclosing `storages` list (list)
    :return: alternative destination, or None if no suitable entry exists (str or None).
    """
    pool = storages if candidates is None else candidates
    if not pool:  # nothing to cycle: no alternative available
        return None
    blocked = set(exclude or []) | {primary}
    start = pool.index(primary) if primary in pool else 0
    # walk every other position once, starting right after `primary` and
    # wrapping to the beginning of the list when the end is reached;
    # bounding by the list length (not by len(blocked)) guarantees that
    # duplicated entries cannot exhaust the tries prematurely
    for step in range(1, len(pool)):
        candidate = pool[(start + step) % len(pool)]
        if candidate not in blocked:
            return candidate
    return None

# default destination
ddm = storages[0]  # take the first choice for now, extend the logic later if needed
ddm_alt = resolve_alt_destination(ddm, exclude=alt_exclude)

self.logger.info(f"[prepare_destinations][{activity}]: allowed (local) destinations: {storages}")
self.logger.info(f"[prepare_destinations][{activity}]: allowed (local) destinations: {storages}, alt_exclude={alt_exclude}")
self.logger.info(f"[prepare_destinations][{activity}]: resolved default destination: ddm={ddm}, ddm_alt={ddm_alt}")

for e in files:
Expand All @@ -1125,15 +1141,14 @@ def prepare_destinations(self, files: list, activities: list or str) -> list:
# pass
elif e.ddmendpoint not in storages: # fspec.ddmendpoint is not in associated storages => use it as (non local) alternative destination
self.logger.info("[prepare_destinations][%s]: Requested fspec.ddmendpoint=%s is not in the list of allowed (local) destinations"
" .. will consider default ddm=%s for transfer and tag %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint)
e.ddmendpoint_alt = e.ddmendpoint # verify me
e.ddmendpoint = ddm # check/verify nucleus case
else: # set corresponding ddmendpoint_alt if exist (next entry in available storages list)
cur = storages.index(e.ddmendpoint)
ddm_next = storages[cur + 1] if (cur + 1) < len(storages) else storages[0] # cycle storages, take the first elem when reach end
e.ddmendpoint_alt = ddm_next if e.ddmendpoint != ddm_next else None
self.logger.info("[prepare_destinations][%s]: set ddmendpoint_alt=%s for fspec.ddmendpoint=%s",
activity, e.ddmendpoint_alt, e.ddmendpoint)
" .. will consider default ddm=%s as primary and set %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint)
e.ddmendpoint_alt = e.ddmendpoint if e.ddmendpoint not in alt_exclude else None
e.ddmendpoint = ddm # use default destination, check/verify nucleus case
else: # set corresponding ddmendpoint_alt if exist (next entry in cycled storages list)
e.ddmendpoint_alt = resolve_alt_destination(e.ddmendpoint, exclude=alt_exclude)

self.logger.info("[prepare_destinations][%s]: use ddmendpoint_alt=%s for fspec.ddmendpoint=%s",
activity, e.ddmendpoint_alt, e.ddmendpoint)

return files

Expand Down
3 changes: 2 additions & 1 deletion pilot/control/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,8 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title:
#if not is_unified:
# client.prepare_destinations(xdata, activity) ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)

client.prepare_destinations(xdata, activity) ## FIX ME LATER: split activities: for astorages and for copytools (to unify with ES workflow)
## FIX ME LATER: split activities: for `astorages` and `copytools` (to unify with ES workflow)
client.prepare_destinations(xdata, activity, alt_exclude=list(filter(None, [job.nucleus])))

altstageout = job.allow_altstageout()
client.transfer(xdata, activity, raise_exception=not altstageout, **kwargs)
Expand Down
5 changes: 3 additions & 2 deletions pilot/info/jobdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
#
# Authors:
# - Alexey Anisenkov, [email protected], 2018-19
# - Alexey Anisenkov, [email protected], 2018-24
# - Paul Nilsson, [email protected], 2018-24
# - Wen Guan, [email protected], 2018

Expand Down Expand Up @@ -177,6 +177,7 @@ class JobData(BaseData):
noexecstrcnv = None # server instruction to the pilot if it should take payload setup from job parameters
swrelease = "" # software release string
writetofile = "" #
nucleus = ""

# cmtconfig encoded info
alrbuserplatform = "" # ALRB_USER_PLATFORM encoded in platform/cmtconfig value
Expand All @@ -195,7 +196,7 @@ class JobData(BaseData):
'swrelease', 'zipmap', 'imagename', 'imagename_jobdef', 'accessmode', 'transfertype',
'datasetin', ## TO BE DEPRECATED: moved to FileSpec (job.indata)
'infilesguids', 'memorymonitor', 'allownooutput', 'pandasecrets', 'prodproxy', 'alrbuserplatform',
'debug_command', 'dask_scheduler_ip', 'jupyter_session_ip', 'altstageout'],
'debug_command', 'dask_scheduler_ip', 'jupyter_session_ip', 'altstageout', 'nucleus'],
list: ['piloterrorcodes', 'piloterrordiags', 'workdirsizes', 'zombies', 'corecounts', 'subprocesses',
'logdata', 'outdata', 'indata'],
dict: ['status', 'fileinfo', 'metadata', 'utilities', 'overwrite_queuedata', 'sizes', 'preprocess',
Expand Down

0 comments on commit 024cd70

Please sign in to comment.