Skip to content

Commit

Permalink
report back file.ddmendpoint value for alternative staged files
Browse files Browse the repository at this point in the history
  • Loading branch information
anisyonk committed Sep 10, 2024
1 parent f13770a commit 97d7202
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
22 changes: 15 additions & 7 deletions pilot/control/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -924,14 +924,16 @@ def _do_stageout(job: JobData, args: object, xdata: list, activity: list, title:
# check if alt stageout can be applied (all remain files must have alt storage declared ddmendpoint_alt)
has_altstorage = all(entry.ddmendpoint_alt and entry.ddmendpoint != entry.ddmendpoint_alt for entry in remain_files)

logger.info('alt stage-out settings[%s]: is_unified=%s, altstageout=%s, remain_files=%s, has_altstorage=%s',
logger.info('alt stage-out settings: %s, is_unified=%s, altstageout=%s, remain_files=%s, has_altstorage=%s',
activity, is_unified, altstageout, len(remain_files), has_altstorage)

if altstageout and remain_files and has_altstorage: # apply alternative stageout for failed transfers
for entry in remain_files:
entry.ddmendpoint = entry.ddmendpoint_alt
entry.ddmendpoint_alt = None
entry.is_altstaged = True

logger.info('alt stage-out will be applied for remain=%s files (previously failed)', len(remain_files))
client.transfer(xdata, activity, **kwargs)

except PilotException as error:
Expand Down Expand Up @@ -1072,12 +1074,18 @@ def generate_fileinfo(job: JobData) -> dict:
"""
fileinfo = {}
checksum_type = config.File.checksum_type if config.File.checksum_type == 'adler32' else 'md5sum'
for iofile in job.outdata + job.logdata:
if iofile.status in {'transferred'}:
fileinfo[iofile.lfn] = {'guid': iofile.guid,
'fsize': iofile.filesize,
f'{checksum_type}': iofile.checksum.get(config.File.checksum_type),
'surl': iofile.turl}
for entry in job.outdata + job.logdata:
if entry.status in {'transferred'}:
dat = {
'guid': entry.guid,
'fsize': entry.filesize,
f'{checksum_type}': entry.checksum.get(config.File.checksum_type),
'surl': entry.turl
}
if entry.is_altstaged:
dat['ddmendpoint'] = entry.ddmendpoint

fileinfo[entry.lfn] = dat

return fileinfo

Expand Down
1 change: 1 addition & 0 deletions pilot/info/filespec.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class FileSpec(BaseData):
is_tar = False # whether it's a tar file or not
ddm_activity = None # DDM activity names (e.g. [read_lan, read_wan]) which should be used to resolve appropriate protocols from StorageData.arprotocols
checkinputsize = True
is_altstaged = None # indicates if file was transferred using alternative method (altstageout)

# specify the type of attributes for proper data validation and casting
_keys = {int: ['filesize', 'mtime', 'status_code'],
Expand Down

0 comments on commit 97d7202

Please sign in to comment.