Skip to content

Commit

Permalink
Make segment.io flush asynchronous.
Browse files Browse the repository at this point in the history
  • Loading branch information
bsneed committed Oct 16, 2023
1 parent 3066c42 commit 4c3311d
Showing 1 changed file with 38 additions and 34 deletions.
72 changes: 38 additions & 34 deletions Sources/Segment/Plugins/SegmentDestination.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public class SegmentDestination: DestinationPlugin, Subscriber {

internal var httpClient: HTTPClient?
private var uploads = [UploadTaskInfo]()
private let uploadsQueue = DispatchQueue(label: "uploadsQueue.segment.com")
private let uploadsQueue = DispatchQueue(label: "com.segment.uploadsQueue")
private let flushQueue = DispatchQueue(label: "com.segment.flushQueue")
private var storage: Storage?

@Atomic internal var eventCount: Int = 0
Expand Down Expand Up @@ -113,47 +114,50 @@ public class SegmentDestination: DestinationPlugin, Subscriber {
}

public func flush() {
guard let storage = self.storage else { return }
guard let analytics = self.analytics else { return }
guard let httpClient = self.httpClient else { return }

// don't flush if analytics is disabled.
guard analytics.enabled == true else { return }

// Read events from file system
guard let data = storage.read(Storage.Constants.events) else { return }

eventCount = 0
cleanupUploads()

analytics.log(message: "Uploads in-progress: \(pendingUploads)")

if pendingUploads == 0 {
for url in data {
analytics.log(message: "Processing Batch:\n\(url.lastPathComponent)")

let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, batch: url) { (result) in
switch result {
flushQueue.async { [weak self] in
guard let self = self else { return }
guard let storage = self.storage else { return }
guard let analytics = self.analytics else { return }
guard let httpClient = self.httpClient else { return }

// don't flush if analytics is disabled.
guard analytics.enabled == true else { return }

// Read events from file system
guard let data = storage.read(Storage.Constants.events) else { return }

eventCount = 0
cleanupUploads()

analytics.log(message: "Uploads in-progress: \(pendingUploads)")

if pendingUploads == 0 {
for url in data {
analytics.log(message: "Processing Batch:\n\(url.lastPathComponent)")

let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, batch: url) { (result) in
switch result {
case .success(_):
storage.remove(file: url)
self.cleanupUploads()
default:
break
}

analytics.log(message: "Processed: \(url.lastPathComponent)")
// the upload we have here has just finished.
// make sure it gets removed and it's cleanup() called rather
// than waiting on the next flush to come around.
self.cleanupUploads()
}
// we have a legit upload in progress now, so add it to our list.
if let upload = uploadTask {
add(uploadTask: UploadTaskInfo(url: url, task: upload))
}

analytics.log(message: "Processed: \(url.lastPathComponent)")
// the upload we have here has just finished.
// make sure it gets removed and it's cleanup() called rather
// than waiting on the next flush to come around.
self.cleanupUploads()
}
// we have a legit upload in progress now, so add it to our list.
if let upload = uploadTask {
add(uploadTask: UploadTaskInfo(url: url, task: upload))
}
} else {
analytics.log(message: "Skipping processing; Uploads in progress.")
}
} else {
analytics.log(message: "Skipping processing; Uploads in progress.")
}
}
}
Expand Down

0 comments on commit 4c3311d

Please sign in to comment.