From f8a29a35639cd0420177772737e2a10abf43ba36 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Sun, 24 Nov 2024 07:56:58 -0500 Subject: [PATCH 1/6] Cleaning up code and removing noisy caller tag --- Sources/Segment/Errors.swift | 18 ++++++++---------- Sources/Segment/Utilities/Telemetry.swift | 2 -- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/Sources/Segment/Errors.swift b/Sources/Segment/Errors.swift index 8abf95f..72a99ff 100644 --- a/Sources/Segment/Errors.swift +++ b/Sources/Segment/Errors.swift @@ -73,12 +73,11 @@ extension Analytics { if fatal { exceptionFailure("A critical error occurred: \(translatedError)") } - Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: Thread.callStackSymbols.joined(separator: "\n")) { - (_ it: inout [String: String]) in - it["error"] = "\(translatedError)" - it["writekey"] = configuration.values.writeKey - it["caller"] = Thread.callStackSymbols[3] - } + Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: Thread.callStackSymbols.joined(separator: "\n")) { + (_ it: inout [String: String]) in + it["error"] = "\(translatedError)" + it["writekey"] = configuration.values.writeKey + } } static public func reportInternalError(_ error: Error, fatal: Bool = false) { @@ -90,9 +89,8 @@ extension Analytics { exceptionFailure("A critical error occurred: \(translatedError)") } Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: Thread.callStackSymbols.joined(separator: "\n")) { - (_ it: inout [String: String]) in - it["error"] = "\(translatedError)" - it["caller"] = Thread.callStackSymbols[3] - } + (_ it: inout [String: String]) in + it["error"] = "\(translatedError)" + } } } diff --git a/Sources/Segment/Utilities/Telemetry.swift b/Sources/Segment/Utilities/Telemetry.swift index 53eb0a8..a5b8efc 100644 --- a/Sources/Segment/Utilities/Telemetry.swift +++ b/Sources/Segment/Utilities/Telemetry.swift @@ -86,7 +86,6 @@ public class Telemetry: Subscriber { internal var queue = [RemoteMetric]() private var queueBytes = 0 private var queueSizeExceeded = false - private var seenErrors = [String: Int]() internal var started = false private var rateLimitEndTime: TimeInterval = 0 internal var flushFirstError = true @@ -118,7 +117,6 @@ public class Telemetry: Subscriber { func reset() { telemetryTimer?.suspend() resetQueue() - seenErrors.removeAll() started = false rateLimitEndTime = 0 } From ec2a96ec5d59a5e1b3bfee1ed29f30fc4e121aff Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Sun, 24 Nov 2024 20:17:36 -0500 Subject: [PATCH 2/6] Adding test that reproduced crash before fix --- Tests/Segment-Tests/Telemetry_Tests.swift | 33 +++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/Tests/Segment-Tests/Telemetry_Tests.swift b/Tests/Segment-Tests/Telemetry_Tests.swift index 9097fbd..ed0cc0e 100644 --- a/Tests/Segment-Tests/Telemetry_Tests.swift +++ b/Tests/Segment-Tests/Telemetry_Tests.swift @@ -143,6 +143,39 @@ class TelemetryTests: XCTestCase { Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: longString) { $0["writekey"] = longString } XCTAssertTrue(Telemetry.shared.queue.count < 1000) } + + func testConcurrentErrorReporting() { + Telemetry.shared.enable = true + Telemetry.shared.start() + let operationCount = 200 + + var concurrentExpectation = XCTestExpectation(description: "High pressure operations") + concurrentExpectation.expectedFulfillmentCount = operationCount + + // Use multiple dispatch queues to increase concurrency + let queues = [ + DispatchQueue.global(qos: .userInitiated), + DispatchQueue.global(qos: .default), + DispatchQueue.global(qos: .utility) + ] + for i in 0.. Date: Mon, 25 Nov 2024 08:38:04 -0500 Subject: [PATCH 3/6] Refactoring out unhelpful size checks, fixing warnings --- Sources/Segment/Utilities/Telemetry.swift | 19 ++++--------------- Tests/Segment-Tests/Telemetry_Tests.swift | 6 +++--- 2 files changed, 7 insertions(+), 18 deletions(-) diff --git a/Sources/Segment/Utilities/Telemetry.swift b/Sources/Segment/Utilities/Telemetry.swift index a5b8efc..36ac690 100644 --- a/Sources/Segment/Utilities/Telemetry.swift +++ b/Sources/Segment/Utilities/Telemetry.swift @@ -85,7 +85,6 @@ public class Telemetry: Subscriber { internal var queue = [RemoteMetric]() private var queueBytes = 0 - private var queueSizeExceeded = false internal var started = false private var rateLimitEndTime: TimeInterval = 0 internal var flushFirstError = true @@ -126,7 +125,7 @@ public class Telemetry: Subscriber { /// - metric: The metric name. /// - buildTags: A closure to build the tags dictionary. func increment(metric: String, buildTags: (inout [String: String]) -> Void) { - guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG), queueHasSpace() else { return } + guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG) else { return } if Double.random(in: 0...1) > sampleRate { return } var tags = [String: String]() @@ -142,7 +141,7 @@ public class Telemetry: Subscriber { /// - log: The log data. /// - buildTags: A closure to build the tags dictionary. func error(metric: String, log: String, buildTags: (inout [String: String]) -> Void) { - guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG), queueHasSpace() else { return } + guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG) else { return } if Double.random(in: 0...1) > sampleRate { return } var tags = [String: String]() @@ -198,7 +197,6 @@ public class Telemetry: Subscriber { sendQueue.append(metric) } queueBytes = 0 - queueSizeExceeded = false let payload = try JSONEncoder().encode(["series": sendQueue]) var request = upload(apiHost: host) @@ -253,6 +251,8 @@ public class Telemetry: Subscriber { return } + guard queue.count < maxQueueSize else { return } + let newMetric = RemoteMetric( type: METRIC_TYPE, metric: metric, @@ -264,8 +264,6 @@ public class Telemetry: Subscriber { if queueBytes + newMetricSize <= maxQueueBytes { queue.append(newMetric) queueBytes += newMetricSize - } else { - queueSizeExceeded = true } } } @@ -295,19 +293,10 @@ public class Telemetry: Subscriber { return request } - private func queueHasSpace() -> Bool { - var under = false - telemetryQueue.sync { - under = queue.count < maxQueueSize - } - return under - } - private func resetQueue() { telemetryQueue.sync { queue.removeAll() queueBytes = 0 - queueSizeExceeded = false } } } diff --git a/Tests/Segment-Tests/Telemetry_Tests.swift b/Tests/Segment-Tests/Telemetry_Tests.swift index ed0cc0e..44e6fae 100644 --- a/Tests/Segment-Tests/Telemetry_Tests.swift +++ b/Tests/Segment-Tests/Telemetry_Tests.swift @@ -146,10 +146,9 @@ class TelemetryTests: XCTestCase { func testConcurrentErrorReporting() { Telemetry.shared.enable = true - Telemetry.shared.start() let operationCount = 200 - var concurrentExpectation = XCTestExpectation(description: "High pressure operations") + let concurrentExpectation = XCTestExpectation(description: "High pressure operations") concurrentExpectation.expectedFulfillmentCount = operationCount // Use multiple dispatch queues to increase concurrency @@ -187,12 +186,13 @@ class URLSessionMock: RestrictedHTTPSession { var shouldThrow = false override func dataTask(with request: URLRequest, completionHandler: @escaping (Data?, URLResponse?, Error?) -> Void) -> URLSessionDataTask { + let task = URLSession.shared.dataTask(with: request) { _, _, _ in } if shouldThrow { completionHandler(nil, nil, NSError(domain: "Test", code: 1, userInfo: nil)) } else { completionHandler(nil, HTTPURLResponse(url: request.url!, statusCode: 200, httpVersion: nil, headerFields: nil), nil) } - return URLSessionDataTaskMock() + return task } } From 98974b25b2f6b331331cb3901a853cbef486b87c Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Wed, 27 Nov 2024 11:27:04 -0500 Subject: [PATCH 4/6] Adding atomic to shared variables --- Sources/Segment/Utilities/Telemetry.swift | 28 +++++++++++------------ 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/Sources/Segment/Utilities/Telemetry.swift b/Sources/Segment/Utilities/Telemetry.swift index 36ac690..6a07f8f 100644 --- a/Sources/Segment/Utilities/Telemetry.swift +++ b/Sources/Segment/Utilities/Telemetry.swift @@ -71,7 +71,7 @@ public class Telemetry: Subscriber { internal var session: any HTTPSession internal var host: String = HTTPClient.getDefaultAPIHost() - var sampleRate: Double = 1.0 // inital sample rate should be 1.0, will be downsampled on start + @Atomic var sampleRate: Double = 1.0 // inital sample rate should be 1.0, will be downsampled on start private var flushTimer: Int = 30 internal var maxQueueSize: Int = 20 var errorLogSizeMax: Int = 4000 @@ -85,9 +85,9 @@ public class Telemetry: Subscriber { internal var queue = [RemoteMetric]() private var queueBytes = 0 - internal var started = false - private var rateLimitEndTime: TimeInterval = 0 - internal var flushFirstError = true + @Atomic internal var started = false + @Atomic private var rateLimitEndTime: TimeInterval = 0 + @Atomic internal var flushFirstError = true private var telemetryQueue = DispatchQueue(label: "telemetryQueue") private var updateQueue = DispatchQueue(label: "updateQueue") private var telemetryTimer: QueueTimer? @@ -95,7 +95,7 @@ public class Telemetry: Subscriber { /// Starts the Telemetry send loop. Requires both `enable` to be set and a configuration to be retrieved from Segment. func start() { guard enable, !started, sampleRate > 0.0 && sampleRate <= 1.0 else { return } - started = true + _started.set(true) // Queue contents were sampled at the default 100% // the values on flush will be adjusted in the send function @@ -103,9 +103,9 @@ public class Telemetry: Subscriber { resetQueue() } - self.telemetryTimer = QueueTimer(interval: .seconds(self.flushTimer), queue: .main) { [weak self] in + self.telemetryTimer = QueueTimer(interval: .seconds(self.flushTimer), queue: updateQueue) { [weak self] in if (!(self?.enable ?? false)) { - self?.started = false + self?._started.set(false) self?.telemetryTimer?.suspend() } self?.flush() @@ -116,8 +116,8 @@ public class Telemetry: Subscriber { func reset() { telemetryTimer?.suspend() resetQueue() - started = false - rateLimitEndTime = 0 + _started.set(false) + _rateLimitEndTime.set(0) } /// Increments a metric with the provided tags. @@ -161,7 +161,7 @@ public class Telemetry: Subscriber { addRemoteMetric(metric: metric, tags: filteredTags, log: logData) if (flushFirstError) { - flushFirstError = false + _flushFirstError.set(false) flush() } } @@ -175,14 +175,14 @@ public class Telemetry: Subscriber { if rateLimitEndTime > Date().timeIntervalSince1970 { return } - rateLimitEndTime = 0 + _rateLimitEndTime.set(0) do { try send() queueBytes = 0 } catch { errorHandler?(error) - sampleRate = 0.0 + _sampleRate.set(0.0) } } } @@ -210,7 +210,7 @@ public class Telemetry: Subscriber { if let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 429 { if let retryAfter = httpResponse.allHeaderFields["Retry-After"] as? String, let retryAfterSeconds = TimeInterval(retryAfter) { - self.rateLimitEndTime = retryAfterSeconds + Date().timeIntervalSince1970 + self._rateLimitEndTime.set(retryAfterSeconds + Date().timeIntervalSince1970) } } } @@ -280,7 +280,7 @@ public class Telemetry: Subscriber { private func systemUpdate(system: System) { if let settings = system.settings, let sampleRate = settings.metrics?["sampleRate"]?.doubleValue { - self.sampleRate = sampleRate + self._sampleRate.set(sampleRate) start() } } From 5f53cbb1e1d485156d403b1544da98b0914e886a Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Wed, 27 Nov 2024 11:42:23 -0500 Subject: [PATCH 5/6] Fixing tests for new atomic members --- Sources/Segment/Utilities/Telemetry.swift | 4 +++- Tests/Segment-Tests/Telemetry_Tests.swift | 8 ++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/Sources/Segment/Utilities/Telemetry.swift b/Sources/Segment/Utilities/Telemetry.swift index 6a07f8f..2198bc9 100644 --- a/Sources/Segment/Utilities/Telemetry.swift +++ b/Sources/Segment/Utilities/Telemetry.swift @@ -71,7 +71,8 @@ public class Telemetry: Subscriber { internal var session: any HTTPSession internal var host: String = HTTPClient.getDefaultAPIHost() - @Atomic var sampleRate: Double = 1.0 // inital sample rate should be 1.0, will be downsampled on start + @Atomic internal var sampleRate: Double = 1.0 // inital sample rate should be 1.0, will be downsampled on start + internal var sampleRateTest: Atomic { _sampleRate } private var flushTimer: Int = 30 internal var maxQueueSize: Int = 20 var errorLogSizeMax: Int = 4000 @@ -88,6 +89,7 @@ public class Telemetry: Subscriber { @Atomic internal var started = false @Atomic private var rateLimitEndTime: TimeInterval = 0 @Atomic internal var flushFirstError = true + internal var flushFirstErrorTest: Atomic { _flushFirstError } private var telemetryQueue = DispatchQueue(label: "telemetryQueue") private var updateQueue = DispatchQueue(label: "updateQueue") private var telemetryTimer: QueueTimer? diff --git a/Tests/Segment-Tests/Telemetry_Tests.swift b/Tests/Segment-Tests/Telemetry_Tests.swift index 44e6fae..e219ceb 100644 --- a/Tests/Segment-Tests/Telemetry_Tests.swift +++ b/Tests/Segment-Tests/Telemetry_Tests.swift @@ -12,7 +12,7 @@ class TelemetryTests: XCTestCase { self.errors.append("\(error)") } errors.removeAll() - Telemetry.shared.sampleRate = 1.0 + Telemetry.shared.sampleRateTest.set(1.0) mockTelemetryHTTPClient() } @@ -29,12 +29,12 @@ class TelemetryTests: XCTestCase { } func testTelemetryStart() { - Telemetry.shared.sampleRate = 0.0 + Telemetry.shared.sampleRateTest.set(0.0) Telemetry.shared.enable = true Telemetry.shared.start() XCTAssertFalse(Telemetry.shared.started) - Telemetry.shared.sampleRate = 1.0 + Telemetry.shared.sampleRateTest.set(1.0) Telemetry.shared.start() XCTAssertTrue(Telemetry.shared.started) XCTAssertTrue(errors.isEmpty) @@ -116,7 +116,7 @@ class TelemetryTests: XCTestCase { func testHTTPException() { mockTelemetryHTTPClient(shouldThrow: true) - Telemetry.shared.flushFirstError = true + Telemetry.shared.flushFirstErrorTest.set(true) Telemetry.shared.enable = true Telemetry.shared.start() Telemetry.shared.error(metric: Telemetry.INVOKE_METRIC, log: "log") { $0["error"] = "test" } From 5b0c67dd33b116f78390397671640c1ec5316d7d Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Mon, 2 Dec 2024 16:35:55 -0500 Subject: [PATCH 6/6] Fixing NSObject plugin class names --- Sources/Segment/Timeline.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/Segment/Timeline.swift b/Sources/Segment/Timeline.swift index 3ab7a2b..4f9901d 100644 --- a/Sources/Segment/Timeline.swift +++ b/Sources/Segment/Timeline.swift @@ -71,7 +71,7 @@ internal class Mediator { if let plugin = plugin as? DestinationPlugin, !plugin.key.isEmpty { it["plugin"] = "\(plugin.type)-\(plugin.key)" } else { - it["plugin"] = "\(plugin.type)-\(String(describing: plugin))" + it["plugin"] = "\(plugin.type)-\(String(describing: type(of: plugin)))" } } } @@ -84,7 +84,7 @@ internal class Mediator { if let plugin = plugin as? DestinationPlugin, !plugin.key.isEmpty { it["plugin"] = "\(plugin.type)-\(plugin.key)" } else { - it["plugin"] = "\(plugin.type)-\(String(describing: plugin))" + it["plugin"] = "\(plugin.type)-\(String(describing: type(of: plugin)))" } } return plugin === storedPlugin } @@ -109,7 +109,7 @@ internal class Mediator { if let plugin = plugin as? DestinationPlugin, !plugin.key.isEmpty { it["plugin"] = "\(plugin.type)-\(plugin.key)" } else { - it["plugin"] = "\(plugin.type)-\(String(describing: plugin))" + it["plugin"] = "\(plugin.type)-\(String(describing: type(of: plugin)))" } } } }