diff --git a/Examples/other_plugins/IDFACollection.swift b/Examples/other_plugins/IDFACollection.swift index a2fe52e6..f3ceadb7 100644 --- a/Examples/other_plugins/IDFACollection.swift +++ b/Examples/other_plugins/IDFACollection.swift @@ -78,7 +78,7 @@ extension IDFACollection: iOSLifecycle { func applicationDidBecomeActive(application: UIApplication?) { let status = ATTrackingManager.trackingAuthorizationStatus - _alreadyAsked.withValue { alreadyAsked in + _alreadyAsked.mutate { alreadyAsked in if status == .notDetermined && !alreadyAsked { // we don't know, so should ask the user. alreadyAsked = true diff --git a/Sources/Segment/Analytics.swift b/Sources/Segment/Analytics.swift index 85dd400c..031ae6a4 100644 --- a/Sources/Segment/Analytics.swift +++ b/Sources/Segment/Analytics.swift @@ -423,12 +423,16 @@ extension Analytics { } internal static func addActiveWriteKey(_ writeKey: String) { - Self.activeWriteKeys.append(writeKey) + Self._activeWriteKeys.mutate { keys in + keys.append(writeKey) + } } internal static func removeActiveWriteKey(_ writeKey: String) { - Self.activeWriteKeys.removeAll { key in - writeKey == key + Self._activeWriteKeys.mutate { keys in + keys.removeAll { key in + writeKey == key + } } } } diff --git a/Sources/Segment/Plugins/Platforms/Mac/macOSLifecycleEvents.swift b/Sources/Segment/Plugins/Platforms/Mac/macOSLifecycleEvents.swift index a34e28c9..1664c770 100644 --- a/Sources/Segment/Plugins/Platforms/Mac/macOSLifecycleEvents.swift +++ b/Sources/Segment/Plugins/Platforms/Mac/macOSLifecycleEvents.swift @@ -27,7 +27,7 @@ class macOSLifecycleEvents: PlatformPlugin, macOSLifecycle { func application(didFinishLaunchingWithOptions launchOptions: [String : Any]?) { // Make sure we aren't double calling application:didFinishLaunchingWithOptions // by resetting the check at the start - didFinishLaunching = true + _didFinishLaunching.set(true) if analytics?.configuration.values.trackApplicationLifecycleEvents == false { return diff --git a/Sources/Segment/Plugins/Platforms/Vendors/AppleUtils.swift b/Sources/Segment/Plugins/Platforms/Vendors/AppleUtils.swift index 901b5ace..0c5106a2 100644 --- a/Sources/Segment/Plugins/Platforms/Vendors/AppleUtils.swift +++ b/Sources/Segment/Plugins/Platforms/Vendors/AppleUtils.swift @@ -348,17 +348,17 @@ internal class ConnectionMonitor { SCNetworkReachabilityCreateWithAddress(nil, zeroSockAddress) } }) else { - connectionStatus = .unknown + _connectionStatus.set(.unknown) return } var flags : SCNetworkReachabilityFlags = [] if !SCNetworkReachabilityGetFlags(defaultRouteReachability, &flags) { - connectionStatus = .unknown + _connectionStatus.set(.unknown) return } - connectionStatus = ConnectionStatus(reachabilityFlags: flags) + _connectionStatus.set(ConnectionStatus(reachabilityFlags: flags)) } } diff --git a/Sources/Segment/Plugins/Platforms/iOS/iOSLifecycleEvents.swift b/Sources/Segment/Plugins/Platforms/iOS/iOSLifecycleEvents.swift index edfb91d7..dcc340e5 100644 --- a/Sources/Segment/Plugins/Platforms/iOS/iOSLifecycleEvents.swift +++ b/Sources/Segment/Plugins/Platforms/iOS/iOSLifecycleEvents.swift @@ -28,7 +28,7 @@ class iOSLifecycleEvents: PlatformPlugin, iOSLifecycle { // Make sure we aren't double calling application:didFinishLaunchingWithOptions // by resetting the check at the start - didFinishLaunching = true + _didFinishLaunching.set(true) if analytics?.configuration.values.trackApplicationLifecycleEvents == false { return @@ -88,7 +88,7 @@ class iOSLifecycleEvents: PlatformPlugin, iOSLifecycle { } func applicationDidEnterBackground(application: UIApplication?) { - didFinishLaunching = false + _didFinishLaunching.set(false) if analytics?.configuration.values.trackApplicationLifecycleEvents == false { return } diff --git a/Sources/Segment/Plugins/SegmentDestination.swift b/Sources/Segment/Plugins/SegmentDestination.swift index 1ab2260f..8a741dcf 100644 --- a/Sources/Segment/Plugins/SegmentDestination.swift +++ b/Sources/Segment/Plugins/SegmentDestination.swift @@ -116,7 +116,7 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion guard let storage = self.storage else { return } // Send Event to File System storage.write(.events, value: event) - self._eventCount.withValue { count in + self._eventCount.mutate { count in count += 1 } } @@ -135,7 +135,7 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion // don't flush if analytics is disabled. guard analytics.enabled == true else { return } - eventCount = 0 + _eventCount.set(0) cleanupUploads() let type = storage.dataStore.transactionType diff --git a/Sources/Segment/Plugins/StartupQueue.swift b/Sources/Segment/Plugins/StartupQueue.swift index 8f316f8e..6e7a3479 100644 --- a/Sources/Segment/Plugins/StartupQueue.swift +++ b/Sources/Segment/Plugins/StartupQueue.swift @@ -47,7 +47,7 @@ public class StartupQueue: Plugin, Subscriber { extension StartupQueue { internal func runningUpdate(state: System) { - running = state.running + _running.set(state.running) if state.running { replayEvents() } diff --git a/Sources/Segment/Utilities/Atomic.swift b/Sources/Segment/Utilities/Atomic.swift index 1dc6c07e..e3bbf266 100644 --- a/Sources/Segment/Utilities/Atomic.swift +++ b/Sources/Segment/Utilities/Atomic.swift @@ -7,34 +7,80 @@ import Foundation -// NOTE: Revised from previous implementation which used a struct and NSLock's. -// Thread Sanitizer was *correctly* capturing this issue, which was a little obscure -// given the property wrapper PLUS the semantics of a struct. Moving to `class` -// removes the semantics problem and lets TSan approve of what's happening. -// -// Additionally, moving to a lock free version is just desirable, so moved to a queue. -// -// Also see thread here: https://github.com/apple/swift-evolution/pull/1387 +/* + Revised the implementation yet again. Tiziano Coriano noticed that this wrapper + can be misleading about it's atomicity. A single set would be atomic, but a compound + operation like += would cause an atomic read, and a separate atomic write, in which + point another thread could've changed the value we're now working off of. + + This implementation removes the ability to set wrappedValue, and callers now must use + the set() or mutate() functions explicitly to ensure a proper atomic mutation. + + The use of a dispatch queue was also removed in favor of an unfair lock (yes, it's + implemented correctly). + */ @propertyWrapper public class Atomic { - private var value: T - private let queue = DispatchQueue(label: "com.segment.atomic.\(UUID().uuidString)") - + #if os(Linux) + let swiftLock: NSLock + #else + internal typealias os_unfair_lock_t = UnsafeMutablePointer + internal var unfairLock: os_unfair_lock_t + #endif + + internal var value: T + public init(wrappedValue value: T) { + #if os(Linux) + self.swiftLock = NSLock() + #else + self.unfairLock = UnsafeMutablePointer.allocate(capacity: 1) + self.unfairLock.initialize(to: os_unfair_lock()) + #endif self.value = value } - + + deinit { + #if !os(Linux) + unfairLock.deallocate() + #endif + } + public var wrappedValue: T { - get { return queue.sync { return value } } - set { queue.sync { value = newValue } } + get { + lock() + defer { unlock() } + return value + } + // set is not allowed, use set() or mutate() } + + public func set(_ newValue: T) { + mutate { $0 = newValue } + } + + public func mutate(_ mutation: (inout T) -> Void) { + lock() + defer { unlock() } + mutation(&value) + } +} - @discardableResult - public func withValue(_ operation: (inout T) -> Void) -> T { - queue.sync { - operation(&self.value) - return self.value - } +extension Atomic { + internal func lock() { + #if os(Linux) + swiftLock.lock() + #else + os_unfair_lock_lock(unfairLock) + #endif + } + + internal func unlock() { + #if os(Linux) + swiftLock.unlock() + #else + os_unfair_lock_unlock(unfairLock) + #endif } } diff --git a/Sources/Segment/Utilities/Policies/CountBasedFlushPolicy.swift b/Sources/Segment/Utilities/Policies/CountBasedFlushPolicy.swift index 0a07edf1..d5be3d0d 100644 --- a/Sources/Segment/Utilities/Policies/CountBasedFlushPolicy.swift +++ b/Sources/Segment/Utilities/Policies/CountBasedFlushPolicy.swift @@ -37,12 +37,12 @@ public class CountBasedFlushPolicy: FlushPolicy { } public func updateState(event: RawEvent) { - _count.withValue { value in + _count.mutate { value in value += 1 } } public func reset() { - count = 0 + _count.set(0) } } diff --git a/Sources/Segment/Utilities/QueueTimer.swift b/Sources/Segment/Utilities/QueueTimer.swift index da6d97fd..4f01a700 100644 --- a/Sources/Segment/Utilities/QueueTimer.swift +++ b/Sources/Segment/Utilities/QueueTimer.swift @@ -57,7 +57,7 @@ internal class QueueTimer { if state == .suspended { return } - state = .suspended + _state.set(.suspended) timer.suspend() } @@ -65,7 +65,7 @@ internal class QueueTimer { if state == .resumed { return } - state = .resumed + _state.set(.resumed) timer.resume() } } diff --git a/Sources/Segment/Utilities/Storage/Storage.swift b/Sources/Segment/Utilities/Storage/Storage.swift index 12fd4d1d..3cd37432 100644 --- a/Sources/Segment/Utilities/Storage/Storage.swift +++ b/Sources/Segment/Utilities/Storage/Storage.swift @@ -13,7 +13,6 @@ internal class Storage: Subscriber { let userDefaults: UserDefaults static let MAXFILESIZE = 475000 // Server accepts max 500k per batch - internal var onFinish: ((URL) -> Void)? = nil internal weak var analytics: Analytics? = nil internal let dataStore: TransientDB diff --git a/Sources/Segment/Utilities/Storage/Types/DirectoryStore.swift b/Sources/Segment/Utilities/Storage/Types/DirectoryStore.swift index 26f062c2..575e4add 100644 --- a/Sources/Segment/Utilities/Storage/Types/DirectoryStore.swift +++ b/Sources/Segment/Utilities/Storage/Types/DirectoryStore.swift @@ -8,6 +8,8 @@ import Foundation public class DirectoryStore: DataStore { + internal static var fileValidator: ((URL) -> Void)? = nil + public typealias StoreConfiguration = Configuration public struct Configuration { @@ -174,6 +176,11 @@ extension DirectoryStore { try? writer.writeLine(fileEnding) let url = writer.url + + // do validation before we rename to prevent the file disappearing out from under us. + DirectoryStore.fileValidator?(url) + + // move it to make availble for flushing ... let newURL = url.appendingPathExtension(Self.tempExtension) try? FileManager.default.moveItem(at: url, to: newURL) self.writer = nil diff --git a/Sources/Segment/Utilities/UserAgent.swift b/Sources/Segment/Utilities/UserAgent.swift index 60ab8a5e..6530f2b9 100644 --- a/Sources/Segment/Utilities/UserAgent.swift +++ b/Sources/Segment/Utilities/UserAgent.swift @@ -35,13 +35,18 @@ internal struct UserAgent { private static let defaultWebKitAppName = "" #endif - internal static var _value: String = "" + @Atomic internal static var _value: String = "" + internal static let lock = NSLock() public static var value: String { + lock.lock() + defer { lock.unlock() } + if _value.isEmpty { - _value = value(applicationName: defaultWebKitAppName) + __value.set(value(applicationName: defaultWebKitAppName)) } return _value + //return "someUserAgent" } private static func version() -> String { diff --git a/Tests/Segment-Tests/Atomic_Tests.swift b/Tests/Segment-Tests/Atomic_Tests.swift index 6e1d1216..d6b420b0 100644 --- a/Tests/Segment-Tests/Atomic_Tests.swift +++ b/Tests/Segment-Tests/Atomic_Tests.swift @@ -13,7 +13,7 @@ final class Atomic_Tests: XCTestCase { // `queue.sync { counter = oldValue + 1 }` // And the threads are free to suspend in between the two calls to `queue.sync`. - _counter.withValue { value in + _counter.mutate { value in value += 1 } } diff --git a/Tests/Segment-Tests/FlushPolicy_Tests.swift b/Tests/Segment-Tests/FlushPolicy_Tests.swift index 0de096c0..636a5792 100644 --- a/Tests/Segment-Tests/FlushPolicy_Tests.swift +++ b/Tests/Segment-Tests/FlushPolicy_Tests.swift @@ -142,7 +142,7 @@ class FlushPolicyTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) if analytics.pendingUploads!.count > 0 { // flush was triggered - flushSent = true + _flushSent.set(true) } } diff --git a/Tests/Segment-Tests/Storage_Tests.swift b/Tests/Segment-Tests/Storage_Tests.swift index d4c89894..4d6cb7e7 100644 --- a/Tests/Segment-Tests/Storage_Tests.swift +++ b/Tests/Segment-Tests/Storage_Tests.swift @@ -290,7 +290,7 @@ class StorageTests: XCTestCase { @Atomic var done = false analytics.flush { print("flush completed") - done = true + _done.set(true) } while !done { diff --git a/Tests/Segment-Tests/StressTests.swift b/Tests/Segment-Tests/StressTests.swift index 36ed6454..4617f530 100644 --- a/Tests/Segment-Tests/StressTests.swift +++ b/Tests/Segment-Tests/StressTests.swift @@ -18,8 +18,94 @@ class StressTests: XCTestCase { // Put teardown code here. This method is called after the invocation of each test method in the class. } - // Linux doesn't know what URLProtocol is and on watchOS it somehow works differently and isn't hit. - #if !os(Linux) && !os(watchOS) + // Linux doesn't know what URLProtocol is and on tvOS/watchOS it somehow works differently and isn't hit. + #if !os(Linux) && !os(tvOS) && !os(watchOS) + func testDirectoryStorageStress2() throws { + // register our network blocker + guard URLProtocol.registerClass(BlockNetworkCalls.self) else { XCTFail(); return } + + let analytics = Analytics(configuration: Configuration(writeKey: "stressTest2").errorHandler({ error in + XCTFail("Storage Error: \(error)") + })) + analytics.purgeStorage() + analytics.storage.hardReset(doYouKnowHowToUseThis: true) + + DirectoryStore.fileValidator = { url in + do { + let eventBundle = try JSONSerialization.jsonObject(with: Data(contentsOf: url)) + XCTAssertNotNil(eventBundle, "The event bundle parsed out to null. \(url)") + } catch { + XCTFail("Unable to parse JSON bundle; It must be corrupt! \(error), \(url)") + } + } + + waitUntilStarted(analytics: analytics) + + // set the httpclient to use our blocker session + let segment = analytics.find(pluginType: SegmentDestination.self) + let configuration = URLSessionConfiguration.ephemeral + configuration.allowsCellularAccess = true + configuration.timeoutIntervalForResource = 30 + configuration.timeoutIntervalForRequest = 60 + configuration.httpMaximumConnectionsPerHost = 2 + configuration.protocolClasses = [BlockNetworkCalls.self] + configuration.httpAdditionalHeaders = ["Content-Type": "application/json; charset=utf-8", + "Authorization": "Basic test", + "User-Agent": "analytics-ios/\(Analytics.version())"] + let blockSession = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil) + segment?.httpClient?.session = blockSession + + @Atomic var ready = false + var queues = [DispatchQueue]() + for i in 0..<30 { + queues.append(DispatchQueue(label: "write queue \(i))", attributes: .concurrent)) + } + + let group = DispatchGroup() + group.enter() + + let lock = NSLock() + var eventsWritten = 0 + let writeBlock: (Int) -> Void = { queueNum in + analytics.track(name: "dummy event") + lock.lock() + eventsWritten += 1 + lock.unlock() + } + + // schedule a bunch of events to go out + for _ in 0..<500_000 { + let randomInt = Int.random(in: 0..<30) + let queue = queues[randomInt] + group.enter() + queue.async { + writeBlock(randomInt) + group.leave() + } + } + + group.notify(queue: DispatchQueue.main) { + _ready.set(false) + print("\(eventsWritten) events written, across 30 queues.") + print("all queues finished.") + } + + _ready.set(true) + + group.leave() + + while (ready) { + RunLoop.main.run(until: Date.distantPast) + } + + // wait for everything to settle down flush-wise... + while (analytics.hasUnsentEvents) { + RunLoop.main.run(until: Date(timeIntervalSinceNow: .seconds(5))) + } + + analytics.purgeStorage() + } + func testDirectoryStorageStress() throws { // register our network blocker guard URLProtocol.registerClass(BlockNetworkCalls.self) else { XCTFail(); return } @@ -28,13 +114,13 @@ class StressTests: XCTestCase { XCTFail("Storage Error: \(error)") })) analytics.storage.hardReset(doYouKnowHowToUseThis: true) - analytics.storage.onFinish = { url in - // check that each one is valid json + + DirectoryStore.fileValidator = { url in do { - let json = try Data(contentsOf: url) - _ = try JSONSerialization.jsonObject(with: json) + let eventBundle = try JSONSerialization.jsonObject(with: Data(contentsOf: url)) + XCTAssertNotNil(eventBundle, "The event bundle parsed out to null. \(url)") } catch { - XCTFail("\(error) in \(url)") + XCTFail("Unable to parse JSON bundle; It must be corrupt! \(url)") } } @@ -54,13 +140,17 @@ class StressTests: XCTestCase { let blockSession = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil) segment?.httpClient?.session = blockSession - let writeQueue1 = DispatchQueue(label: "write queue 1") - let writeQueue2 = DispatchQueue(label: "write queue 2") + let writeQueue1 = DispatchQueue(label: "write queue 1", attributes: .concurrent) + let writeQueue2 = DispatchQueue(label: "write queue 2", attributes: .concurrent) + let writeQueue3 = DispatchQueue(label: "write queue 3", attributes: .concurrent) + let writeQueue4 = DispatchQueue(label: "write queue 4", attributes: .concurrent) let flushQueue = DispatchQueue(label: "flush queue") @Atomic var ready = false @Atomic var queue1Done = false @Atomic var queue2Done = false + @Atomic var queue3Done = false + @Atomic var queue4Done = false writeQueue1.async { while (ready == false) { usleep(1) } @@ -73,7 +163,7 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } print("queue 1 wrote \(eventsWritten) events.") - queue1Done = true + _queue1Done.set(true) } writeQueue2.async { @@ -87,7 +177,35 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } print("queue 2 wrote \(eventsWritten) events.") - queue2Done = true + _queue2Done.set(true) + } + + writeQueue3.async { + while (ready == false) { usleep(1) } + var eventsWritten = 0 + while (eventsWritten < 10000) { + let event = "write queue 3: \(eventsWritten)" + analytics.track(name: event) + eventsWritten += 1 + //usleep(0001) + RunLoop.main.run(until: Date.distantPast) + } + print("queue 3 wrote \(eventsWritten) events.") + _queue3Done.set(true) + } + + writeQueue4.async { + while (ready == false) { usleep(1) } + var eventsWritten = 0 + while (eventsWritten < 10000) { + let event = "write queue 4: \(eventsWritten)" + analytics.track(name: event) + eventsWritten += 1 + //usleep(0001) + RunLoop.main.run(until: Date.distantPast) + } + print("queue 4 wrote \(eventsWritten) events.") + _queue4Done.set(true) } flushQueue.async { @@ -95,7 +213,7 @@ class StressTests: XCTestCase { var counter = 0 //sleep(1) RunLoop.main.run(until: Date(timeIntervalSinceNow: 1)) - while (queue1Done == false || queue2Done == false) { + while (queue1Done == false || queue2Done == false || queue3Done == false || queue4Done == false) { let sleepTime = UInt32.random(in: 1..<3000) //usleep(sleepTime) RunLoop.main.run(until: Date(timeIntervalSinceNow: Double(sleepTime / 1000) )) @@ -103,19 +221,16 @@ class StressTests: XCTestCase { counter += 1 } print("flushed \(counter) times.") - ready = false + _ready.set(false) } - ready = true + _ready.set(true) while (ready) { RunLoop.main.run(until: Date.distantPast) } } - #endif - - // Linux doesn't know what URLProtocol is and on watchOS it somehow works differently and isn't hit. - #if !os(Linux) && !os(watchOS) + func testMemoryStorageStress() throws { // register our network blocker guard URLProtocol.registerClass(BlockNetworkCalls.self) else { XCTFail(); return } @@ -127,15 +242,6 @@ class StressTests: XCTestCase { XCTFail("Storage Error: \(error)") })) analytics.storage.hardReset(doYouKnowHowToUseThis: true) - analytics.storage.onFinish = { url in - // check that each one is valid json - do { - let json = try Data(contentsOf: url) - _ = try JSONSerialization.jsonObject(with: json) - } catch { - XCTFail("\(error) in \(url)") - } - } waitUntilStarted(analytics: analytics) @@ -172,7 +278,7 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } print("queue 1 wrote \(eventsWritten) events.") - queue1Done = true + _queue1Done.set(true) } writeQueue2.async { @@ -186,7 +292,7 @@ class StressTests: XCTestCase { RunLoop.main.run(until: Date.distantPast) } print("queue 2 wrote \(eventsWritten) events.") - queue2Done = true + _queue2Done.set(true) } flushQueue.async { @@ -202,10 +308,10 @@ class StressTests: XCTestCase { counter += 1 } print("flushed \(counter) times.") - ready = false + _ready.set(false) } - ready = true + _ready.set(true) while (ready) { RunLoop.main.run(until: Date.distantPast)