Skip to content

Commit

Permalink
Adding atomic to shared variables
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelGHSeg committed Nov 27, 2024
1 parent 20f6aae commit 98974b2
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions Sources/Segment/Utilities/Telemetry.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class Telemetry: Subscriber {

internal var session: any HTTPSession
internal var host: String = HTTPClient.getDefaultAPIHost()
var sampleRate: Double = 1.0 // inital sample rate should be 1.0, will be downsampled on start
@Atomic var sampleRate: Double = 1.0 // inital sample rate should be 1.0, will be downsampled on start
private var flushTimer: Int = 30
internal var maxQueueSize: Int = 20
var errorLogSizeMax: Int = 4000
Expand All @@ -85,27 +85,27 @@ public class Telemetry: Subscriber {

internal var queue = [RemoteMetric]()
private var queueBytes = 0
internal var started = false
private var rateLimitEndTime: TimeInterval = 0
internal var flushFirstError = true
@Atomic internal var started = false
@Atomic private var rateLimitEndTime: TimeInterval = 0
@Atomic internal var flushFirstError = true
private var telemetryQueue = DispatchQueue(label: "telemetryQueue")
private var updateQueue = DispatchQueue(label: "updateQueue")
private var telemetryTimer: QueueTimer?

/// Starts the Telemetry send loop. Requires both `enable` to be set and a configuration to be retrieved from Segment.
func start() {
guard enable, !started, sampleRate > 0.0 && sampleRate <= 1.0 else { return }
started = true
_started.set(true)

// Queue contents were sampled at the default 100%
// the values on flush will be adjusted in the send function
if Double.random(in: 0...1) > sampleRate {
resetQueue()
}

self.telemetryTimer = QueueTimer(interval: .seconds(self.flushTimer), queue: .main) { [weak self] in
self.telemetryTimer = QueueTimer(interval: .seconds(self.flushTimer), queue: updateQueue) { [weak self] in
if (!(self?.enable ?? false)) {
self?.started = false
self?._started.set(false)
self?.telemetryTimer?.suspend()
}
self?.flush()
Expand All @@ -116,8 +116,8 @@ public class Telemetry: Subscriber {
func reset() {
telemetryTimer?.suspend()
resetQueue()
started = false
rateLimitEndTime = 0
_started.set(false)
_rateLimitEndTime.set(0)
}

/// Increments a metric with the provided tags.
Expand Down Expand Up @@ -161,7 +161,7 @@ public class Telemetry: Subscriber {
addRemoteMetric(metric: metric, tags: filteredTags, log: logData)

if (flushFirstError) {
flushFirstError = false
_flushFirstError.set(false)
flush()
}
}
Expand All @@ -175,14 +175,14 @@ public class Telemetry: Subscriber {
if rateLimitEndTime > Date().timeIntervalSince1970 {
return
}
rateLimitEndTime = 0
_rateLimitEndTime.set(0)

do {
try send()
queueBytes = 0
} catch {
errorHandler?(error)
sampleRate = 0.0
_sampleRate.set(0.0)
}
}
}
Expand Down Expand Up @@ -210,7 +210,7 @@ public class Telemetry: Subscriber {

if let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 429 {
if let retryAfter = httpResponse.allHeaderFields["Retry-After"] as? String, let retryAfterSeconds = TimeInterval(retryAfter) {
self.rateLimitEndTime = retryAfterSeconds + Date().timeIntervalSince1970
self._rateLimitEndTime.set(retryAfterSeconds + Date().timeIntervalSince1970)
}
}
}
Expand Down Expand Up @@ -280,7 +280,7 @@ public class Telemetry: Subscriber {

private func systemUpdate(system: System) {
if let settings = system.settings, let sampleRate = settings.metrics?["sampleRate"]?.doubleValue {
self.sampleRate = sampleRate
self._sampleRate.set(sampleRate)
start()
}
}
Expand Down

0 comments on commit 98974b2

Please sign in to comment.