Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix remaining issues in async/sync mode switching. #270

Merged
merged 10 commits into from
Nov 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 29 additions & 25 deletions Sources/Segment/Analytics.swift
Original file line number Diff line number Diff line change
Expand Up @@ -212,40 +212,49 @@ extension Analytics {
flushGroup.enter()

apply { plugin in
// we want to enter as soon as possible. waiting to do it from
// another queue just takes too long.
operatingMode.run(queue: configuration.values.flushQueue) {
if let p = plugin as? FlushCompletion {
// this is async
// flush(group:completion:) handles the enter/leave.
// flush handles the groups enter/leave calls
p.flush(group: flushGroup) { plugin in
// we don't really care about the plugin value .. yet.
}
} else if let p = plugin as? EventPlugin {
// we have no idea if this will be async or not, assume it's sync.
flushGroup.enter()
// we have no idea if this will be async or not, assume it's sync.
p.flush()
flushGroup.leave()
}
}
}

// if we're not in server mode, we need to be notified when it's done.
if let completion, operatingMode != .synchronous {
// set up our callback to know when the group has completed, if we're not
// in .server operating mode.
flushGroup.notify(queue: configuration.values.flushQueue) {
DispatchQueue.main.async { completion() }
}
}

flushGroup.leave() // matches our initial enter().

// if we ARE in server mode, we need to wait on the group.
// if we ARE in sync mode, we need to wait on the group.
// This effectively ends up being a `sync` operation.
if operatingMode == .synchronous {
flushGroup.wait()
// we need to call completion on our own since
// we skipped setting up notify.
if let completion { DispatchQueue.main.async { completion() }}
// we skipped setting up notify. we don't need to do it on
// .main since we are in synchronous mode.
if let completion { completion() }
} else if operatingMode == .asynchronous {
// if we're not, flip over to our serial queue, tell it to wait on the flush
// group to complete if we have a completion to hit. Otherwise, no need to
// wait on completion.
if let completion {
// NOTE: DispatchGroup's `notify` method on linux ended up getting called
// before the tasks have actually completed, so we went with this instead.
OperatingMode.defaultQueue.async { [weak self] in
let timedOut = flushGroup.wait(timeout: .now() + 15 /*seconds*/)
if timedOut == .timedOut {
self?.log(message: "flush(completion:) timed out waiting for completion.")
}
completion()
//DispatchQueue.main.async { completion() }
}
}
}
}

Expand Down Expand Up @@ -437,16 +446,11 @@ extension OperatingMode {
task()
}
case .synchronous:
// if for some reason, we're told to do all this stuff on
// main, ignore it, and use the default queue. this prevents
// a possible deadlock.
if queue === DispatchQueue.main {
OperatingMode.defaultQueue.asyncAndWait {
task()
}
} else {
queue.asyncAndWait { task() }
}
// in synchronous mode, always use our own queue to
// prevent deadlocks.
let workItem = DispatchWorkItem(block: task)
OperatingMode.defaultQueue.asyncAndWait(execute: workItem)
}
}
}

14 changes: 10 additions & 4 deletions Sources/Segment/Plugins/SegmentDestination.swift
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
// don't flush if analytics is disabled.
guard analytics.enabled == true else { return }

// enter for the high level flush, allow us time to run through any existing files..
group.enter()

// Read events from file system
guard let data = storage.read(Storage.Constants.events) else { return }
guard let data = storage.read(Storage.Constants.events) else { group.leave(); return }

eventCount = 0
cleanupUploads()
Expand All @@ -134,9 +137,9 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion

if pendingUploads == 0 {
for url in data {
analytics.log(message: "Processing Batch:\n\(url.lastPathComponent)")
// enter the dispatch group
// enter for this url we're going to kick off
group.enter()
analytics.log(message: "Processing Batch:\n\(url.lastPathComponent)")
// set up the task
let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, batch: url) { (result) in
switch result {
Expand All @@ -154,7 +157,7 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
self.cleanupUploads()
// call the completion
completion(self)
// leave the dispatch group
// leave for the url we kicked off.
group.leave()
}
// we have a legit upload in progress now, so add it to our list.
Expand All @@ -165,6 +168,9 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
} else {
analytics.log(message: "Skipping processing; Uploads in progress.")
}

// leave for the high level flush
group.leave()
}
}

