-
Notifications
You must be signed in to change notification settings - Fork 1
/
oJobDirector.yaml
129 lines (109 loc) · 5.33 KB
/
oJobDirector.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# Direct distributed jobs through a single channel
# Copyright 2023 Nuno Aguiar
jobs:
#-------------------------
- name: oJob Director init
  # Prepares the global.oJobCheck settings (channel name and stamp) that the
  # other oJob Director jobs in this file read.
  help: >
    Initializes an oJob Director expecting:
    ch (String) An OpenAF channel name (defaults to oJob::check)
    stamp (Map) A key and value stamp map (defaults to { type: "director" })
  exec: |
    // Keep an already existing global.oJobCheck map; otherwise start from an empty one.
    global.oJobCheck = _$(global.oJobCheck, "global oJobCheck")
                       .isMap()
                       .default({});

    // Channel where the director jobs keep their job entries.
    global.oJobCheck.ch = _$(args.ch)
                          .isString()
                          .default("oJob::check");

    // Stamp map merged into every key and value the director jobs read or write.
    global.oJobCheck.stamp = _$(args.stamp)
                             .isMap()
                             .default({ type: "director" });
#----------------------------
- name: oJob Director checkin
  # Records a new start timestamp for the (jobName, jobUUID) entry kept in the
  # director channel, creating the entry when it does not exist yet.
  help: >
    Checks in a specific job with a jobUUID execution (to be used with "from"). Will not execute
    further job code if the job name and uuid have already been executed.
    jobName (String) The job generic name
    jobUUID (String) The job execution uuid
  from: oJob Director check
  exec: |
    // Both identifiers are mandatory.
    _$(args.jobName).$_("Please define a job name.");
    _$(args.jobUUID, "jobUUID").isString().$_();

    // Fetch the entry stored under stamp + name + uuid (undefined when there is none).
    args.job = $ch(global.oJobCheck.ch).get(merge(global.oJobCheck.stamp, { name: args.jobName, uuid: args.jobUUID }));

    if (isUnDef(args.job)) {
      // No entry yet: build one carrying the stamp, the identifiers and this first start time.
      args.job = merge(global.oJobCheck.stamp, {
        name : args.jobName,
        uuid : args.jobUUID,
        start: [ nowUTC() ]
      });
    } else {
      // Existing entry: append another start time.
      args.job.start.push(nowUTC());
    }

    // Write the entry back under the same key it was looked up with.
    $ch(global.oJobCheck.ch).set(merge(global.oJobCheck.stamp, { name: args.jobName, uuid: args.jobUUID }), args.job);
#-----------------------------
- name: oJob Director checkout
  # Records an end timestamp on the (jobName, jobUUID) entry kept in the
  # director channel; only logs an error when no such entry exists.
  help: >
    Checks out a specific job with a jobUUID execution (to be used with "to"):
    jobName (String) The job generic name
    jobUUID (String) The job execution uuid
  from: oJob Director check
  exec: |
    // Both identifiers are mandatory.
    _$(args.jobName).$_("Please define a job name.");
    _$(args.jobUUID, "jobUUID").isString().$_();

    // Fetch the entry stored under stamp + name + uuid (undefined when there is none).
    args.job = $ch(global.oJobCheck.ch).get(merge(global.oJobCheck.stamp, { name: args.jobName, uuid: args.jobUUID }));

    if (isDef(args.job)) {
      // Append to the list of end times, creating the list on the first check-out.
      if (isDef(args.job.end)) {
        args.job.end.push(nowUTC());
      } else {
        args.job.end = [ nowUTC() ];
      }

      // Persist the updated entry under the same key it was looked up with.
      $ch(global.oJobCheck.ch).set(merge(global.oJobCheck.stamp, { name: args.jobName, uuid: args.jobUUID }), args.job);
    } else {
      // Nothing was stored for this name/uuid: report it and leave the channel untouched.
      logErr("job '" + args.jobName + "' wasn't previously registered.");
    }
#--------------------------
- name: oJob Director check
  # Looks up the (jobName, jobUUID) entry in the director channel and, when its
  # recorded end times show the latest start has finished, warns and returns.
  help: >
    Checks if a specific job with a jobUUID execution has executed or not. If yes, it will warn and return from
    the current job (to be used with "from"):
    jobName (String) The job generic name
    jobUUID (String) The job execution uuid
  from: oJob Director init
  exec: |
    // Both identifiers are mandatory.
    _$(args.jobName).$_("Please define a job name.");
    _$(args.jobUUID, "jobUUID").isString().$_();

    // Fetch the entry stored under stamp + name + uuid (undefined when there is none).
    args.job = $ch(global.oJobCheck.ch).get(merge(global.oJobCheck.stamp, { name: args.jobName, uuid: args.jobUUID }));

    // The job counts as already executed when the entry has both start and end lists,
    // at least one start, no more ends than starts, and the latest end is not older
    // than the latest start.
    var itsokay = isDef(args.job) && isDef(args.job.start) && isDef(args.job.end) &&
                  args.job.start.length > 0 &&
                  args.job.start.length >= args.job.end.length &&
                  $path(args.job.end, "[] | max(@)") >= $path(args.job.start, "[] | max(@)");

    if (itsokay) {
      logWarn("Job named '" + args.jobName + "' for uuid '" + args.jobUUID + "' has already executed... skipping.");
      // Per the help text this return is what skips the rest of the current job's code
      // when this job is used through "from" - NOTE(review): confirm against the oJob runtime.
      return;
    }
#-------------------------
- name: oJob Director wait
  # Polls the director channel until the (jobName, jobUUID) entry shows a finished
  # execution, throwing once the configured number of attempts is used up.
  help: >
    Waits for a specific job with a jobUUID execution (to be used with "from"). If it timesout waiting
    it will throw an exception.
    jobName (String) The job generic name
    jobUUID (String) The job execution uuid
    jobWaitRetry (Number) Number of retries to check the job execution status (defaults to 40)
    jobWaitTime (Number) Time between retries to check the job execution status (defaults to 1500ms)
  from: oJob Director init
  exec: |
    // Both identifiers are mandatory; the polling settings fall back to their defaults.
    _$(args.jobName).$_("Please define a job name.");
    _$(args.jobUUID, "jobUUID").isString().$_();
    args.jobWaitRetry = _$(args.jobWaitRetry).isNumber().default(40);
    args.jobWaitTime = _$(args.jobWaitTime).isNumber().default(1500);

    var execOk = false;
    var c = args.jobWaitRetry;  // attempts still available
    do {
      // Re-read the entry on every attempt.
      args.job = $ch(global.oJobCheck.ch).get(merge(global.oJobCheck.stamp, { name: args.jobName, uuid: args.jobUUID }));

      // Finished when the entry has both start and end lists, at least one start, no more
      // ends than starts, and the latest end is not older than the latest start.
      if (isDef(args.job) && isDef(args.job.start) && isDef(args.job.end) &&
          args.job.start.length > 0 &&
          args.job.start.length >= args.job.end.length &&
          $path(args.job.end, "[] | max(@)") >= $path(args.job.start, "[] | max(@)"))
        execOk = true;

      c--;
      // Pause only when another attempt follows: sleeping after the last attempt would
      // just delay the timeout exception, since nothing is re-checked afterwards.
      if (!execOk && c > 0) sleep(args.jobWaitTime, true);
    } while(c > 0 && !execOk);

    if (!execOk) throw "Timeout waiting for job '" + args.jobName + "' for uuid '" + args.jobUUID + "'.";