Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add network path monitoring / backoff. #355

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Sources/Segment/Analytics.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class Analytics {
static internal weak var firstInstance: Analytics? = nil

@Atomic static internal var activeWriteKeys = [String]()

internal var settingsRefreshTimer: QueueTimer? = nil

/**
This method isn't a traditional singleton implementation. It's provided here
Expand Down
9 changes: 9 additions & 0 deletions Sources/Segment/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class Configuration {
var storageMode: StorageMode = .disk
var anonymousIdGenerator: AnonymousIdGenerator = SegmentAnonymousId()
var httpSession: (() -> any HTTPSession) = HTTPSessions.urlSession
var monitorNetworkPath: NetworkPathMonitor = .none
}

internal var values: Values
Expand Down Expand Up @@ -283,6 +284,14 @@ public extension Configuration {
values.httpSession = httpSession
return self
}

/// Monitors the network path. When set to `true`, segment must be reachable for event batches to be uploaded.
/// Setting to `false` preserves existing behavior and is the default.
@discardableResult
func monitorNetworkPath(_ value: NetworkPathMonitor) -> Configuration {
values.monitorNetworkPath = value
return self
}
}

extension Analytics {
Expand Down
2 changes: 1 addition & 1 deletion Sources/Segment/Plugins/Platforms/Vendors/AppleUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ internal class ConnectionMonitor {
@Atomic var connectionStatus: ConnectionStatus = .unknown

init() {
self.timer = QueueTimer(interval: 300, immediate: true) { [weak self] in
self.timer = QueueTimer(interval: 300, immediate: true) { [weak self] _ in
guard let self else { return }
self.check()
}
Expand Down
59 changes: 55 additions & 4 deletions Sources/Segment/Plugins/SegmentDestination.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import Sovran
import FoundationNetworking
#endif

public enum NetworkPathMonitor: Equatable {
case none
case onInterval(resetTime: TimeInterval)
}

public class SegmentAnonymousId: AnonymousIdGenerator {
public func newAnonymousId() -> String {
return UUID().uuidString
Expand Down Expand Up @@ -51,6 +56,10 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
private var uploads = [UploadTaskInfo]()
private let uploadsQueue = DispatchQueue(label: "uploadsQueue.segment.com")
private var storage: Storage?

private var monitor: NetworkPathMonitor = .none
internal var monitorTimer: QueueTimer? = nil
@Atomic internal var segmentReachable: Bool = true

@Atomic internal var eventCount: Int = 0

Expand Down Expand Up @@ -92,6 +101,10 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
httpClient = HTTPClient(analytics: analytics)
}
}

// Set the monitor value given by the configuration
monitor = analytics.configuration.values.monitorNetworkPath
updateMonitoring(available: true)
}

// MARK: - Event Handling Methods
Expand Down Expand Up @@ -126,6 +139,8 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
}

public func flush(group: DispatchGroup) {
guard httpClient?.segmentReachable == true else { return }

group.enter()
defer { group.leave() }

Expand Down Expand Up @@ -157,6 +172,26 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
}

extension SegmentDestination {
private func updateMonitoring(available: Bool) {
var value = available
if monitor == .none { value = true }
print("SegmentReachable = \(value)")
_segmentReachable.set(value)
if value == true {
monitorTimer = nil
} else {
switch monitor {
case .none:
break
case .onInterval(let resetTime):
monitorTimer = QueueTimer.schedule(interval: resetTime, handler: { [weak self] timer in
guard let self else { return }
updateMonitoring(available: true)
})
}
}
}

private func flushFiles(group: DispatchGroup) {
guard let storage = self.storage else { return }
guard let analytics = self.analytics else { return }
Expand Down Expand Up @@ -185,12 +220,20 @@ extension SegmentDestination {
case .success(_):
storage.remove(data: [url])
cleanupUploads()

updateMonitoring(available: true)
// we don't want to retry events in a given batch when a 400
// response for malformed JSON is returned
case .failure(Segment.HTTPClientErrors.statusCode(code: 400)):
case .failure(HTTPClientErrors.statusCode(code: 400)):
storage.remove(data: [url])
cleanupUploads()
// while it did fail, we did reach the server.
updateMonitoring(available: true)
case .failure(HTTPClientErrors.statusCode(code: 540)):
// simulating not making it over the wire.
updateMonitoring(available: false)
case .failure(HTTPClientErrors.unknown):
// we didn't even make it over the wire. :(
updateMonitoring(available: false)
default:
break
}
Expand Down Expand Up @@ -256,12 +299,20 @@ extension SegmentDestination {
case .success(_):
storage.remove(data: removable)
cleanupUploads()

updateMonitoring(available: true)
// we don't want to retry events in a given batch when a 400
// response for malformed JSON is returned
case .failure(Segment.HTTPClientErrors.statusCode(code: 400)):
case .failure(HTTPClientErrors.statusCode(code: 400)):
storage.remove(data: removable)
cleanupUploads()
// while it did fail, we did reach the server.
updateMonitoring(available: true)
case .failure(HTTPClientErrors.statusCode(code: 540)):
// simulating not making it over the wire.
updateMonitoring(available: false)
case .failure(HTTPClientErrors.unknown):
// we didn't even make it over the wire. :(
updateMonitoring(available: false)
default:
break
}
Expand Down
6 changes: 5 additions & 1 deletion Sources/Segment/Settings.swift
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ extension Analytics {
}
}

internal func checkSettings() {
/**
Stops the analytics timeline and begins to fetch settings from Segment. Upon
completion, all plugins will be notified, and the timeline started again.
*/
public func checkSettings() {
#if DEBUG
if isUnitTesting {
// we don't really wanna wait for this network call during tests...
Expand Down
2 changes: 1 addition & 1 deletion Sources/Segment/Startup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ extension Analytics {
// now set up a timer to do it every 24 hrs.
// mac apps change focus a lot more than iOS apps, so this
// seems more appropriate here.
QueueTimer.schedule(interval: .days(1), queue: .main) { [weak self] in
settingsRefreshTimer = QueueTimer.schedule(interval: .days(1), queue: .main) { [weak self] _ in
self?.checkSettings()
}
}
Expand Down
5 changes: 5 additions & 0 deletions Sources/Segment/Utilities/Networking/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public class HTTPClient {
private var cdnHost: String

private weak var analytics: Analytics?

// say segment is reachable until we know otherwise.
@Atomic internal var segmentReachable: Bool = true

init(analytics: Analytics) {
self.analytics = analytics
Expand Down Expand Up @@ -111,6 +114,8 @@ public class HTTPClient {
case 429:
analytics?.reportInternalError(AnalyticsError.networkServerLimited(httpResponse.statusCode))
completion(.failure(HTTPClientErrors.statusCode(code: httpResponse.statusCode)))
case 540: // used to test offline mode
completion(.failure(HTTPClientErrors.statusCode(code: httpResponse.statusCode)))
default:
analytics?.reportInternalError(AnalyticsError.networkServerRejected(httpResponse.statusCode))
completion(.failure(HTTPClientErrors.statusCode(code: httpResponse.statusCode)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class IntervalBasedFlushPolicy: FlushPolicy,
guard let self = self else { return }
guard let a = self.analytics else { return }
guard let system: System = a.store.currentState() else { return }
self.flushTimer = QueueTimer(interval: system.configuration.values.flushInterval) { [weak self] in
self.flushTimer = QueueTimer(interval: system.configuration.values.flushInterval) { [weak self] _ in
self?.analytics?.flush()
}
}
Expand Down
17 changes: 10 additions & 7 deletions Sources/Segment/Utilities/QueueTimer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@ internal class QueueTimer {
let interval: TimeInterval
let timer: DispatchSourceTimer
let queue: DispatchQueue
let handler: () -> Void
let handler: (QueueTimer?) -> Void

@Atomic var state: State = .suspended

static var timers = [QueueTimer]()

static func schedule(interval: TimeInterval, immediate: Bool = false, queue: DispatchQueue = .main, handler: @escaping () -> Void) {
static func schedule(interval: TimeInterval, immediate: Bool = false, queue: DispatchQueue = .main, handler: @escaping (QueueTimer?) -> Void) -> QueueTimer {
let timer = QueueTimer(interval: interval, queue: queue, handler: handler)
Self.timers.append(timer)
//Self.timers.append(timer)
return timer
}

init(interval: TimeInterval, immediate: Bool = false, queue: DispatchQueue = .main, handler: @escaping () -> Void) {
init(interval: TimeInterval, immediate: Bool = false, queue: DispatchQueue = .main, handler: @escaping (QueueTimer?) -> Void) {
self.interval = interval
self.queue = queue
self.handler = handler
Expand All @@ -39,7 +38,7 @@ internal class QueueTimer {
timer.schedule(deadline: .now() + self.interval, repeating: self.interval)
}
timer.setEventHandler { [weak self] in
self?.handler()
self?.handler(self)
}
resume()
}
Expand All @@ -53,6 +52,10 @@ internal class QueueTimer {
resume()
}

func cancel() {

}

func suspend() {
if state == .suspended {
return
Expand Down
48 changes: 47 additions & 1 deletion Tests/Segment-Tests/HTTPClient_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// Created by Brandon Sneed on 1/21/21.
//

#if !os(Linux) && !os(Windows)
#if !os(Linux) && !os(Windows) && !os(watchOS)

import XCTest
@testable import Segment
Expand All @@ -20,6 +20,52 @@ class HTTPClientTests: XCTestCase {
override func tearDownWithError() throws {
// Put teardown code here. This method is called after the invocation of each test method in the class.
}

func testNetworkPathMonitoring() throws {
guard URLProtocol.registerClass(OfflineNetworkCalls.self) else { XCTFail(); return }

// Use a specific writekey to this test so we do not collide with other cached items.
let analytics = Analytics(
configuration: Configuration(writeKey: "testCustomSesh")
.flushInterval(9999)
.flushAt(9999)
.httpSession(RestrictedHTTPSession(blocking: false, offline: true))
.monitorNetworkPath(.onInterval(resetTime: 5))
)

waitUntilStarted(analytics: analytics)

let seg = analytics.find(pluginType: SegmentDestination.self)!
XCTAssertTrue(seg.segmentReachable)

analytics.storage.hardReset(doYouKnowHowToUseThis: true)

analytics.identify(userId: "brandon", traits: MyTraits(email: "[email protected]"))

let flushDone = XCTestExpectation(description: "flush done")
analytics.flush {
flushDone.fulfill()
}

wait(for: [flushDone])

XCTAssertFalse(seg.segmentReachable)

RunLoop.main.run(until: Date.init(timeIntervalSinceNow: 10))

XCTAssertTrue(seg.segmentReachable)

analytics.identify(userId: "brandon", traits: MyTraits(email: "[email protected]"))

let anotherFlushDone = XCTestExpectation(description: "flush 2 done")
analytics.flush {
anotherFlushDone.fulfill()
}

wait(for: [anotherFlushDone])

XCTAssertFalse(seg.segmentReachable)
}

func testCustomHTTPSessionUpload() throws {
// Use a specific writekey to this test so we do not collide with other cached items.
Expand Down
27 changes: 26 additions & 1 deletion Tests/Segment-Tests/Support/TestUtilities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class RestrictedHTTPSession: HTTPSession {
static var dataTasks: Int = 0
static var invalidated: Int = 0

init(blocking: Bool = true, failing: Bool = false) {
init(blocking: Bool = true, failing: Bool = false, offline: Bool = false) {
let configuration = URLSessionConfiguration.ephemeral
configuration.allowsCellularAccess = true
configuration.timeoutIntervalForResource = 30
Expand All @@ -213,6 +213,7 @@ class RestrictedHTTPSession: HTTPSession {
var protos = [URLProtocol.Type]()
if blocking { protos.append(BlockNetworkCalls.self) }
if failing { protos.append(FailedNetworkCalls.self) }
if offline { protos.append(OfflineNetworkCalls.self) }
configuration.protocolClasses = protos

sesh = URLSession(configuration: configuration)
Expand Down Expand Up @@ -294,4 +295,28 @@ class FailedNetworkCalls: URLProtocol {
}
}

class OfflineNetworkCalls: URLProtocol {
var initialURL: URL? = nil
override class func canInit(with request: URLRequest) -> Bool {

return true
}

override class func canonicalRequest(for request: URLRequest) -> URLRequest {
return request
}

override var cachedResponse: CachedURLResponse? { return nil }

override func startLoading() {
// using code 540 to activate our logic since we can't simulate offline effectively.
client?.urlProtocol(self, didReceive: HTTPURLResponse(url: URL(string: "http://api.segment.com")!, statusCode: 540, httpVersion: nil, headerFields: ["blocked": "true"])!, cacheStoragePolicy: .notAllowed)
client?.urlProtocolDidFinishLoading(self)
}

override func stopLoading() {

}
}

#endif
Loading