Skip to content

Commit

Permalink
Improved atomicity; added additional stress tests & validation for fi…
Browse files Browse the repository at this point in the history
…le storage. (#343)

* Adding more stressful tests for debugging.

* Remove unused onFinish closure.

* Updated references to atomic

* Some platform specific fixes.

* some linux fixes

* Modified directory store to validate prior to rename.

* Reduced test size

* wait for flushes to finish out ...

* Combined linux/tvos/watchos skips.
  • Loading branch information
bsneed authored May 13, 2024
1 parent 3e8b5b0 commit a1af4aa
Show file tree
Hide file tree
Showing 17 changed files with 241 additions and 74 deletions.
2 changes: 1 addition & 1 deletion Examples/other_plugins/IDFACollection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ extension IDFACollection: iOSLifecycle {
func applicationDidBecomeActive(application: UIApplication?) {
let status = ATTrackingManager.trackingAuthorizationStatus

_alreadyAsked.withValue { alreadyAsked in
_alreadyAsked.mutate { alreadyAsked in
if status == .notDetermined && !alreadyAsked {
// we don't know, so should ask the user.
alreadyAsked = true
Expand Down
10 changes: 7 additions & 3 deletions Sources/Segment/Analytics.swift
Original file line number Diff line number Diff line change
Expand Up @@ -423,12 +423,16 @@ extension Analytics {
}

internal static func addActiveWriteKey(_ writeKey: String) {
Self.activeWriteKeys.append(writeKey)
Self._activeWriteKeys.mutate { keys in
keys.append(writeKey)
}
}

internal static func removeActiveWriteKey(_ writeKey: String) {
Self.activeWriteKeys.removeAll { key in
writeKey == key
Self._activeWriteKeys.mutate { keys in
keys.removeAll { key in
writeKey == key
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class macOSLifecycleEvents: PlatformPlugin, macOSLifecycle {
func application(didFinishLaunchingWithOptions launchOptions: [String : Any]?) {
// Make sure we aren't double calling application:didFinishLaunchingWithOptions
// by resetting the check at the start
didFinishLaunching = true
_didFinishLaunching.set(true)

if analytics?.configuration.values.trackApplicationLifecycleEvents == false {
return
Expand Down
6 changes: 3 additions & 3 deletions Sources/Segment/Plugins/Platforms/Vendors/AppleUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -348,17 +348,17 @@ internal class ConnectionMonitor {
SCNetworkReachabilityCreateWithAddress(nil, zeroSockAddress)
}
}) else {
connectionStatus = .unknown
_connectionStatus.set(.unknown)
return
}

var flags : SCNetworkReachabilityFlags = []
if !SCNetworkReachabilityGetFlags(defaultRouteReachability, &flags) {
connectionStatus = .unknown
_connectionStatus.set(.unknown)
return
}

connectionStatus = ConnectionStatus(reachabilityFlags: flags)
_connectionStatus.set(ConnectionStatus(reachabilityFlags: flags))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class iOSLifecycleEvents: PlatformPlugin, iOSLifecycle {

// Make sure we aren't double calling application:didFinishLaunchingWithOptions
// by resetting the check at the start
didFinishLaunching = true
_didFinishLaunching.set(true)

if analytics?.configuration.values.trackApplicationLifecycleEvents == false {
return
Expand Down Expand Up @@ -88,7 +88,7 @@ class iOSLifecycleEvents: PlatformPlugin, iOSLifecycle {
}

func applicationDidEnterBackground(application: UIApplication?) {
didFinishLaunching = false
_didFinishLaunching.set(false)
if analytics?.configuration.values.trackApplicationLifecycleEvents == false {
return
}
Expand Down
4 changes: 2 additions & 2 deletions Sources/Segment/Plugins/SegmentDestination.swift
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
guard let storage = self.storage else { return }
// Send Event to File System
storage.write(.events, value: event)
self._eventCount.withValue { count in
self._eventCount.mutate { count in
count += 1
}
}
Expand All @@ -135,7 +135,7 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
// don't flush if analytics is disabled.
guard analytics.enabled == true else { return }

eventCount = 0
_eventCount.set(0)
cleanupUploads()

let type = storage.dataStore.transactionType
Expand Down
2 changes: 1 addition & 1 deletion Sources/Segment/Plugins/StartupQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class StartupQueue: Plugin, Subscriber {

extension StartupQueue {
internal func runningUpdate(state: System) {
running = state.running
_running.set(state.running)
if state.running {
replayEvents()
}
Expand Down
86 changes: 66 additions & 20 deletions Sources/Segment/Utilities/Atomic.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,80 @@

import Foundation

// NOTE: Revised from previous implementation which used a struct and NSLock's.
// Thread Sanitizer was *correctly* capturing this issue, which was a little obscure
// given the property wrapper PLUS the semantics of a struct. Moving to `class`
// removes the semantics problem and lets TSan approve of what's happening.
//
// Additionally, moving to a lock free version is just desirable, so moved to a queue.
//
// Also see thread here: https://github.com/apple/swift-evolution/pull/1387
/*
Revised the implementation yet again. Tiziano Coriano noticed that this wrapper
can be misleading about it's atomicity. A single set would be atomic, but a compound
operation like += would cause an atomic read, and a separate atomic write, in which
point another thread could've changed the value we're now working off of.

This implementation removes the ability to set wrappedValue, and callers now must use
the set() or mutate() functions explicitly to ensure a proper atomic mutation.

The use of a dispatch queue was also removed in favor of an unfair lock (yes, it's
implemented correctly).
*/

@propertyWrapper
public class Atomic<T> {
private var value: T
private let queue = DispatchQueue(label: "com.segment.atomic.\(UUID().uuidString)")

#if os(Linux)
let swiftLock: NSLock
#else
internal typealias os_unfair_lock_t = UnsafeMutablePointer<os_unfair_lock_s>
internal var unfairLock: os_unfair_lock_t
#endif

internal var value: T

public init(wrappedValue value: T) {
#if os(Linux)
self.swiftLock = NSLock()
#else
self.unfairLock = UnsafeMutablePointer<os_unfair_lock_s>.allocate(capacity: 1)
self.unfairLock.initialize(to: os_unfair_lock())
#endif
self.value = value
}


deinit {
#if !os(Linux)
unfairLock.deallocate()
#endif
}

public var wrappedValue: T {
get { return queue.sync { return value } }
set { queue.sync { value = newValue } }
get {
lock()
defer { unlock() }
return value
}
// set is not allowed, use set() or mutate()
}

public func set(_ newValue: T) {
mutate { $0 = newValue }
}

public func mutate(_ mutation: (inout T) -> Void) {
lock()
defer { unlock() }
mutation(&value)
}
}

@discardableResult
public func withValue(_ operation: (inout T) -> Void) -> T {
queue.sync {
operation(&self.value)
return self.value
}
extension Atomic {
internal func lock() {
#if os(Linux)
swiftLock.lock()
#else
os_unfair_lock_lock(unfairLock)
#endif
}

internal func unlock() {
#if os(Linux)
swiftLock.unlock()
#else
os_unfair_lock_unlock(unfairLock)
#endif
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ public class CountBasedFlushPolicy: FlushPolicy {
}

public func updateState(event: RawEvent) {
_count.withValue { value in
_count.mutate { value in
value += 1
}
}

public func reset() {
count = 0
_count.set(0)
}
}
4 changes: 2 additions & 2 deletions Sources/Segment/Utilities/QueueTimer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ internal class QueueTimer {
if state == .suspended {
return
}
state = .suspended
_state.set(.suspended)
timer.suspend()
}

func resume() {
if state == .resumed {
return
}
state = .resumed
_state.set(.resumed)
timer.resume()
}
}
Expand Down
1 change: 0 additions & 1 deletion Sources/Segment/Utilities/Storage/Storage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ internal class Storage: Subscriber {
let userDefaults: UserDefaults
static let MAXFILESIZE = 475000 // Server accepts max 500k per batch

internal var onFinish: ((URL) -> Void)? = nil
internal weak var analytics: Analytics? = nil

internal let dataStore: TransientDB
Expand Down
7 changes: 7 additions & 0 deletions Sources/Segment/Utilities/Storage/Types/DirectoryStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import Foundation

public class DirectoryStore: DataStore {
internal static var fileValidator: ((URL) -> Void)? = nil

public typealias StoreConfiguration = Configuration

public struct Configuration {
Expand Down Expand Up @@ -174,6 +176,11 @@ extension DirectoryStore {
try? writer.writeLine(fileEnding)

let url = writer.url

// do validation before we rename to prevent the file disappearing out from under us.
DirectoryStore.fileValidator?(url)

// move it to make availble for flushing ...
let newURL = url.appendingPathExtension(Self.tempExtension)
try? FileManager.default.moveItem(at: url, to: newURL)
self.writer = nil
Expand Down
9 changes: 7 additions & 2 deletions Sources/Segment/Utilities/UserAgent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,18 @@ internal struct UserAgent {
private static let defaultWebKitAppName = ""
#endif

internal static var _value: String = ""
@Atomic internal static var _value: String = ""
internal static let lock = NSLock()

public static var value: String {
lock.lock()
defer { lock.unlock() }

if _value.isEmpty {
_value = value(applicationName: defaultWebKitAppName)
__value.set(value(applicationName: defaultWebKitAppName))
}
return _value
//return "someUserAgent"
}

private static func version() -> String {
Expand Down
2 changes: 1 addition & 1 deletion Tests/Segment-Tests/Atomic_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ final class Atomic_Tests: XCTestCase {
// `queue.sync { counter = oldValue + 1 }`
// And the threads are free to suspend in between the two calls to `queue.sync`.

_counter.withValue { value in
_counter.mutate { value in
value += 1
}
}
Expand Down
2 changes: 1 addition & 1 deletion Tests/Segment-Tests/FlushPolicy_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class FlushPolicyTests: XCTestCase {
RunLoop.main.run(until: Date.distantPast)
if analytics.pendingUploads!.count > 0 {
// flush was triggered
flushSent = true
_flushSent.set(true)
}
}

Expand Down
2 changes: 1 addition & 1 deletion Tests/Segment-Tests/Storage_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ class StorageTests: XCTestCase {
@Atomic var done = false
analytics.flush {
print("flush completed")
done = true
_done.set(true)
}

while !done {
Expand Down
Loading

0 comments on commit a1af4aa

Please sign in to comment.