Telemetry cleanup and adding test for fixed crash (#376)
* Cleaning up code and removing the noisy caller tag

* Adding a test that reproduced the crash before the fix

* Refactoring out unhelpful queue size checks

* Adding @Atomic to shared variables

* Fixing NSObject plugin class names
MichaelGHSeg authored Dec 2, 2024
1 parent cc47b9a commit 14d8420
Showing 4 changed files with 69 additions and 49 deletions.
18 changes: 8 additions & 10 deletions Sources/Segment/Errors.swift
@@ -73,12 +73,11 @@ extension Analytics {
if fatal {
exceptionFailure("A critical error occurred: \(translatedError)")
}
Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: Thread.callStackSymbols.joined(separator: "\n")) {
(_ it: inout [String: String]) in
it["error"] = "\(translatedError)"
it["writekey"] = configuration.values.writeKey
it["caller"] = Thread.callStackSymbols[3]
}
Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: Thread.callStackSymbols.joined(separator: "\n")) {
(_ it: inout [String: String]) in
it["error"] = "\(translatedError)"
it["writekey"] = configuration.values.writeKey
}
}

static public func reportInternalError(_ error: Error, fatal: Bool = false) {
@@ -90,9 +89,8 @@ extension Analytics {
exceptionFailure("A critical error occurred: \(translatedError)")
}
Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: Thread.callStackSymbols.joined(separator: "\n")) {
(_ it: inout [String: String]) in
it["error"] = "\(translatedError)"
it["caller"] = Thread.callStackSymbols[3]
}
(_ it: inout [String: String]) in
it["error"] = "\(translatedError)"
}
}
}
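
Context for the caller-tag removal above: the tag recorded Thread.callStackSymbols[3], a fixed frame index the commit message describes as noisy, and the full stack is already passed as the log argument. A small illustrative snippet (not part of this commit) showing both values side by side:

import Foundation

// The full stack is what error(metric:log:) already receives as `log`;
// a single hard-coded frame index shifts whenever intermediate frames change.
let symbols = Thread.callStackSymbols
let fullStack = symbols.joined(separator: "\n")
let thirdFrame = symbols.indices.contains(3) ? symbols[3] : "n/a"
print(fullStack)
print(thirdFrame)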
6 changes: 3 additions & 3 deletions Sources/Segment/Timeline.swift
@@ -71,7 +71,7 @@ internal class Mediator {
if let plugin = plugin as? DestinationPlugin, !plugin.key.isEmpty {
it["plugin"] = "\(plugin.type)-\(plugin.key)"
} else {
it["plugin"] = "\(plugin.type)-\(String(describing: plugin))"
it["plugin"] = "\(plugin.type)-\(String(describing: type(of: plugin)))"
}
}
}
@@ -84,7 +84,7 @@ internal class Mediator {
if let plugin = plugin as? DestinationPlugin, !plugin.key.isEmpty {
it["plugin"] = "\(plugin.type)-\(plugin.key)"
} else {
it["plugin"] = "\(plugin.type)-\(String(describing: plugin))"
it["plugin"] = "\(plugin.type)-\(String(describing: type(of: plugin)))"
} }
return plugin === storedPlugin
}
@@ -109,7 +109,7 @@ internal class Mediator {
if let plugin = plugin as? DestinationPlugin, !plugin.key.isEmpty {
it["plugin"] = "\(plugin.type)-\(plugin.key)"
} else {
it["plugin"] = "\(plugin.type)-\(String(describing: plugin))"
it["plugin"] = "\(plugin.type)-\(String(describing: type(of: plugin)))"
} }
}
}
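Why type(of:) is the right fix above: String(describing:) on an NSObject-backed plugin routes through its description, which by default includes the instance's pointer, so the plugin tag changed on every run. The dynamic type gives a stable class name instead. Illustrative snippet with a hypothetical plugin class:

import Foundation

class ExamplePlugin: NSObject { }            // hypothetical NSObject-backed plugin

let plugin = ExamplePlugin()
print(String(describing: plugin))            // something like "<ExamplePlugin: 0x600000c0ffee>" — pointer varies per run
print(String(describing: type(of: plugin)))  // "ExamplePlugin" — stable name for the telemetry tag
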
51 changes: 20 additions & 31 deletions Sources/Segment/Utilities/Telemetry.swift
@@ -71,7 +71,8 @@ public class Telemetry: Subscriber {

internal var session: any HTTPSession
internal var host: String = HTTPClient.getDefaultAPIHost()
var sampleRate: Double = 1.0 // inital sample rate should be 1.0, will be downsampled on start
@Atomic internal var sampleRate: Double = 1.0 // inital sample rate should be 1.0, will be downsampled on start
internal var sampleRateTest: Atomic<Double> { _sampleRate }
private var flushTimer: Int = 30
internal var maxQueueSize: Int = 20
var errorLogSizeMax: Int = 4000
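
For reference, the @Atomic wrapper applied to sampleRate (and to started, rateLimitEndTime, and flushFirstError below) exposes reads through wrappedValue and writes through set(_:) on the underscored backing storage, which is also what sampleRateTest hands to the tests. The SDK's actual Atomic type is defined elsewhere in the repo; a minimal lock-based sketch with that surface might look like:

import Foundation

@propertyWrapper
final class AtomicSketch<T> {
    private let lock = NSLock()
    private var value: T

    init(wrappedValue: T) {
        self.value = wrappedValue
    }

    // Reads are lock-protected; the wrapped property is read-only, so every
    // write has to go through set(_:) instead of a racy read-modify-write.
    var wrappedValue: T {
        lock.lock(); defer { lock.unlock() }
        return value
    }

    func set(_ newValue: T) {
        lock.lock(); defer { lock.unlock() }
        value = newValue
    }
}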
@@ -85,29 +86,28 @@ public class Telemetry: Subscriber {

internal var queue = [RemoteMetric]()
private var queueBytes = 0
private var queueSizeExceeded = false
private var seenErrors = [String: Int]()
internal var started = false
private var rateLimitEndTime: TimeInterval = 0
internal var flushFirstError = true
@Atomic internal var started = false
@Atomic private var rateLimitEndTime: TimeInterval = 0
@Atomic internal var flushFirstError = true
internal var flushFirstErrorTest: Atomic<Bool> { _flushFirstError }
private var telemetryQueue = DispatchQueue(label: "telemetryQueue")
private var updateQueue = DispatchQueue(label: "updateQueue")
private var telemetryTimer: QueueTimer?

/// Starts the Telemetry send loop. Requires both `enable` to be set and a configuration to be retrieved from Segment.
func start() {
guard enable, !started, sampleRate > 0.0 && sampleRate <= 1.0 else { return }
started = true
_started.set(true)

// Queue contents were sampled at the default 100%
// the values on flush will be adjusted in the send function
if Double.random(in: 0...1) > sampleRate {
resetQueue()
}

self.telemetryTimer = QueueTimer(interval: .seconds(self.flushTimer), queue: .main) { [weak self] in
self.telemetryTimer = QueueTimer(interval: .seconds(self.flushTimer), queue: updateQueue) { [weak self] in
if (!(self?.enable ?? false)) {
self?.started = false
self?._started.set(false)
self?.telemetryTimer?.suspend()
}
self?.flush()
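
The timer change above moves the periodic flush off the main queue and onto the new private updateQueue. QueueTimer itself is outside this diff; assuming it wraps a DispatchSourceTimer, a rough stand-in with the same call shape (interval:queue:handler: plus suspend()) could be:

import Foundation

// Hypothetical stand-in for the SDK's QueueTimer, shown only to make the
// queue parameter concrete; the real implementation may differ.
final class QueueTimerSketch {
    private let timer: DispatchSourceTimer

    init(interval: DispatchTimeInterval, queue: DispatchQueue, handler: @escaping () -> Void) {
        timer = DispatchSource.makeTimerSource(queue: queue)  // handler runs on this queue
        timer.schedule(deadline: .now() + interval, repeating: interval)
        timer.setEventHandler(handler: handler)
        timer.resume()
    }

    func suspend() {
        timer.suspend()
    }
}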
@@ -118,17 +118,16 @@ public class Telemetry: Subscriber {
func reset() {
telemetryTimer?.suspend()
resetQueue()
seenErrors.removeAll()
started = false
rateLimitEndTime = 0
_started.set(false)
_rateLimitEndTime.set(0)
}

/// Increments a metric with the provided tags.
/// - Parameters:
/// - metric: The metric name.
/// - buildTags: A closure to build the tags dictionary.
func increment(metric: String, buildTags: (inout [String: String]) -> Void) {
guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG), queueHasSpace() else { return }
guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG) else { return }
if Double.random(in: 0...1) > sampleRate { return }

var tags = [String: String]()
@@ -144,7 +143,7 @@ public class Telemetry: Subscriber {
/// - log: The log data.
/// - buildTags: A closure to build the tags dictionary.
func error(metric: String, log: String, buildTags: (inout [String: String]) -> Void) {
guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG), queueHasSpace() else { return }
guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG) else { return }
if Double.random(in: 0...1) > sampleRate { return }

var tags = [String: String]()
@@ -164,7 +163,7 @@ public class Telemetry: Subscriber {
addRemoteMetric(metric: metric, tags: filteredTags, log: logData)

if (flushFirstError) {
flushFirstError = false
_flushFirstError.set(false)
flush()
}
}
@@ -178,14 +177,14 @@ public class Telemetry: Subscriber {
if rateLimitEndTime > Date().timeIntervalSince1970 {
return
}
rateLimitEndTime = 0
_rateLimitEndTime.set(0)

do {
try send()
queueBytes = 0
} catch {
errorHandler?(error)
sampleRate = 0.0
_sampleRate.set(0.0)
}
}
}
@@ -200,7 +199,6 @@ public class Telemetry: Subscriber {
sendQueue.append(metric)
}
queueBytes = 0
queueSizeExceeded = false

let payload = try JSONEncoder().encode(["series": sendQueue])
var request = upload(apiHost: host)
@@ -214,7 +212,7 @@ public class Telemetry: Subscriber {

if let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 429 {
if let retryAfter = httpResponse.allHeaderFields["Retry-After"] as? String, let retryAfterSeconds = TimeInterval(retryAfter) {
self.rateLimitEndTime = retryAfterSeconds + Date().timeIntervalSince1970
self._rateLimitEndTime.set(retryAfterSeconds + Date().timeIntervalSince1970)
}
}
}
@@ -255,6 +253,8 @@ public class Telemetry: Subscriber {
return
}

guard queue.count < maxQueueSize else { return }

let newMetric = RemoteMetric(
type: METRIC_TYPE,
metric: metric,
@@ -266,8 +266,6 @@ public class Telemetry: Subscriber {
if queueBytes + newMetricSize <= maxQueueBytes {
queue.append(newMetric)
queueBytes += newMetricSize
} else {
queueSizeExceeded = true
}
}
}
@@ -284,7 +282,7 @@ public class Telemetry: Subscriber {

private func systemUpdate(system: System) {
if let settings = system.settings, let sampleRate = settings.metrics?["sampleRate"]?.doubleValue {
self.sampleRate = sampleRate
self._sampleRate.set(sampleRate)
start()
}
}
@@ -297,19 +295,10 @@ public class Telemetry: Subscriber {
return request
}

private func queueHasSpace() -> Bool {
var under = false
telemetryQueue.sync {
under = queue.count < maxQueueSize
}
return under
}

private func resetQueue() {
telemetryQueue.sync {
queue.removeAll()
queueBytes = 0
queueSizeExceeded = false
}
}
}
43 changes: 38 additions & 5 deletions Tests/Segment-Tests/Telemetry_Tests.swift
@@ -12,7 +12,7 @@ class TelemetryTests: XCTestCase {
self.errors.append("\(error)")
}
errors.removeAll()
Telemetry.shared.sampleRate = 1.0
Telemetry.shared.sampleRateTest.set(1.0)
mockTelemetryHTTPClient()
}

@@ -29,12 +29,12 @@ class TelemetryTests: XCTestCase {
}

func testTelemetryStart() {
Telemetry.shared.sampleRate = 0.0
Telemetry.shared.sampleRateTest.set(0.0)
Telemetry.shared.enable = true
Telemetry.shared.start()
XCTAssertFalse(Telemetry.shared.started)

Telemetry.shared.sampleRate = 1.0
Telemetry.shared.sampleRateTest.set(1.0)
Telemetry.shared.start()
XCTAssertTrue(Telemetry.shared.started)
XCTAssertTrue(errors.isEmpty)
@@ -116,7 +116,7 @@ class TelemetryTests: XCTestCase {

func testHTTPException() {
mockTelemetryHTTPClient(shouldThrow: true)
Telemetry.shared.flushFirstError = true
Telemetry.shared.flushFirstErrorTest.set(true)
Telemetry.shared.enable = true
Telemetry.shared.start()
Telemetry.shared.error(metric: Telemetry.INVOKE_METRIC, log: "log") { $0["error"] = "test" }
@@ -143,6 +143,38 @@ class TelemetryTests: XCTestCase {
Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: longString) { $0["writekey"] = longString }
XCTAssertTrue(Telemetry.shared.queue.count < 1000)
}

func testConcurrentErrorReporting() {
Telemetry.shared.enable = true
let operationCount = 200

let concurrentExpectation = XCTestExpectation(description: "High pressure operations")
concurrentExpectation.expectedFulfillmentCount = operationCount

// Use multiple dispatch queues to increase concurrency
let queues = [
DispatchQueue.global(qos: .userInitiated),
DispatchQueue.global(qos: .default),
DispatchQueue.global(qos: .utility)
]
for i in 0..<operationCount {
// Round-robin between different queues
let queue = queues[i % queues.count]
queue.async {
Telemetry.shared.error(
metric: Telemetry.INVOKE_ERROR_METRIC,
log: "High pressure test \(i)"
) { tags in
tags["error"] = "pressure_test_key"
tags["queue"] = "\(i % queues.count)"
tags["iteration"] = "\(i)"
}
concurrentExpectation.fulfill()
}
}
wait(for: [concurrentExpectation], timeout: 15.0)
XCTAssertTrue(Telemetry.shared.queue.count == Telemetry.shared.maxQueueSize)
}
}

// Mock URLSession
@@ -154,12 +186,13 @@ class URLSessionMock: RestrictedHTTPSession {
var shouldThrow = false

override func dataTask(with request: URLRequest, completionHandler: @escaping (Data?, URLResponse?, Error?) -> Void) -> URLSessionDataTask {
let task = URLSession.shared.dataTask(with: request) { _, _, _ in }
if shouldThrow {
completionHandler(nil, nil, NSError(domain: "Test", code: 1, userInfo: nil))
} else {
completionHandler(nil, HTTPURLResponse(url: request.url!, statusCode: 200, httpVersion: nil, headerFields: nil), nil)
}
return URLSessionDataTaskMock()
return task
}
}
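
The mock now returns a real, never-resumed data task instead of a custom URLSessionDataTask subclass, while still invoking the completion handler synchronously. The mockTelemetryHTTPClient(shouldThrow:) helper called in setUp and testHTTPException is outside this diff; assuming Telemetry.session is assignable from the test target via @testable import, it plausibly looks something like:

// Hypothetical reconstruction of the helper; the real body is not shown here.
func mockTelemetryHTTPClient(shouldThrow: Bool = false) {
    let session = URLSessionMock()
    session.shouldThrow = shouldThrow
    Telemetry.shared.session = session
}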
