diff --git a/Sources/Segment/Utilities/Telemetry.swift b/Sources/Segment/Utilities/Telemetry.swift index 36ac690..6a07f8f 100644 --- a/Sources/Segment/Utilities/Telemetry.swift +++ b/Sources/Segment/Utilities/Telemetry.swift @@ -71,7 +71,7 @@ public class Telemetry: Subscriber { internal var session: any HTTPSession internal var host: String = HTTPClient.getDefaultAPIHost() - var sampleRate: Double = 1.0 // inital sample rate should be 1.0, will be downsampled on start + @Atomic var sampleRate: Double = 1.0 // inital sample rate should be 1.0, will be downsampled on start private var flushTimer: Int = 30 internal var maxQueueSize: Int = 20 var errorLogSizeMax: Int = 4000 @@ -85,9 +85,9 @@ public class Telemetry: Subscriber { internal var queue = [RemoteMetric]() private var queueBytes = 0 - internal var started = false - private var rateLimitEndTime: TimeInterval = 0 - internal var flushFirstError = true + @Atomic internal var started = false + @Atomic private var rateLimitEndTime: TimeInterval = 0 + @Atomic internal var flushFirstError = true private var telemetryQueue = DispatchQueue(label: "telemetryQueue") private var updateQueue = DispatchQueue(label: "updateQueue") private var telemetryTimer: QueueTimer? @@ -95,7 +95,7 @@ public class Telemetry: Subscriber { /// Starts the Telemetry send loop. Requires both `enable` to be set and a configuration to be retrieved from Segment. func start() { guard enable, !started, sampleRate > 0.0 && sampleRate <= 1.0 else { return } - started = true + _started.set(true) // Queue contents were sampled at the default 100% // the values on flush will be adjusted in the send function @@ -103,9 +103,9 @@ public class Telemetry: Subscriber { resetQueue() } - self.telemetryTimer = QueueTimer(interval: .seconds(self.flushTimer), queue: .main) { [weak self] in + self.telemetryTimer = QueueTimer(interval: .seconds(self.flushTimer), queue: updateQueue) { [weak self] in if (!(self?.enable ?? false)) { - self?.started = false + self?._started.set(false) self?.telemetryTimer?.suspend() } self?.flush() @@ -116,8 +116,8 @@ public class Telemetry: Subscriber { func reset() { telemetryTimer?.suspend() resetQueue() - started = false - rateLimitEndTime = 0 + _started.set(false) + _rateLimitEndTime.set(0) } /// Increments a metric with the provided tags. @@ -161,7 +161,7 @@ public class Telemetry: Subscriber { addRemoteMetric(metric: metric, tags: filteredTags, log: logData) if (flushFirstError) { - flushFirstError = false + _flushFirstError.set(false) flush() } } @@ -175,14 +175,14 @@ public class Telemetry: Subscriber { if rateLimitEndTime > Date().timeIntervalSince1970 { return } - rateLimitEndTime = 0 + _rateLimitEndTime.set(0) do { try send() queueBytes = 0 } catch { errorHandler?(error) - sampleRate = 0.0 + _sampleRate.set(0.0) } } } @@ -210,7 +210,7 @@ public class Telemetry: Subscriber { if let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 429 { if let retryAfter = httpResponse.allHeaderFields["Retry-After"] as? String, let retryAfterSeconds = TimeInterval(retryAfter) { - self.rateLimitEndTime = retryAfterSeconds + Date().timeIntervalSince1970 + self._rateLimitEndTime.set(retryAfterSeconds + Date().timeIntervalSince1970) } } } @@ -280,7 +280,7 @@ public class Telemetry: Subscriber { private func systemUpdate(system: System) { if let settings = system.settings, let sampleRate = settings.metrics?["sampleRate"]?.doubleValue { - self.sampleRate = sampleRate + self._sampleRate.set(sampleRate) start() } }