Expand Down
44 changes: 44 additions & 0 deletions Sources/Segment/Utilities/Utils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@

import Foundation

#if os(Linux)
extension DispatchQueue {
func asyncAndWait(execute workItem: DispatchWorkItem) {
async {
workItem.perform()
}
workItem.wait()
}
}
#endif

/// Inquire as to whether we are within a Unit Testing environment.
#if DEBUG
internal var isUnitTesting: Bool = {
Expand Down Expand Up @@ -58,3 +69,36 @@ extension Optional: Flattenable {
}
}

class TrackingDispatchGroup: CustomStringConvertible {
internal let group = DispatchGroup()

var description: String {
return "DispatchGroup Enters: \(enters), Leaves: \(leaves)"
}

var enters: Int = 0
var leaves: Int = 0
var current: Int = 0

func enter() {
enters += 1
current += 1
group.enter()
}

func leave() {
leaves += 1
current -= 1
group.leave()
}

init() { }

func wait() {
group.wait()
}

public func notify(qos: DispatchQoS = .unspecified, flags: DispatchWorkItemFlags = [], queue: DispatchQueue, execute work: @escaping @convention(block) () -> Void) {
group.notify(qos: qos, flags: flags, queue: queue, execute: work)
}
}
33 changes: 31 additions & 2 deletions Tests/Segment-Tests/Analytics_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -692,9 +692,38 @@ final class Analytics_Tests: XCTestCase {

}

func testServerOperatingMode() {
func testAsyncOperatingMode() {
// Use a specific writekey to this test so we do not collide with other cached items.
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_serverMode")
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_asyncMode")
.flushInterval(9999)
.flushAt(9999)
.operatingMode(.asynchronous))

waitUntilStarted(analytics: analytics)

analytics.storage.hardReset(doYouKnowHowToUseThis: true)

@Atomic var completionCalled = false

// put an event in the pipe ...
analytics.track(name: "completion test1")
// flush it, that'll get us an upload going
analytics.flush {
// verify completion is called.
completionCalled = true
}

while !completionCalled {
RunLoop.main.run(until: Date.distantPast)
}

XCTAssertTrue(completionCalled)
XCTAssertEqual(analytics.pendingUploads!.count, 0)
}

func testSyncOperatingMode() {
// Use a specific writekey to this test so we do not collide with other cached items.
let analytics = Analytics(configuration: Configuration(writeKey: "testFlush_syncMode")
.flushInterval(9999)
.flushAt(9999)
.operatingMode(.synchronous))
Expand Down
12 changes: 8 additions & 4 deletions Tests/Segment-Tests/StressTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class StressTests: XCTestCase {
let event = "write queue 1: \(eventsWritten)"
analytics.track(name: event)
eventsWritten += 1
usleep(0001)
//usleep(0001)
RunLoop.main.run(until: Date.distantPast)
}
print("queue 1 wrote \(eventsWritten) events.")
queue1Done = true
Expand All @@ -82,7 +83,8 @@ class StressTests: XCTestCase {
let event = "write queue 2: \(eventsWritten)"
analytics.track(name: event)
eventsWritten += 1
usleep(0001)
//usleep(0001)
RunLoop.main.run(until: Date.distantPast)
}
print("queue 2 wrote \(eventsWritten) events.")
queue2Done = true
Expand All @@ -91,10 +93,12 @@ class StressTests: XCTestCase {
flushQueue.async {
while (ready == false) { usleep(1) }
var counter = 0
sleep(1)
//sleep(1)
RunLoop.main.run(until: Date(timeIntervalSinceNow: 1))
while (queue1Done == false || queue2Done == false) {
let sleepTime = UInt32.random(in: 1..<3000)
usleep(sleepTime)
//usleep(sleepTime)
RunLoop.main.run(until: Date(timeIntervalSinceNow: Double(sleepTime / 1000) ))
analytics.flush()
counter += 1
}
Expand Down
Loading