Skip to content

Commit

Permalink
Refactoring of flush completion.
Browse files Browse the repository at this point in the history
  • Loading branch information
bsneed committed Apr 17, 2024
1 parent 83e2288 commit 7d9849f
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 109 deletions.
65 changes: 4 additions & 61 deletions Sources/Segment/Analytics.swift
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ extension Analytics {
return nil
}

/// Tells this instance of Analytics to flush any queued events up to Segment.com. This command will also
/// be sent to each plugin present in the system. A completion handler can be optionally given and will be
/// called when flush has completed.
public func flush(completion: (() -> Void)? = nil) {
// only flush if we're enabled.
guard enabled == true else { return }
Expand All @@ -231,9 +234,7 @@ extension Analytics {
apply { plugin in
completionGroup.add { group in
if let p = plugin as? FlushCompletion {
p.flush(group: group) { plugin in
// we don't really care about the plugin value .. yet.
}
p.flush(group: group)
} else if let p = plugin as? EventPlugin {
group.enter()
p.flush()
Expand All @@ -247,64 +248,6 @@ extension Analytics {
}
}

/// Tells this instance of Analytics to flush any queued events up to Segment.com. This command will also
/// be sent to each plugin present in the system. A completion handler can be optionally given and will be
/// called when flush has completed.
public func flush2(completion: (() -> Void)? = nil) {
// only flush if we're enabled.
guard enabled == true else { return }

let flushGroup = DispatchGroup()
// gotta call enter at least once before we ask to be notified.
flushGroup.enter()

apply { plugin in
// we want to enter as soon as possible. waiting to do it from
// another queue just takes too long.
operatingMode.run(queue: configuration.values.flushQueue) {
if let p = plugin as? FlushCompletion {
// flush handles the groups enter/leave calls
p.flush(group: flushGroup) { plugin in
// we don't really care about the plugin value .. yet.
}
} else if let p = plugin as? EventPlugin {
flushGroup.enter()
// we have no idea if this will be async or not, assume it's sync.
p.flush()
flushGroup.leave()
}
}
}

flushGroup.leave() // matches our initial enter().

// if we ARE in sync mode, we need to wait on the group.
// This effectively ends up being a `sync` operation.
if operatingMode == .synchronous {
flushGroup.wait()
// we need to call completion on our own since
// we skipped setting up notify. we don't need to do it on
// .main since we are in synchronous mode.
if let completion { completion() }
} else if operatingMode == .asynchronous {
// if we're not, flip over to our serial queue, tell it to wait on the flush
// group to complete if we have a completion to hit. Otherwise, no need to
// wait on completion.
if let completion {
// NOTE: DispatchGroup's `notify` method on linux ended up getting called
// before the tasks have actually completed, so we went with this instead.
OperatingMode.defaultQueue.async { [weak self] in
let timedOut = flushGroup.wait(timeout: .now() + 15 /*seconds*/)
if timedOut == .timedOut {
self?.log(message: "flush(completion:) timed out waiting for completion.")
}
completion()
//DispatchQueue.main.async { completion() }
}
}
}
}

/// Resets this instance of Analytics to a clean slate. Traits, UserID's, anonymousId, etc are all cleared or reset. This
/// command will also be sent to each plugin present in the system.
public func reset() {
Expand Down
2 changes: 1 addition & 1 deletion Sources/Segment/Plugins.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public protocol VersionedPlugin {
}

public protocol FlushCompletion {
func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void)
func flush(group: DispatchGroup)
}

// For internal platform-specific bits
Expand Down
44 changes: 17 additions & 27 deletions Sources/Segment/Plugins/SegmentDestination.swift
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,15 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
// unused .. see flush(group:completion:)
}

public func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) {
public func flush(group: DispatchGroup) {
group.enter()
defer { group.leave() }

guard let storage = self.storage else { return }
guard let analytics = self.analytics else { return }

// don't flush if analytics is disabled.
guard analytics.enabled == true else { return }

// enter for the high level flush, allow us time to run through any existing files..
group.enter()
print("flush entering group")

eventCount = 0
cleanupUploads()
Expand All @@ -144,26 +143,19 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion

if pendingUploads == 0 {
if type == .file, hasData {
flushFiles(group: group, completion: completion)
flushFiles(group: group)
} else if type == .data, hasData {
// we know it's a data-based transaction as opposed to file I/O
flushData(group: group, completion: completion)
} else {
// there was nothing to do ...
completion(self)
flushData(group: group)
}
} else {
analytics.log(message: "Skipping processing; Uploads in progress.")
}

// leave for the high level flush
print("flush leaving group")
group.leave()
}
}

extension SegmentDestination {
private func flushFiles(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) {
private func flushFiles(group: DispatchGroup) {
guard let storage = self.storage else { return }
guard let analytics = self.analytics else { return }
guard let httpClient = self.httpClient else { return }
Expand All @@ -173,11 +165,13 @@ extension SegmentDestination {
for url in files {
// enter for this url we're going to kick off
group.enter()
print("flushFiles entered group \(url)")
analytics.log(message: "Processing Batch:\n\(url.lastPathComponent)")

// set up the task
let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, batch: url) { [weak self] result in
defer {
group.leave()
}
guard let self else { return }
switch result {
case .success(_):
Expand All @@ -198,11 +192,6 @@ extension SegmentDestination {
// make sure it gets removed and it's cleanup() called rather
// than waiting on the next flush to come around.
cleanupUploads()
// call the completion
completion(self)
// leave for the url we kicked off.
print("flushFiles leaving group \(url)")
group.leave()
}

// we have a legit upload in progress now, so add it to our list.
Expand All @@ -212,7 +201,7 @@ extension SegmentDestination {
}
}

private func flushData(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) {
private func flushData(group: DispatchGroup) {
// DO NOT CALL THIS FROM THE MAIN THREAD, IT BLOCKS!
// Don't make me add a check here; i'll be sad you didn't follow directions.
guard let storage = self.storage else { return }
Expand Down Expand Up @@ -243,6 +232,12 @@ extension SegmentDestination {

// set up the task
let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, data: data) { [weak self] result in
defer {
// leave for the url we kicked off.
group.leave()
semaphore.signal()
}

guard let self else { return }
switch result {
case .success(_):
Expand All @@ -263,11 +258,6 @@ extension SegmentDestination {
// make sure it gets removed and it's cleanup() called rather
// than waiting on the next flush to come around.
cleanupUploads()
// call the completion
completion(self)
// leave for the url we kicked off.
group.leave()
semaphore.signal()
}

// we have a legit upload in progress now, so add it to our list.
Expand Down
20 changes: 0 additions & 20 deletions Tests/Segment-Tests/Analytics_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -754,16 +754,6 @@ final class Analytics_Tests: XCTestCase {
.flushAt(9999)
.operatingMode(.asynchronous))

// set the httpclient to use our blocker session
let segment = analytics.find(pluginType: SegmentDestination.self)
let configuration = URLSessionConfiguration.ephemeral
configuration.protocolClasses = [BlockNetworkCalls.self]
configuration.httpAdditionalHeaders = ["Content-Type": "application/json; charset=utf-8",
"Authorization": "Basic test",
"User-Agent": "analytics-ios/\(Analytics.version())"]
let blockSession = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil)
segment?.httpClient?.session = blockSession

waitUntilStarted(analytics: analytics)

analytics.storage.hardReset(doYouKnowHowToUseThis: true)
Expand All @@ -790,16 +780,6 @@ final class Analytics_Tests: XCTestCase {
.flushAt(9999)
.operatingMode(.synchronous))

// set the httpclient to use our blocker session
let segment = analytics.find(pluginType: SegmentDestination.self)
let configuration = URLSessionConfiguration.ephemeral
configuration.protocolClasses = [BlockNetworkCalls.self]
configuration.httpAdditionalHeaders = ["Content-Type": "application/json; charset=utf-8",
"Authorization": "Basic test",
"User-Agent": "analytics-ios/\(Analytics.version())"]
let blockSession = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil)
segment?.httpClient?.session = blockSession

waitUntilStarted(analytics: analytics)

analytics.storage.hardReset(doYouKnowHowToUseThis: true)
Expand Down

0 comments on commit 7d9849f

Please sign in to comment.