-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
alt stage-out for unified queues #152
Merged
Merged
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit
Hold shift + click to select a range
528a6b0
Corrected a few type hints, removed unused code
3e04dc6
execute() now using thread synchronization to protect against polling…
17f1b0a
Using start_new_session instead of preexec_fn in subprocess.Popen(). …
f58d93c
Various tests for oom score, now out-commented
8556790
Added new option for turning off token renewal
831cfa4
Added new option for turning off token renewal
162dc16
Now locating correct payload pid for oom score
PalNilsson f488558
Testing oom write
b93f3cc
Removed useless arcproxy -i subject
PalNilsson b1c4549
Cleanup
PalNilsson 0b5ff5f
Added PROXYTOOSHORT
PalNilsson dae81f3
Added PROXYTOOSHORT as pilot exit code 80
PalNilsson 15037f9
Implementing and testing proxy too short
PalNilsson 5ab7204
Removed test
PalNilsson d89faa3
Updated build number
PalNilsson 5ece001
Removed proxy name check
3d499d8
Pylint updates
5a02856
Pylint updates
f495201
Pylint updates
fc76bce
Pylint updates
cdfc37f
Pylint updates
ca8683d
Pylint updates
e20a831
Pylint updates
f60beed
Pylint updates
cbdced9
IPv4 support in request2()
PalNilsson b9825c4
Added support for http_proxy
87123d3
enable altstageout for unified queues; nucleus use-case should be che…
anisyonk 024cd70
modify altstage out logic (exclude nucleus)
anisyonk File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,7 @@ | |
# - Mario Lassnig, [email protected], 2017 | ||
# - Paul Nilsson, [email protected], 2017-2024 | ||
# - Tobias Wegner, [email protected], 2017-2018 | ||
# - Alexey Anisenkov, [email protected], 2018-2019 | ||
# - Alexey Anisenkov, [email protected], 2018-2024 | ||
|
||
"""API for data transfers.""" | ||
|
||
|
@@ -1072,14 +1072,15 @@ class StageOutClient(StagingClient): | |
|
||
mode = "stage-out" | ||
|
||
def prepare_destinations(self, files: list, activities: list or str) -> list: | ||
def prepare_destinations(self, files: list, activities: list or str, alt_exclude: list = []) -> list: | ||
""" | ||
Resolve destination RSE (filespec.ddmendpoint) for each entry from `files` according to requested `activities`. | ||
|
||
Apply Pilot-side logic to choose proper destination. | ||
|
||
:param files: list of FileSpec objects to be processed (list) | ||
:param activities: ordered list of activities to be used to resolve astorages (list or str) | ||
:param alt_exclude: global list of destinations that should be excluded / not used for alternative stage-out | ||
:return: updated fspec entries (list). | ||
""" | ||
if not self.infosys.queuedata: # infosys is not initialized: not able to fix destination if need, nothing to do | ||
|
@@ -1108,11 +1109,26 @@ def prepare_destinations(self, files: list, activities: list or str) -> list: | |
raise PilotException(f"Failed to resolve destination: no associated storages defined for activity={activity} ({act})", | ||
code=ErrorCodes.NOSTORAGE, state='NO_ASTORAGES_DEFINED') | ||
|
||
# take the fist choice for now, extend the logic later if need | ||
ddm = storages[0] | ||
ddm_alt = storages[1] if len(storages) > 1 else None | ||
def resolve_alt_destination(primary, exclude=None): | ||
""" resolve alt destination as the next to primary entry not equal to `primary` and `exclude` """ | ||
|
||
cur = storages.index(primary) if primary in storages else 0 | ||
inext = (cur + 1) % len(storages) # cycle storages, take the first elem when reach end | ||
exclude = set([primary] + list(exclude if exclude is not None else [])) | ||
alt = None | ||
for attempt in range(len(exclude) or 1): # apply several tries to jump exclude entries (in case of dublicated data will stack) | ||
inext = (cur + 1) % len(storages) # cycle storages, start from the beginning when reach end | ||
if storages[inext] not in exclude: | ||
alt = storages[inext] | ||
break | ||
cur += 1 | ||
return alt | ||
|
||
# default destination | ||
ddm = storages[0] # take the fist choice for now, extend the logic later if need | ||
ddm_alt = resolve_alt_destination(ddm, exclude=alt_exclude) | ||
|
||
self.logger.info(f"[prepare_destinations][{activity}]: allowed (local) destinations: {storages}") | ||
self.logger.info(f"[prepare_destinations][{activity}]: allowed (local) destinations: {storages}, alt_exclude={alt_exclude}") | ||
self.logger.info(f"[prepare_destinations][{activity}]: resolved default destination: ddm={ddm}, ddm_alt={ddm_alt}") | ||
|
||
for e in files: | ||
|
@@ -1125,15 +1141,14 @@ def prepare_destinations(self, files: list, activities: list or str) -> list: | |
# pass | ||
elif e.ddmendpoint not in storages: # fspec.ddmendpoint is not in associated storages => use it as (non local) alternative destination | ||
self.logger.info("[prepare_destinations][%s]: Requested fspec.ddmendpoint=%s is not in the list of allowed (local) destinations" | ||
" .. will consider default ddm=%s for transfer and tag %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint) | ||
e.ddmendpoint_alt = e.ddmendpoint # verify me | ||
e.ddmendpoint = ddm # check/verify nucleus case | ||
else: # set corresponding ddmendpoint_alt if exist (next entry in available storages list) | ||
cur = storages.index(e.ddmendpoint) | ||
ddm_next = storages[cur + 1] if (cur + 1) < len(storages) else storages[0] # cycle storages, take the first elem when reach end | ||
e.ddmendpoint_alt = ddm_next if e.ddmendpoint != ddm_next else None | ||
self.logger.info("[prepare_destinations][%s]: set ddmendpoint_alt=%s for fspec.ddmendpoint=%s", | ||
activity, e.ddmendpoint_alt, e.ddmendpoint) | ||
" .. will consider default ddm=%s as primary and set %s as alt. location", activity, e.ddmendpoint, ddm, e.ddmendpoint) | ||
e.ddmendpoint_alt = e.ddmendpoint if e.ddmendpoint not in alt_exclude else None | ||
e.ddmendpoint = ddm # use default destination, check/verify nucleus case | ||
else: # set corresponding ddmendpoint_alt if exist (next entry in cycled storages list) | ||
e.ddmendpoint_alt = resolve_alt_destination(e.ddmendpoint, exclude=alt_exclude) | ||
|
||
self.logger.info("[prepare_destinations][%s]: use ddmendpoint_alt=%s for fspec.ddmendpoint=%s", | ||
activity, e.ddmendpoint_alt, e.ddmendpoint) | ||
|
||
return files | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,7 @@ | |
# under the License. | ||
# | ||
# Authors: | ||
# - Alexey Anisenkov, [email protected], 2018-19 | ||
# - Alexey Anisenkov, [email protected], 2018-24 | ||
# - Paul Nilsson, [email protected], 2018-24 | ||
# - Wen Guan, [email protected], 2018 | ||
|
||
|
@@ -177,6 +177,7 @@ class JobData(BaseData): | |
noexecstrcnv = None # server instruction to the pilot if it should take payload setup from job parameters | ||
swrelease = "" # software release string | ||
writetofile = "" # | ||
nucleus = "" | ||
|
||
# cmtconfig encoded info | ||
alrbuserplatform = "" # ALRB_USER_PLATFORM encoded in platform/cmtconfig value | ||
|
@@ -195,7 +196,7 @@ class JobData(BaseData): | |
'swrelease', 'zipmap', 'imagename', 'imagename_jobdef', 'accessmode', 'transfertype', | ||
'datasetin', ## TO BE DEPRECATED: moved to FileSpec (job.indata) | ||
'infilesguids', 'memorymonitor', 'allownooutput', 'pandasecrets', 'prodproxy', 'alrbuserplatform', | ||
'debug_command', 'dask_scheduler_ip', 'jupyter_session_ip', 'altstageout'], | ||
'debug_command', 'dask_scheduler_ip', 'jupyter_session_ip', 'altstageout', 'nucleus'], | ||
list: ['piloterrorcodes', 'piloterrordiags', 'workdirsizes', 'zombies', 'corecounts', 'subprocesses', | ||
'logdata', 'outdata', 'indata'], | ||
dict: ['status', 'fileinfo', 'metadata', 'utilities', 'overwrite_queuedata', 'sizes', 'preprocess', | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pylint will complain with "W0102: Dangerous default value [] as argument (dangerous-default-value)" here. I will change it to alt_exclude: list = None, and then add "if alt_exclude is None: alt_exclude = []"