Skip to content

Commit

Permalink
Refactored operatingMode related code. (#328)
Browse files Browse the repository at this point in the history
* Refactoring of flush completion.

* Catching places where groups/completions/semaphores might not get hit.

* Fixed linux tests

* Removed debug line; Added fix for long-running ios task.
  • Loading branch information
bsneed authored Apr 18, 2024
1 parent d43c7a8 commit 01d39d2
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 104 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/swift.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ jobs:
needs: cancel_previous
runs-on: macos-14
steps:
- name: Install yeetd
run: |
wget https://github.com/biscuitehh/yeetd/releases/download/1.0/yeetd-normal.pkg
sudo installer -pkg yeetd-normal.pkg -target /
yeetd &
- uses: maxim-lobanov/setup-xcode@v1
with:
xcode-version: "15.2"
Expand Down
49 changes: 8 additions & 41 deletions Sources/Segment/Analytics.swift
Original file line number Diff line number Diff line change
Expand Up @@ -228,56 +228,23 @@ extension Analytics {
/// called when flush has completed.
public func flush(completion: (() -> Void)? = nil) {
// only flush if we're enabled.
guard enabled == true else { return }

let flushGroup = DispatchGroup()
// gotta call enter at least once before we ask to be notified.
flushGroup.enter()
guard enabled == true else { completion?(); return }

let completionGroup = CompletionGroup(queue: configuration.values.flushQueue)
apply { plugin in
// we want to enter as soon as possible. waiting to do it from
// another queue just takes too long.
operatingMode.run(queue: configuration.values.flushQueue) {
completionGroup.add { group in
if let p = plugin as? FlushCompletion {
// flush handles the groups enter/leave calls
p.flush(group: flushGroup) { plugin in
// we don't really care about the plugin value .. yet.
}
p.flush(group: group)
} else if let p = plugin as? EventPlugin {
flushGroup.enter()
// we have no idea if this will be async or not, assume it's sync.
group.enter()
p.flush()
flushGroup.leave()
group.leave()
}
}
}

flushGroup.leave() // matches our initial enter().

// if we ARE in sync mode, we need to wait on the group.
// This effectively ends up being a `sync` operation.
if operatingMode == .synchronous {
flushGroup.wait()
// we need to call completion on our own since
// we skipped setting up notify. we don't need to do it on
// .main since we are in synchronous mode.
if let completion { completion() }
} else if operatingMode == .asynchronous {
// if we're not, flip over to our serial queue, tell it to wait on the flush
// group to complete if we have a completion to hit. Otherwise, no need to
// wait on completion.
if let completion {
// NOTE: DispatchGroup's `notify` method on linux ended up getting called
// before the tasks have actually completed, so we went with this instead.
OperatingMode.defaultQueue.async { [weak self] in
let timedOut = flushGroup.wait(timeout: .now() + 15 /*seconds*/)
if timedOut == .timedOut {
self?.log(message: "flush(completion:) timed out waiting for completion.")
}
completion()
//DispatchQueue.main.async { completion() }
}
}
completionGroup.run(mode: operatingMode) {
completion?()
}
}

Expand Down
2 changes: 1 addition & 1 deletion Sources/Segment/Plugins.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public protocol VersionedPlugin {
}

public protocol FlushCompletion {
func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void)
func flush(group: DispatchGroup)
}

// For internal platform-specific bits
Expand Down
47 changes: 24 additions & 23 deletions Sources/Segment/Plugins/SegmentDestination.swift
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,15 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
// unused .. see flush(group:completion:)
}

public func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) {
public func flush(group: DispatchGroup) {
group.enter()
defer { group.leave() }

guard let storage = self.storage else { return }
guard let analytics = self.analytics else { return }

// don't flush if analytics is disabled.
guard analytics.enabled == true else { return }

// enter for the high level flush, allow us time to run through any existing files..
group.enter()

eventCount = 0
cleanupUploads()
Expand All @@ -143,25 +143,19 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion

if pendingUploads == 0 {
if type == .file, hasData {
flushFiles(group: group, completion: completion)
flushFiles(group: group)
} else if type == .data, hasData {
// we know it's a data-based transaction as opposed to file I/O
flushData(group: group, completion: completion)
} else {
// there was nothing to do ...
completion(self)
flushData(group: group)
}
} else {
analytics.log(message: "Skipping processing; Uploads in progress.")
}

// leave for the high level flush
group.leave()
}
}

extension SegmentDestination {
private func flushFiles(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) {
private func flushFiles(group: DispatchGroup) {
guard let storage = self.storage else { return }
guard let analytics = self.analytics else { return }
guard let httpClient = self.httpClient else { return }
Expand All @@ -175,6 +169,9 @@ extension SegmentDestination {

// set up the task
let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, batch: url) { [weak self] result in
defer {
group.leave()
}
guard let self else { return }
switch result {
case .success(_):
Expand All @@ -195,20 +192,19 @@ extension SegmentDestination {
// make sure it gets removed and it's cleanup() called rather
// than waiting on the next flush to come around.
cleanupUploads()
// call the completion
completion(self)
// leave for the url we kicked off.
group.leave()
}

// we have a legit upload in progress now, so add it to our list.
if let upload = uploadTask {
add(uploadTask: UploadTaskInfo(url: url, data: nil, task: upload))
} else {
// we couldn't get a task, so we need to leave the group or things will hang.
group.leave()
}
}
}

private func flushData(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) {
private func flushData(group: DispatchGroup) {
// DO NOT CALL THIS FROM THE MAIN THREAD, IT BLOCKS!
// Don't make me add a check here; i'll be sad you didn't follow directions.
guard let storage = self.storage else { return }
Expand Down Expand Up @@ -239,6 +235,12 @@ extension SegmentDestination {

// set up the task
let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, data: data) { [weak self] result in
defer {
// leave for the url we kicked off.
group.leave()
semaphore.signal()
}

guard let self else { return }
switch result {
case .success(_):
Expand All @@ -259,16 +261,15 @@ extension SegmentDestination {
// make sure it gets removed and it's cleanup() called rather
// than waiting on the next flush to come around.
cleanupUploads()
// call the completion
completion(self)
// leave for the url we kicked off.
group.leave()
semaphore.signal()
}

// we have a legit upload in progress now, so add it to our list.
if let upload = uploadTask {
add(uploadTask: UploadTaskInfo(url: nil, data: data, task: upload))
} else {
// we couldn't get a task, so we need to leave the group or things will hang.
group.leave()
semaphore.signal()
}

_ = semaphore.wait(timeout: .distantFuture)
Expand Down
54 changes: 54 additions & 0 deletions Sources/Segment/Utilities/CompletionGroup.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//
// CompletionGroup.swift
//
//
// Created by Brandon Sneed on 4/17/24.
//

import Foundation

class CompletionGroup {
let queue: DispatchQueue
var items = [(DispatchGroup) -> Void]()

init(queue: DispatchQueue) {
self.queue = queue
}

func add(workItem: @escaping (DispatchGroup) -> Void) {
items.append(workItem)
}

func run(mode: OperatingMode, completion: @escaping () -> Void) {
// capture self strongly on purpose
let task: () -> Void = { [self] in
let group = DispatchGroup()
group.enter()
group.notify(queue: queue) { [weak self] in
completion()
self?.items.removeAll()
}

for item in items {
item(group)
}

group.leave()

if mode == .synchronous {
group.wait()
}
}

switch mode {
case .synchronous:
queue.sync {
task()
}
case .asynchronous:
queue.async {
task()
}
}
}
}
1 change: 0 additions & 1 deletion Sources/Segment/Utilities/Utils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,3 @@ internal func eventStorageDirectory(writeKey: String) -> URL {
return segmentURL
}


7 changes: 0 additions & 7 deletions Tests/LinuxMain.swift

This file was deleted.

36 changes: 5 additions & 31 deletions Tests/Segment-Tests/Analytics_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,6 @@ final class Analytics_Tests: XCTestCase {
let shared2 = Analytics.shared()
XCTAssertFalse(alive2 === shared2)
XCTAssertTrue(shared2 === shared)

}

func testAsyncOperatingMode() throws {
Expand All @@ -755,20 +754,6 @@ final class Analytics_Tests: XCTestCase {
.flushAt(9999)
.operatingMode(.asynchronous))

// set the httpclient to use our blocker session
let segment = analytics.find(pluginType: SegmentDestination.self)
let configuration = URLSessionConfiguration.ephemeral
configuration.allowsCellularAccess = true
configuration.timeoutIntervalForResource = 30
configuration.timeoutIntervalForRequest = 60
configuration.httpMaximumConnectionsPerHost = 2
configuration.protocolClasses = [BlockNetworkCalls.self]
configuration.httpAdditionalHeaders = ["Content-Type": "application/json; charset=utf-8",
"Authorization": "Basic test",
"User-Agent": "analytics-ios/\(Analytics.version())"]
let blockSession = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil)
segment?.httpClient?.session = blockSession

waitUntilStarted(analytics: analytics)

analytics.storage.hardReset(doYouKnowHowToUseThis: true)
Expand All @@ -777,13 +762,16 @@ final class Analytics_Tests: XCTestCase {

// put an event in the pipe ...
analytics.track(name: "completion test1")

RunLoop.main.run(until: .distantPast)

// flush it, that'll get us an upload going
analytics.flush {
// verify completion is called.
expectation.fulfill()
}

wait(for: [expectation], timeout: 5)
wait(for: [expectation], timeout: 10)

XCTAssertNil(analytics.pendingUploads)
}
Expand All @@ -795,20 +783,6 @@ final class Analytics_Tests: XCTestCase {
.flushAt(9999)
.operatingMode(.synchronous))

// set the httpclient to use our blocker session
let segment = analytics.find(pluginType: SegmentDestination.self)
let configuration = URLSessionConfiguration.ephemeral
configuration.allowsCellularAccess = true
configuration.timeoutIntervalForResource = 30
configuration.timeoutIntervalForRequest = 60
configuration.httpMaximumConnectionsPerHost = 2
configuration.protocolClasses = [BlockNetworkCalls.self]
configuration.httpAdditionalHeaders = ["Content-Type": "application/json; charset=utf-8",
"Authorization": "Basic test",
"User-Agent": "analytics-ios/\(Analytics.version())"]
let blockSession = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil)
segment?.httpClient?.session = blockSession

waitUntilStarted(analytics: analytics)

analytics.storage.hardReset(doYouKnowHowToUseThis: true)
Expand All @@ -822,7 +796,7 @@ final class Analytics_Tests: XCTestCase {
expectation.fulfill()
}

wait(for: [expectation], timeout: 1)
wait(for: [expectation], timeout: 10)

XCTAssertNil(analytics.pendingUploads)

Expand Down
Loading

0 comments on commit 01d39d2

Please sign in to comment.