From 14d8420f3a1452324a7fdc3e7a1872467c0b9ab2 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Mon, 2 Dec 2024 17:11:30 -0500 Subject: [PATCH] Telemetry cleanup and adding test for fixed crash (#376) * Cleaning up code and removing noisy caller tag * Adding test that reproduced crash before fix * Refactoring out unhelpful size checks * Adding atomic to shared variables * Fixing NSObject plugin class names --- Sources/Segment/Errors.swift | 18 ++++---- Sources/Segment/Timeline.swift | 6 +-- Sources/Segment/Utilities/Telemetry.swift | 51 +++++++++-------------- Tests/Segment-Tests/Telemetry_Tests.swift | 43 ++++++++++++++++--- 4 files changed, 69 insertions(+), 49 deletions(-) diff --git a/Sources/Segment/Errors.swift b/Sources/Segment/Errors.swift index 8abf95f..72a99ff 100644 --- a/Sources/Segment/Errors.swift +++ b/Sources/Segment/Errors.swift @@ -73,12 +73,11 @@ extension Analytics { if fatal { exceptionFailure("A critical error occurred: \(translatedError)") } - Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: Thread.callStackSymbols.joined(separator: "\n")) { - (_ it: inout [String: String]) in - it["error"] = "\(translatedError)" - it["writekey"] = configuration.values.writeKey - it["caller"] = Thread.callStackSymbols[3] - } + Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: Thread.callStackSymbols.joined(separator: "\n")) { + (_ it: inout [String: String]) in + it["error"] = "\(translatedError)" + it["writekey"] = configuration.values.writeKey + } } static public func reportInternalError(_ error: Error, fatal: Bool = false) { @@ -90,9 +89,8 @@ extension Analytics { exceptionFailure("A critical error occurred: \(translatedError)") } Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: Thread.callStackSymbols.joined(separator: "\n")) { - (_ it: inout [String: String]) in - it["error"] = "\(translatedError)" - it["caller"] = Thread.callStackSymbols[3] - } + (_ it: inout [String: String]) in + it["error"] = "\(translatedError)" + } } } diff --git a/Sources/Segment/Timeline.swift b/Sources/Segment/Timeline.swift index 3ab7a2b..4f9901d 100644 --- a/Sources/Segment/Timeline.swift +++ b/Sources/Segment/Timeline.swift @@ -71,7 +71,7 @@ internal class Mediator { if let plugin = plugin as? DestinationPlugin, !plugin.key.isEmpty { it["plugin"] = "\(plugin.type)-\(plugin.key)" } else { - it["plugin"] = "\(plugin.type)-\(String(describing: plugin))" + it["plugin"] = "\(plugin.type)-\(String(describing: type(of: plugin)))" } } } @@ -84,7 +84,7 @@ internal class Mediator { if let plugin = plugin as? DestinationPlugin, !plugin.key.isEmpty { it["plugin"] = "\(plugin.type)-\(plugin.key)" } else { - it["plugin"] = "\(plugin.type)-\(String(describing: plugin))" + it["plugin"] = "\(plugin.type)-\(String(describing: type(of: plugin)))" } } return plugin === storedPlugin } @@ -109,7 +109,7 @@ internal class Mediator { if let plugin = plugin as? DestinationPlugin, !plugin.key.isEmpty { it["plugin"] = "\(plugin.type)-\(plugin.key)" } else { - it["plugin"] = "\(plugin.type)-\(String(describing: plugin))" + it["plugin"] = "\(plugin.type)-\(String(describing: type(of: plugin)))" } } } } diff --git a/Sources/Segment/Utilities/Telemetry.swift b/Sources/Segment/Utilities/Telemetry.swift index 53eb0a8..2198bc9 100644 --- a/Sources/Segment/Utilities/Telemetry.swift +++ b/Sources/Segment/Utilities/Telemetry.swift @@ -71,7 +71,8 @@ public class Telemetry: Subscriber { internal var session: any HTTPSession internal var host: String = HTTPClient.getDefaultAPIHost() - var sampleRate: Double = 1.0 // inital sample rate should be 1.0, will be downsampled on start + @Atomic internal var sampleRate: Double = 1.0 // inital sample rate should be 1.0, will be downsampled on start + internal var sampleRateTest: Atomic { _sampleRate } private var flushTimer: Int = 30 internal var maxQueueSize: Int = 20 var errorLogSizeMax: Int = 4000 @@ -85,11 +86,10 @@ public class Telemetry: Subscriber { internal var queue = [RemoteMetric]() private var queueBytes = 0 - private var queueSizeExceeded = false - private var seenErrors = [String: Int]() - internal var started = false - private var rateLimitEndTime: TimeInterval = 0 - internal var flushFirstError = true + @Atomic internal var started = false + @Atomic private var rateLimitEndTime: TimeInterval = 0 + @Atomic internal var flushFirstError = true + internal var flushFirstErrorTest: Atomic { _flushFirstError } private var telemetryQueue = DispatchQueue(label: "telemetryQueue") private var updateQueue = DispatchQueue(label: "updateQueue") private var telemetryTimer: QueueTimer? @@ -97,7 +97,7 @@ public class Telemetry: Subscriber { /// Starts the Telemetry send loop. Requires both `enable` to be set and a configuration to be retrieved from Segment. func start() { guard enable, !started, sampleRate > 0.0 && sampleRate <= 1.0 else { return } - started = true + _started.set(true) // Queue contents were sampled at the default 100% // the values on flush will be adjusted in the send function @@ -105,9 +105,9 @@ public class Telemetry: Subscriber { resetQueue() } - self.telemetryTimer = QueueTimer(interval: .seconds(self.flushTimer), queue: .main) { [weak self] in + self.telemetryTimer = QueueTimer(interval: .seconds(self.flushTimer), queue: updateQueue) { [weak self] in if (!(self?.enable ?? false)) { - self?.started = false + self?._started.set(false) self?.telemetryTimer?.suspend() } self?.flush() @@ -118,9 +118,8 @@ public class Telemetry: Subscriber { func reset() { telemetryTimer?.suspend() resetQueue() - seenErrors.removeAll() - started = false - rateLimitEndTime = 0 + _started.set(false) + _rateLimitEndTime.set(0) } /// Increments a metric with the provided tags. @@ -128,7 +127,7 @@ public class Telemetry: Subscriber { /// - metric: The metric name. /// - buildTags: A closure to build the tags dictionary. func increment(metric: String, buildTags: (inout [String: String]) -> Void) { - guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG), queueHasSpace() else { return } + guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG) else { return } if Double.random(in: 0...1) > sampleRate { return } var tags = [String: String]() @@ -144,7 +143,7 @@ public class Telemetry: Subscriber { /// - log: The log data. /// - buildTags: A closure to build the tags dictionary. func error(metric: String, log: String, buildTags: (inout [String: String]) -> Void) { - guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG), queueHasSpace() else { return } + guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG) else { return } if Double.random(in: 0...1) > sampleRate { return } var tags = [String: String]() @@ -164,7 +163,7 @@ public class Telemetry: Subscriber { addRemoteMetric(metric: metric, tags: filteredTags, log: logData) if (flushFirstError) { - flushFirstError = false + _flushFirstError.set(false) flush() } } @@ -178,14 +177,14 @@ public class Telemetry: Subscriber { if rateLimitEndTime > Date().timeIntervalSince1970 { return } - rateLimitEndTime = 0 + _rateLimitEndTime.set(0) do { try send() queueBytes = 0 } catch { errorHandler?(error) - sampleRate = 0.0 + _sampleRate.set(0.0) } } } @@ -200,7 +199,6 @@ public class Telemetry: Subscriber { sendQueue.append(metric) } queueBytes = 0 - queueSizeExceeded = false let payload = try JSONEncoder().encode(["series": sendQueue]) var request = upload(apiHost: host) @@ -214,7 +212,7 @@ public class Telemetry: Subscriber { if let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 429 { if let retryAfter = httpResponse.allHeaderFields["Retry-After"] as? String, let retryAfterSeconds = TimeInterval(retryAfter) { - self.rateLimitEndTime = retryAfterSeconds + Date().timeIntervalSince1970 + self._rateLimitEndTime.set(retryAfterSeconds + Date().timeIntervalSince1970) } } } @@ -255,6 +253,8 @@ public class Telemetry: Subscriber { return } + guard queue.count < maxQueueSize else { return } + let newMetric = RemoteMetric( type: METRIC_TYPE, metric: metric, @@ -266,8 +266,6 @@ public class Telemetry: Subscriber { if queueBytes + newMetricSize <= maxQueueBytes { queue.append(newMetric) queueBytes += newMetricSize - } else { - queueSizeExceeded = true } } } @@ -284,7 +282,7 @@ public class Telemetry: Subscriber { private func systemUpdate(system: System) { if let settings = system.settings, let sampleRate = settings.metrics?["sampleRate"]?.doubleValue { - self.sampleRate = sampleRate + self._sampleRate.set(sampleRate) start() } } @@ -297,19 +295,10 @@ public class Telemetry: Subscriber { return request } - private func queueHasSpace() -> Bool { - var under = false - telemetryQueue.sync { - under = queue.count < maxQueueSize - } - return under - } - private func resetQueue() { telemetryQueue.sync { queue.removeAll() queueBytes = 0 - queueSizeExceeded = false } } } diff --git a/Tests/Segment-Tests/Telemetry_Tests.swift b/Tests/Segment-Tests/Telemetry_Tests.swift index 9097fbd..e219ceb 100644 --- a/Tests/Segment-Tests/Telemetry_Tests.swift +++ b/Tests/Segment-Tests/Telemetry_Tests.swift @@ -12,7 +12,7 @@ class TelemetryTests: XCTestCase { self.errors.append("\(error)") } errors.removeAll() - Telemetry.shared.sampleRate = 1.0 + Telemetry.shared.sampleRateTest.set(1.0) mockTelemetryHTTPClient() } @@ -29,12 +29,12 @@ class TelemetryTests: XCTestCase { } func testTelemetryStart() { - Telemetry.shared.sampleRate = 0.0 + Telemetry.shared.sampleRateTest.set(0.0) Telemetry.shared.enable = true Telemetry.shared.start() XCTAssertFalse(Telemetry.shared.started) - Telemetry.shared.sampleRate = 1.0 + Telemetry.shared.sampleRateTest.set(1.0) Telemetry.shared.start() XCTAssertTrue(Telemetry.shared.started) XCTAssertTrue(errors.isEmpty) @@ -116,7 +116,7 @@ class TelemetryTests: XCTestCase { func testHTTPException() { mockTelemetryHTTPClient(shouldThrow: true) - Telemetry.shared.flushFirstError = true + Telemetry.shared.flushFirstErrorTest.set(true) Telemetry.shared.enable = true Telemetry.shared.start() Telemetry.shared.error(metric: Telemetry.INVOKE_METRIC, log: "log") { $0["error"] = "test" } @@ -143,6 +143,38 @@ class TelemetryTests: XCTestCase { Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: longString) { $0["writekey"] = longString } XCTAssertTrue(Telemetry.shared.queue.count < 1000) } + + func testConcurrentErrorReporting() { + Telemetry.shared.enable = true + let operationCount = 200 + + let concurrentExpectation = XCTestExpectation(description: "High pressure operations") + concurrentExpectation.expectedFulfillmentCount = operationCount + + // Use multiple dispatch queues to increase concurrency + let queues = [ + DispatchQueue.global(qos: .userInitiated), + DispatchQueue.global(qos: .default), + DispatchQueue.global(qos: .utility) + ] + for i in 0.. Void) -> URLSessionDataTask { + let task = URLSession.shared.dataTask(with: request) { _, _, _ in } if shouldThrow { completionHandler(nil, nil, NSError(domain: "Test", code: 1, userInfo: nil)) } else { completionHandler(nil, HTTPURLResponse(url: request.url!, statusCode: 200, httpVersion: nil, headerFields: nil), nil) } - return URLSessionDataTaskMock() + return task } }