Skip to content

Commit

Permalink
Add network path monitoring / backoff.
Browse files Browse the repository at this point in the history
  • Loading branch information
bsneed committed Jul 31, 2024
1 parent 9f6cd28 commit 93278af
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 16 deletions.
2 changes: 2 additions & 0 deletions Sources/Segment/Analytics.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class Analytics {
static internal weak var firstInstance: Analytics? = nil

@Atomic static internal var activeWriteKeys = [String]()

internal var settingsRefreshTimer: QueueTimer? = nil

/**
This method isn't a traditional singleton implementation. It's provided here
Expand Down
9 changes: 9 additions & 0 deletions Sources/Segment/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class Configuration {
var storageMode: StorageMode = .disk
var anonymousIdGenerator: AnonymousIdGenerator = SegmentAnonymousId()
var httpSession: (() -> any HTTPSession) = HTTPSessions.urlSession
var monitorNetworkPath: NetworkPathMonitor = .none
}

internal var values: Values
Expand Down Expand Up @@ -283,6 +284,14 @@ public extension Configuration {
values.httpSession = httpSession
return self
}

/// Monitors the network path. When set to `true`, segment must be reachable for event batches to be uploaded.
/// Setting to `false` preserves existing behavior and is the default.
@discardableResult
func monitorNetworkPath(_ value: NetworkPathMonitor) -> Configuration {
values.monitorNetworkPath = value
return self
}
}

extension Analytics {
Expand Down
2 changes: 1 addition & 1 deletion Sources/Segment/Plugins/Platforms/Vendors/AppleUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ internal class ConnectionMonitor {
@Atomic var connectionStatus: ConnectionStatus = .unknown

init() {
self.timer = QueueTimer(interval: 300, immediate: true) { [weak self] in
self.timer = QueueTimer(interval: 300, immediate: true) { [weak self] _ in
guard let self else { return }
self.check()
}
Expand Down
59 changes: 55 additions & 4 deletions Sources/Segment/Plugins/SegmentDestination.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import Sovran
import FoundationNetworking
#endif

public enum NetworkPathMonitor: Equatable {
case none
case onInterval(resetTime: TimeInterval)
}

public class SegmentAnonymousId: AnonymousIdGenerator {
public func newAnonymousId() -> String {
return UUID().uuidString
Expand Down Expand Up @@ -51,6 +56,10 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
private var uploads = [UploadTaskInfo]()
private let uploadsQueue = DispatchQueue(label: "uploadsQueue.segment.com")
private var storage: Storage?

private var monitor: NetworkPathMonitor = .none
internal var monitorTimer: QueueTimer? = nil
@Atomic internal var segmentReachable: Bool = true

@Atomic internal var eventCount: Int = 0

Expand Down Expand Up @@ -92,6 +101,10 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
httpClient = HTTPClient(analytics: analytics)
}
}

// Set the monitor value given by the configuration
monitor = analytics.configuration.values.monitorNetworkPath
updateMonitoring(available: true)
}

// MARK: - Event Handling Methods
Expand Down Expand Up @@ -126,6 +139,8 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
}

public func flush(group: DispatchGroup) {
guard httpClient?.segmentReachable == true else { return }

group.enter()
defer { group.leave() }

Expand Down Expand Up @@ -157,6 +172,26 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
}

extension SegmentDestination {
private func updateMonitoring(available: Bool) {
var value = available
if monitor == .none { value = true }
print("SegmentReachable = \(value)")
_segmentReachable.set(value)
if value == true {
monitorTimer = nil
} else {
switch monitor {
case .none:
break
case .onInterval(let resetTime):
monitorTimer = QueueTimer.schedule(interval: resetTime, handler: { [weak self] timer in
guard let self else { return }
updateMonitoring(available: true)
})
}
}
}

private func flushFiles(group: DispatchGroup) {
guard let storage = self.storage else { return }
guard let analytics = self.analytics else { return }
Expand Down Expand Up @@ -185,12 +220,20 @@ extension SegmentDestination {
case .success(_):
storage.remove(data: [url])
cleanupUploads()

updateMonitoring(available: true)
// we don't want to retry events in a given batch when a 400
// response for malformed JSON is returned
case .failure(Segment.HTTPClientErrors.statusCode(code: 400)):
case .failure(HTTPClientErrors.statusCode(code: 400)):
storage.remove(data: [url])
cleanupUploads()
// while it did fail, we did reach the server.
updateMonitoring(available: true)
case .failure(HTTPClientErrors.statusCode(code: 540)):
// simulating not making it over the wire.
updateMonitoring(available: false)
case .failure(HTTPClientErrors.unknown):
// we didn't even make it over the wire. :(
updateMonitoring(available: false)
default:
break
}
Expand Down Expand Up @@ -256,12 +299,20 @@ extension SegmentDestination {
case .success(_):
storage.remove(data: removable)
cleanupUploads()

updateMonitoring(available: true)
// we don't want to retry events in a given batch when a 400
// response for malformed JSON is returned
case .failure(Segment.HTTPClientErrors.statusCode(code: 400)):
case .failure(HTTPClientErrors.statusCode(code: 400)):
storage.remove(data: removable)
cleanupUploads()
// while it did fail, we did reach the server.
updateMonitoring(available: true)
case .failure(HTTPClientErrors.statusCode(code: 540)):
// simulating not making it over the wire.
updateMonitoring(available: false)
case .failure(HTTPClientErrors.unknown):
// we didn't even make it over the wire. :(
updateMonitoring(available: false)
default:
break
}
Expand Down
6 changes: 5 additions & 1 deletion Sources/Segment/Settings.swift
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ extension Analytics {
}
}

internal func checkSettings() {
/**
Stops the analytics timeline and begins to fetch settings from Segment. Upon
completion, all plugins will be notified, and the timeline started again.
*/
public func checkSettings() {
#if DEBUG
if isUnitTesting {
// we don't really wanna wait for this network call during tests...
Expand Down
2 changes: 1 addition & 1 deletion Sources/Segment/Startup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ extension Analytics {
// now set up a timer to do it every 24 hrs.
// mac apps change focus a lot more than iOS apps, so this
// seems more appropriate here.
QueueTimer.schedule(interval: .days(1), queue: .main) { [weak self] in
settingsRefreshTimer = QueueTimer.schedule(interval: .days(1), queue: .main) { [weak self] _ in
self?.checkSettings()
}
}
Expand Down
5 changes: 5 additions & 0 deletions Sources/Segment/Utilities/Networking/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public class HTTPClient {
private var cdnHost: String

private weak var analytics: Analytics?

// say segment is reachable until we know otherwise.
@Atomic internal var segmentReachable: Bool = true

init(analytics: Analytics) {
self.analytics = analytics
Expand Down Expand Up @@ -111,6 +114,8 @@ public class HTTPClient {
case 429:
analytics?.reportInternalError(AnalyticsError.networkServerLimited(httpResponse.statusCode))
completion(.failure(HTTPClientErrors.statusCode(code: httpResponse.statusCode)))
case 540: // used to test offline mode
completion(.failure(HTTPClientErrors.statusCode(code: httpResponse.statusCode)))
default:
analytics?.reportInternalError(AnalyticsError.networkServerRejected(httpResponse.statusCode))
completion(.failure(HTTPClientErrors.statusCode(code: httpResponse.statusCode)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class IntervalBasedFlushPolicy: FlushPolicy,
guard let self = self else { return }
guard let a = self.analytics else { return }
guard let system: System = a.store.currentState() else { return }
self.flushTimer = QueueTimer(interval: system.configuration.values.flushInterval) { [weak self] in
self.flushTimer = QueueTimer(interval: system.configuration.values.flushInterval) { [weak self] _ in
self?.analytics?.flush()
}
}
Expand Down
17 changes: 10 additions & 7 deletions Sources/Segment/Utilities/QueueTimer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@ internal class QueueTimer {
let interval: TimeInterval
let timer: DispatchSourceTimer
let queue: DispatchQueue
let handler: () -> Void
let handler: (QueueTimer?) -> Void

@Atomic var state: State = .suspended

static var timers = [QueueTimer]()

static func schedule(interval: TimeInterval, immediate: Bool = false, queue: DispatchQueue = .main, handler: @escaping () -> Void) {
static func schedule(interval: TimeInterval, immediate: Bool = false, queue: DispatchQueue = .main, handler: @escaping (QueueTimer?) -> Void) -> QueueTimer {
let timer = QueueTimer(interval: interval, queue: queue, handler: handler)
Self.timers.append(timer)
//Self.timers.append(timer)
return timer
}

init(interval: TimeInterval, immediate: Bool = false, queue: DispatchQueue = .main, handler: @escaping () -> Void) {
init(interval: TimeInterval, immediate: Bool = false, queue: DispatchQueue = .main, handler: @escaping (QueueTimer?) -> Void) {
self.interval = interval
self.queue = queue
self.handler = handler
Expand All @@ -39,7 +38,7 @@ internal class QueueTimer {
timer.schedule(deadline: .now() + self.interval, repeating: self.interval)
}
timer.setEventHandler { [weak self] in
self?.handler()
self?.handler(self)
}
resume()
}
Expand All @@ -53,6 +52,10 @@ internal class QueueTimer {
resume()
}

func cancel() {

}

func suspend() {
if state == .suspended {
return
Expand Down
46 changes: 46 additions & 0 deletions Tests/Segment-Tests/HTTPClient_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,52 @@ class HTTPClientTests: XCTestCase {
override func tearDownWithError() throws {
// Put teardown code here. This method is called after the invocation of each test method in the class.
}

func testNetworkPathMonitoring() throws {
guard URLProtocol.registerClass(OfflineNetworkCalls.self) else { XCTFail(); return }

// Use a specific writekey to this test so we do not collide with other cached items.
let analytics = Analytics(
configuration: Configuration(writeKey: "testCustomSesh")
.flushInterval(9999)
.flushAt(9999)
.httpSession(RestrictedHTTPSession(blocking: false, offline: true))
.monitorNetworkPath(.onInterval(resetTime: 5))
)

waitUntilStarted(analytics: analytics)

let seg = analytics.find(pluginType: SegmentDestination.self)!
XCTAssertTrue(seg.segmentReachable)

analytics.storage.hardReset(doYouKnowHowToUseThis: true)

analytics.identify(userId: "brandon", traits: MyTraits(email: "[email protected]"))

let flushDone = XCTestExpectation(description: "flush done")
analytics.flush {
flushDone.fulfill()
}

wait(for: [flushDone])

XCTAssertFalse(seg.segmentReachable)

RunLoop.main.run(until: Date.init(timeIntervalSinceNow: 10))

XCTAssertTrue(seg.segmentReachable)

analytics.identify(userId: "brandon", traits: MyTraits(email: "[email protected]"))

let anotherFlushDone = XCTestExpectation(description: "flush 2 done")
analytics.flush {
anotherFlushDone.fulfill()
}

wait(for: [anotherFlushDone])

XCTAssertFalse(seg.segmentReachable)
}

func testCustomHTTPSessionUpload() throws {
// Use a specific writekey to this test so we do not collide with other cached items.
Expand Down
27 changes: 26 additions & 1 deletion Tests/Segment-Tests/Support/TestUtilities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class RestrictedHTTPSession: HTTPSession {
static var dataTasks: Int = 0
static var invalidated: Int = 0

init(blocking: Bool = true, failing: Bool = false) {
init(blocking: Bool = true, failing: Bool = false, offline: Bool = false) {
let configuration = URLSessionConfiguration.ephemeral
configuration.allowsCellularAccess = true
configuration.timeoutIntervalForResource = 30
Expand All @@ -213,6 +213,7 @@ class RestrictedHTTPSession: HTTPSession {
var protos = [URLProtocol.Type]()
if blocking { protos.append(BlockNetworkCalls.self) }
if failing { protos.append(FailedNetworkCalls.self) }
if offline { protos.append(OfflineNetworkCalls.self) }
configuration.protocolClasses = protos

sesh = URLSession(configuration: configuration)
Expand Down Expand Up @@ -294,4 +295,28 @@ class FailedNetworkCalls: URLProtocol {
}
}

class OfflineNetworkCalls: URLProtocol {
var initialURL: URL? = nil
override class func canInit(with request: URLRequest) -> Bool {

return true
}

override class func canonicalRequest(for request: URLRequest) -> URLRequest {
return request
}

override var cachedResponse: CachedURLResponse? { return nil }

override func startLoading() {
// using code 540 to activate our logic since we can't simulate offline effectively.
client?.urlProtocol(self, didReceive: HTTPURLResponse(url: URL(string: "http://api.segment.com")!, statusCode: 540, httpVersion: nil, headerFields: ["blocked": "true"])!, cacheStoragePolicy: .notAllowed)
client?.urlProtocolDidFinishLoading(self)
}

override func stopLoading() {

}
}

#endif

0 comments on commit 93278af

Please sign in to comment.