-
Notifications
You must be signed in to change notification settings - Fork 3
/
schedule.go
242 lines (200 loc) · 6.41 KB
/
schedule.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
package sched
import (
"fmt"
"sync"
"time"
"github.com/sherifabdlnaby/sched/job"
"github.com/uber-go/tally"
)
// Schedule A Schedule is an object that wraps a Job (func(){}) and runs it on a schedule according to the supplied
// Timer; With the the ability to expose metrics, and write logs to indicate job health, state, and stats.
type Schedule struct {
id string
// Source function used to create job.Job
jobSrcFunc func()
// Timer used to trigger Jobs
timer Timer
// SignalChan for termination
stopScheduleSignal chan interface{}
// Concurrent safe JobMap
activeJobs jobMap
// Wait-group
wg sync.WaitGroup
// Logging Interface
logger Logger
// Logging Interface
mx sync.RWMutex
// State
state State
// metrics
metrics metrics
// State
expectedRuntime time.Duration
}
// NewSchedule Create a new schedule for` jobFunc func()` that will run according to `timer Timer` with the supplied []Options
func NewSchedule(id string, timer Timer, jobFunc func(), opts ...Option) *Schedule {
var options = defaultOptions()
// Apply Options
for _, option := range opts {
option.apply(options)
}
// Set Logger
logger := options.logger.With("id", id)
// Set Metrics
// // Init Default Scope if true, ignore io.closer on purpose.
if options.initDefaultScope {
options.metricsScope, _ = tally.NewRootScope(tally.ScopeOptions{
Reporter: newConsoleStatsReporter(logger.Named("metrics")),
}, options.defaultScopePrintEvery)
}
metrics := *newMetrics(id, options.metricsScope)
return &Schedule{
id: id,
state: NEW,
jobSrcFunc: jobFunc,
timer: timer,
activeJobs: *newJobMap(),
logger: logger,
metrics: metrics,
expectedRuntime: options.expectedRunDuration,
}
}
// Start Start the scheduler. Method is concurrent safe. Calling Start() have the following effects according to the
// scheduler state:
// 1. NEW: Start the Schedule; running the defined Job on the first Timer's Next() time.
// 2. STARTED: No Effect (and prints warning)
// 3. STOPPED: Restart the schedule
// 4. FINISHED: No Effect (and prints warning)
func (s *Schedule) Start() {
s.mx.Lock()
defer s.mx.Unlock()
if s.state == FINISHED {
s.logger.Warnw("Attempting to start a finished schedule")
return
}
if s.state == STARTED {
s.logger.Warnw("Attempting to start an already started schedule")
return
}
s.logger.Infow("Job Schedule Started")
s.state = STARTED
s.metrics.up.Update(1)
// Create stopSchedule signal channel, buffer = 1 to allow non-blocking signaling.
s.stopScheduleSignal = make(chan interface{}, 1)
go s.scheduleLoop()
go func() {}()
}
// Stop stops the scheduler. Method is **Blocking** and concurrent safe. When called:
// 1. Schedule will cancel all waiting scheduled jobs.
// 2. Schedule will wait for all running jobs to finish.
// Calling Stop() has the following effects depending on the state of the schedule:
// 1. NEW: No Effect
// 2. STARTED: Stop Schedule
// 3. STOPPED: No Effect
// 4. FINISHED: No Effect
func (s *Schedule) Stop() {
s.mx.Lock()
defer s.mx.Unlock()
if s.state == STOPPED || s.state == FINISHED || s.state == NEW {
return
}
s.state = STOPPING
// Stop control loop
s.logger.Infow("Stopping Schedule...")
s.stopScheduleSignal <- struct{}{}
close(s.stopScheduleSignal)
// Print No. of Active Jobs
if noOfActiveJobs := s.activeJobs.len(); s.activeJobs.len() > 0 {
s.logger.Infow(fmt.Sprintf("Waiting for '%d' active jobs still running...", noOfActiveJobs))
}
s.wg.Wait()
s.state = STOPPED
s.logger.Infow("Job Schedule Stopped")
s.metrics.up.Update(0)
_ = s.logger.Sync()
}
// Finish stops the scheduler and put it FINISHED state so that schedule cannot re-start again. Finish() is called
// automatically if Schedule Timer returned `done bool` == true.
// Method is **Blocking** and concurrent safe.
func (s *Schedule) Finish() {
// Stop First
s.Stop()
s.mx.Lock()
defer s.mx.Unlock()
if s.state == FINISHED {
return
}
s.state = FINISHED
s.logger.Infow("Job Schedule Finished")
}
// scheduleLoop scheduler control loop
func (s *Schedule) scheduleLoop() {
// Main Loop
for {
nextRun, done := s.timer.Next()
if done {
s.logger.Infow("No more Jobs will be scheduled")
break
}
nextRunDuration := time.Until(nextRun)
nextRunDuration = negativeToZero(nextRunDuration)
nextRunChan := time.After(nextRunDuration)
s.logger.Infow("Job Next Run Scheduled", "After", nextRunDuration.Round(1*time.Second).String(), "At", nextRun.Format(time.RFC3339))
select {
case <-s.stopScheduleSignal:
s.logger.Infow("Job Next Run Canceled", "At", nextRun.Format(time.RFC3339))
return
case <-nextRunChan:
// Run job
go s.runJobInstance()
}
}
s.Finish()
}
func (s *Schedule) runJobInstance() {
s.wg.Add(1)
defer s.wg.Done()
// Create a new instance of s.jobSrcFunc
jobInstance := job.NewJob(s.jobSrcFunc)
// Add to active jobs map
s.activeJobs.add(jobInstance)
defer s.activeJobs.delete(jobInstance)
// Logs and Metrics --------------------------------------
// -------------------------------------------------------
s.logger.Infow("Job Run Starting", "Instance", jobInstance.ID())
s.metrics.runs.Inc(1)
if s.activeJobs.len() > 1 {
s.metrics.overlappingCount.Inc(1)
}
if s.expectedRuntime > 0 {
time.AfterFunc(s.expectedRuntime, func() {
if jobInstance.State() == job.RUNNING {
s.logger.Warnw("Job Run Exceeded Expected Time", "Instance", jobInstance.ID(),
"Expected", s.expectedRuntime.Round(1000*time.Millisecond))
s.metrics.runExceedExpected.Inc(1)
}
})
}
// -------------------------------------------------------
// Synchronously Run Job Instance
err := jobInstance.Run()
// -------------------------------------------------------
// Logs and Metrics --------------------------------------
if err != nil {
s.logger.Errorw("Job Error", "Instance", jobInstance.ID(),
"Duration", jobInstance.ActualElapsed().Round(1*time.Millisecond),
"State", jobInstance.State().String(), "error", err.Error())
s.metrics.runErrors.Inc(1)
}
s.logger.Infow("Job Finished", "Instance", jobInstance.ID(),
"Duration", jobInstance.ActualElapsed().Round(1*time.Millisecond),
"State", jobInstance.State().String())
s.metrics.runActualElapsed.Record(jobInstance.ActualElapsed())
s.metrics.runTotalElapsed.Record(jobInstance.TotalElapsed())
}
func negativeToZero(nextRunDuration time.Duration) time.Duration {
if nextRunDuration < 0 {
nextRunDuration = 0
}
return nextRunDuration
}