Telemetry cleanup and adding test for fixed crash #376

Merged · 6 commits · Dec 2, 2024
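In brief, as reflected in the diff below: Telemetry's shared mutable state (sampleRate, started, rateLimitEndTime, flushFirstError) moves behind the SDK's @Atomic wrapper and is written through set(_:); the "caller" tag built from Thread.callStackSymbols[3] is dropped from error reporting; plugin tags use the plugin's type name instead of its instance description; the queue-size check moves inside the serialized telemetry queue, replacing the separate queueHasSpace() pre-check and the queueSizeExceeded flag; the flush timer runs on a private dispatch queue rather than the main queue; and a concurrent error-reporting test covers the fixed crash.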
18 changes: 8 additions & 10 deletions Sources/Segment/Errors.swift
@@ -73,12 +73,11 @@ extension Analytics {
if fatal {
exceptionFailure("A critical error occurred: \(translatedError)")
}
Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: Thread.callStackSymbols.joined(separator: "\n")) {
(_ it: inout [String: String]) in
it["error"] = "\(translatedError)"
it["writekey"] = configuration.values.writeKey
it["caller"] = Thread.callStackSymbols[3]
}
Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: Thread.callStackSymbols.joined(separator: "\n")) {
(_ it: inout [String: String]) in
it["error"] = "\(translatedError)"
it["writekey"] = configuration.values.writeKey
}
}

static public func reportInternalError(_ error: Error, fatal: Bool = false) {
@@ -90,9 +89,8 @@ extension Analytics {
exceptionFailure("A critical error occurred: \(translatedError)")
}
Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: Thread.callStackSymbols.joined(separator: "\n")) {
(_ it: inout [String: String]) in
it["error"] = "\(translatedError)"
it["caller"] = Thread.callStackSymbols[3]
}
(_ it: inout [String: String]) in
it["error"] = "\(translatedError)"
}
}
}
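The dropped "caller" tag indexed a fixed offset into Thread.callStackSymbols, which can trap when fewer than four frames are captured and is a plausible source of the crash referenced in the PR title. For illustration only (the PR removes the tag rather than guarding the index), a bounds-checked lookup would look like this:

import Foundation

// Illustration, not part of the diff: guard the index before reading it.
let symbols = Thread.callStackSymbols
let caller = symbols.indices.contains(3) ? symbols[3] : "unknown"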
6 changes: 3 additions & 3 deletions Sources/Segment/Timeline.swift
@@ -71,7 +71,7 @@ internal class Mediator {
if let plugin = plugin as? DestinationPlugin, !plugin.key.isEmpty {
it["plugin"] = "\(plugin.type)-\(plugin.key)"
} else {
it["plugin"] = "\(plugin.type)-\(String(describing: plugin))"
it["plugin"] = "\(plugin.type)-\(String(describing: type(of: plugin)))"
}
}
}
@@ -84,7 +84,7 @@ internal class Mediator {
if let plugin = plugin as? DestinationPlugin, !plugin.key.isEmpty {
it["plugin"] = "\(plugin.type)-\(plugin.key)"
} else {
it["plugin"] = "\(plugin.type)-\(String(describing: plugin))"
it["plugin"] = "\(plugin.type)-\(String(describing: type(of: plugin)))"
} }
return plugin === storedPlugin
}
@@ -109,7 +109,7 @@ internal class Mediator {
if let plugin = plugin as? DestinationPlugin, !plugin.key.isEmpty {
it["plugin"] = "\(plugin.type)-\(plugin.key)"
} else {
it["plugin"] = "\(plugin.type)-\(String(describing: plugin))"
it["plugin"] = "\(plugin.type)-\(String(describing: type(of: plugin)))"
} }
}
}
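The plugin tag now records the plugin's type name rather than its instance description; a description can be long, customized, or costly to compute, while the type name is short and stable. A quick illustration with a hypothetical type (ExamplePlugin is not an SDK type):

// Hypothetical type, used only to show the difference in output.
struct ExamplePlugin: CustomStringConvertible {
    var description: String { "ExamplePlugin(configuration: ..., state: ...)" }
}

let plugin = ExamplePlugin()
print(String(describing: plugin))           // "ExamplePlugin(configuration: ..., state: ...)"
print(String(describing: type(of: plugin))) // "ExamplePlugin"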
51 changes: 20 additions & 31 deletions Sources/Segment/Utilities/Telemetry.swift
@@ -71,7 +71,8 @@ public class Telemetry: Subscriber {

internal var session: any HTTPSession
internal var host: String = HTTPClient.getDefaultAPIHost()
var sampleRate: Double = 1.0 // inital sample rate should be 1.0, will be downsampled on start
@Atomic internal var sampleRate: Double = 1.0 // inital sample rate should be 1.0, will be downsampled on start
internal var sampleRateTest: Atomic<Double> { _sampleRate }
private var flushTimer: Int = 30
internal var maxQueueSize: Int = 20
var errorLogSizeMax: Int = 4000
@@ -85,29 +86,28 @@ public class Telemetry: Subscriber {

internal var queue = [RemoteMetric]()
private var queueBytes = 0
private var queueSizeExceeded = false
private var seenErrors = [String: Int]()
internal var started = false
private var rateLimitEndTime: TimeInterval = 0
internal var flushFirstError = true
@Atomic internal var started = false
@Atomic private var rateLimitEndTime: TimeInterval = 0
@Atomic internal var flushFirstError = true
internal var flushFirstErrorTest: Atomic<Bool> { _flushFirstError }
private var telemetryQueue = DispatchQueue(label: "telemetryQueue")
private var updateQueue = DispatchQueue(label: "updateQueue")
private var telemetryTimer: QueueTimer?

/// Starts the Telemetry send loop. Requires both `enable` to be set and a configuration to be retrieved from Segment.
func start() {
guard enable, !started, sampleRate > 0.0 && sampleRate <= 1.0 else { return }
started = true
_started.set(true)

// Queue contents were sampled at the default 100%
// the values on flush will be adjusted in the send function
if Double.random(in: 0...1) > sampleRate {
resetQueue()
}

self.telemetryTimer = QueueTimer(interval: .seconds(self.flushTimer), queue: .main) { [weak self] in
self.telemetryTimer = QueueTimer(interval: .seconds(self.flushTimer), queue: updateQueue) { [weak self] in
if (!(self?.enable ?? false)) {
self?.started = false
self?._started.set(false)
self?.telemetryTimer?.suspend()
}
self?.flush()
@@ -118,17 +118,16 @@
func reset() {
telemetryTimer?.suspend()
resetQueue()
seenErrors.removeAll()
started = false
rateLimitEndTime = 0
_started.set(false)
_rateLimitEndTime.set(0)
}

/// Increments a metric with the provided tags.
/// - Parameters:
/// - metric: The metric name.
/// - buildTags: A closure to build the tags dictionary.
func increment(metric: String, buildTags: (inout [String: String]) -> Void) {
guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG), queueHasSpace() else { return }
guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG) else { return }
if Double.random(in: 0...1) > sampleRate { return }

var tags = [String: String]()
@@ -144,7 +143,7 @@
/// - log: The log data.
/// - buildTags: A closure to build the tags dictionary.
func error(metric: String, log: String, buildTags: (inout [String: String]) -> Void) {
guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG), queueHasSpace() else { return }
guard enable, sampleRate > 0.0 && sampleRate <= 1.0, metric.hasPrefix(Telemetry.METRICS_BASE_TAG) else { return }
if Double.random(in: 0...1) > sampleRate { return }

var tags = [String: String]()
@@ -164,7 +163,7 @@
addRemoteMetric(metric: metric, tags: filteredTags, log: logData)

if (flushFirstError) {
flushFirstError = false
_flushFirstError.set(false)
flush()
}
}
@@ -178,14 +177,14 @@
if rateLimitEndTime > Date().timeIntervalSince1970 {
return
}
rateLimitEndTime = 0
_rateLimitEndTime.set(0)

do {
try send()
queueBytes = 0
} catch {
errorHandler?(error)
sampleRate = 0.0
_sampleRate.set(0.0)
}
}
}
@@ -200,7 +199,6 @@
sendQueue.append(metric)
}
queueBytes = 0
queueSizeExceeded = false

let payload = try JSONEncoder().encode(["series": sendQueue])
var request = upload(apiHost: host)
@@ -214,7 +212,7 @@

if let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 429 {
if let retryAfter = httpResponse.allHeaderFields["Retry-After"] as? String, let retryAfterSeconds = TimeInterval(retryAfter) {
self.rateLimitEndTime = retryAfterSeconds + Date().timeIntervalSince1970
self._rateLimitEndTime.set(retryAfterSeconds + Date().timeIntervalSince1970)
}
}
}
@@ -255,6 +253,8 @@
return
}

guard queue.count < maxQueueSize else { return }

let newMetric = RemoteMetric(
type: METRIC_TYPE,
metric: metric,
@@ -266,8 +266,6 @@
if queueBytes + newMetricSize <= maxQueueBytes {
queue.append(newMetric)
queueBytes += newMetricSize
} else {
queueSizeExceeded = true
}
}
}
@@ -284,7 +282,7 @@

private func systemUpdate(system: System) {
if let settings = system.settings, let sampleRate = settings.metrics?["sampleRate"]?.doubleValue {
self.sampleRate = sampleRate
self._sampleRate.set(sampleRate)
start()
}
}
@@ -297,19 +295,10 @@
return request
}

private func queueHasSpace() -> Bool {
var under = false
telemetryQueue.sync {
under = queue.count < maxQueueSize
}
return under
}

private func resetQueue() {
telemetryQueue.sync {
queue.removeAll()
queueBytes = 0
queueSizeExceeded = false
}
}
}
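Two patterns recur in this file. First, fields read and written across queues (sampleRate, started, rateLimitEndTime, flushFirstError) are wrapped in the SDK's @Atomic property wrapper and written through set(_:), with sampleRateTest and flushFirstErrorTest exposing the wrappers to the test target. Second, the racy check-then-act pair of queueHasSpace() followed by a later append is replaced by a size guard inside the telemetryQueue.sync block in addRemoteMetric. The Atomic wrapper itself is not part of this diff; a minimal lock-based sketch consistent with the usage above (reads via wrappedValue, writes via the projected _property.set(...)) could look like the following, though the SDK's real implementation may differ:

import Foundation

// Minimal illustrative sketch; not the SDK's actual Atomic type.
@propertyWrapper
final class Atomic<Value> {
    private let lock = NSLock()
    private var storage: Value

    init(wrappedValue: Value) {
        self.storage = wrappedValue
    }

    // Read-only wrappedValue forces writers through set(_:), which avoids
    // unsynchronized read-modify-write on the wrapped property.
    var wrappedValue: Value {
        lock.lock(); defer { lock.unlock() }
        return storage
    }

    func set(_ newValue: Value) {
        lock.lock(); defer { lock.unlock() }
        storage = newValue
    }
}

With this shape, @Atomic var started = false is read as started and updated as _started.set(true), matching the calls in start() and reset().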
43 changes: 38 additions & 5 deletions Tests/Segment-Tests/Telemetry_Tests.swift
@@ -12,7 +12,7 @@ class TelemetryTests: XCTestCase {
self.errors.append("\(error)")
}
errors.removeAll()
Telemetry.shared.sampleRate = 1.0
Telemetry.shared.sampleRateTest.set(1.0)
mockTelemetryHTTPClient()
}

@@ -29,12 +29,12 @@ class TelemetryTests: XCTestCase {
}

func testTelemetryStart() {
Telemetry.shared.sampleRate = 0.0
Telemetry.shared.sampleRateTest.set(0.0)
Telemetry.shared.enable = true
Telemetry.shared.start()
XCTAssertFalse(Telemetry.shared.started)

Telemetry.shared.sampleRate = 1.0
Telemetry.shared.sampleRateTest.set(1.0)
Telemetry.shared.start()
XCTAssertTrue(Telemetry.shared.started)
XCTAssertTrue(errors.isEmpty)
@@ -116,7 +116,7 @@

func testHTTPException() {
mockTelemetryHTTPClient(shouldThrow: true)
Telemetry.shared.flushFirstError = true
Telemetry.shared.flushFirstErrorTest.set(true)
Telemetry.shared.enable = true
Telemetry.shared.start()
Telemetry.shared.error(metric: Telemetry.INVOKE_METRIC, log: "log") { $0["error"] = "test" }
@@ -143,6 +143,38 @@
Telemetry.shared.error(metric: Telemetry.INVOKE_ERROR_METRIC, log: longString) { $0["writekey"] = longString }
XCTAssertTrue(Telemetry.shared.queue.count < 1000)
}

func testConcurrentErrorReporting() {
Telemetry.shared.enable = true
let operationCount = 200

let concurrentExpectation = XCTestExpectation(description: "High pressure operations")
concurrentExpectation.expectedFulfillmentCount = operationCount

// Use multiple dispatch queues to increase concurrency
let queues = [
DispatchQueue.global(qos: .userInitiated),
DispatchQueue.global(qos: .default),
DispatchQueue.global(qos: .utility)
]
for i in 0..<operationCount {
// Round-robin between different queues
let queue = queues[i % queues.count]
queue.async {
Telemetry.shared.error(
metric: Telemetry.INVOKE_ERROR_METRIC,
log: "High pressure test \(i)"
) { tags in
tags["error"] = "pressure_test_key"
tags["queue"] = "\(i % queues.count)"
tags["iteration"] = "\(i)"
}
concurrentExpectation.fulfill()
}
}
wait(for: [concurrentExpectation], timeout: 15.0)
XCTAssertTrue(Telemetry.shared.queue.count == Telemetry.shared.maxQueueSize)
}
}

// Mock URLSession
@@ -154,12 +186,13 @@ class URLSessionMock: RestrictedHTTPSession {
var shouldThrow = false

override func dataTask(with request: URLRequest, completionHandler: @escaping (Data?, URLResponse?, Error?) -> Void) -> URLSessionDataTask {
let task = URLSession.shared.dataTask(with: request) { _, _, _ in }
if shouldThrow {
completionHandler(nil, nil, NSError(domain: "Test", code: 1, userInfo: nil))
} else {
completionHandler(nil, HTTPURLResponse(url: request.url!, statusCode: 200, httpVersion: nil, headerFields: nil), nil)
}
return URLSessionDataTaskMock()
return task
}
}
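A note on the mock: dataTask(with:completionHandler:) now returns an unresumed task created by URLSession.shared instead of a custom URLSessionDataTaskMock, likely because directly instantiating URLSessionDataTask subclasses is deprecated on current Apple SDKs; the mock still invokes the completion handler synchronously itself, so no real network request is made.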
