Improving threading safety for telemetry #250

Draft · wants to merge 2 commits into base: main (changes shown from 1 commit)
93 changes: 54 additions & 39 deletions core/src/main/java/com/segment/analytics/kotlin/core/Telemetry.kt
@@ -15,6 +15,9 @@ import java.util.concurrent.Executors
import kotlin.math.min
import kotlin.math.roundToInt
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.channels.Channel

class MetricsRequestFactory : RequestFactory() {
override fun upload(apiHost: String): HttpURLConnection {
@@ -77,21 +80,14 @@ object Telemetry: Subscriber {
var host: String = Constants.DEFAULT_API_HOST
// 1.0 is 100%, will get set by Segment setting before start()
// Values are adjusted by the sampleRate on send
@Volatile private var _sampleRate: Double = 1.0
var sampleRate: Double
get() = _sampleRate
set(value) {
synchronized(this) {
_sampleRate = value
}
}
var flushTimer: Int = 30 * 1000 // 30s
private var sampleRate = AtomicReference<Double>(1.0)
private var flushTimer: Int = 30 * 1000 // 30s
var httpClient: HTTPClient = HTTPClient("", MetricsRequestFactory())
var sendWriteKeyOnError: Boolean = true
var sendErrorLogData: Boolean = false
var errorHandler: ((Throwable) -> Unit)? = ::logError
var maxQueueSize: Int = 20
var errorLogSizeMax: Int = 4000
private var maxQueueSize: Int = 20
private var errorLogSizeMax: Int = 4000

private const val MAX_QUEUE_BYTES = 28000
var maxQueueBytes: Int = MAX_QUEUE_BYTES
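Since sampleRate is now private and held in an AtomicReference, callers outside the object (including the tests further down, which fall back to reflection) can no longer read or assign it directly. One possible follow-up, shown here only as a hedged sketch and not as part of this change, is to keep a public Double property backed by the atomic:

import java.util.concurrent.atomic.AtomicReference

// Sketch: back a plain Double property with an AtomicReference so call sites
// and tests keep simple read/write access without reflection.
private val _sampleRate = AtomicReference(1.0)
var sampleRate: Double
    get() = _sampleRate.get()
    set(value) { _sampleRate.set(value) }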
@@ -100,7 +96,7 @@
}

private val queue = ConcurrentLinkedQueue<RemoteMetric>()
private var queueBytes = 0
private var queueBytes = AtomicInteger(0)
private var started = AtomicBoolean(false)
private var rateLimitEndTime: Long = 0
private var flushFirstError = AtomicBoolean(true)
@@ -116,16 +112,27 @@
private var telemetryDispatcher: ExecutorCoroutineDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private var telemetryJob: Job? = null

private val flushChannel = Channel<Unit>(Channel.UNLIMITED)

// Start a coroutine to process flush requests
init {
telemetryScope.launch(telemetryDispatcher) {
for (event in flushChannel) {
performFlush()
}
}
}

/**
* Starts the telemetry if it is enabled and not already started, and the sample rate is greater than 0.
* Called automatically when Telemetry.enable is set to true and when configuration data is received from Segment.
*/
fun start() {
if (!enable || started.get() || sampleRate == 0.0) return
if (!enable || started.get() || sampleRate.get() == 0.0) return
Contributor comment: should enable be atomic too?
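A minimal sketch of what that could look like, assuming an AtomicBoolean-backed property (the object name below is only illustrative, not from this PR):

import java.util.concurrent.atomic.AtomicBoolean

// Sketch: expose `enable` as an ordinary Boolean property backed by an
// AtomicBoolean, so writes from one thread are immediately visible to others.
object TelemetryFlags {
    private val _enable = AtomicBoolean(false)

    var enable: Boolean
        get() = _enable.get()
        set(value) {
            _enable.set(value)
            // Telemetry.start() could be triggered here on a false -> true
            // transition, mirroring the existing behavior of Telemetry.enable.
        }
}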

started.set(true)

// Everything queued so far was sampled at the default 100%; apply the sampling decision here and let send() scale the values by sampleRate
if (Math.random() > sampleRate) {
if (Math.random() > sampleRate.get()) {
resetQueue()
}

@@ -170,10 +177,10 @@
val tags = mutableMapOf<String, String>()
buildTags(tags)

if (!enable || sampleRate == 0.0) return
if (!enable || sampleRate.get() == 0.0) return
if (!metric.startsWith(METRICS_BASE_TAG)) return
if (tags.isEmpty()) return
if (Math.random() > sampleRate) return
if (Math.random() > sampleRate.get()) return

addRemoteMetric(metric, tags)
}
@@ -189,10 +196,10 @@
val tags = mutableMapOf<String, String>()
buildTags(tags)

if (!enable || sampleRate == 0.0) return
if (!enable || sampleRate.get() == 0.0) return
if (!metric.startsWith(METRICS_BASE_TAG)) return
if (tags.isEmpty()) return
if (Math.random() > sampleRate) return
if (Math.random() > sampleRate.get()) return

var filteredTags = if(sendWriteKeyOnError) {
tags.toMap()
@@ -216,33 +223,41 @@
}
}

@Synchronized
fun flush() {
if (!enable) return
flushChannel.trySend(Unit)
}

private fun performFlush() {
if (!enable || queue.isEmpty()) return
if (rateLimitEndTime > (System.currentTimeMillis() / 1000).toInt()) {
return
}
rateLimitEndTime = 0

flushFirstError.set(false)
try {
send()
} catch (error: Throwable) {
errorHandler?.invoke(error)
sampleRate = 0.0
sampleRate.set(0.0)
}
}

private fun send() {
if (sampleRate == 0.0) return
val sendQueue: MutableList<RemoteMetric>
synchronized(queue) {
sendQueue = queue.toMutableList()
queue.clear()
queueBytes = 0
}
sendQueue.forEach { m ->
m.value = (m.value / sampleRate).roundToInt()
if (sampleRate.get() == 0.0) return
val sendQueue = mutableListOf<RemoteMetric>()
// Reset queue data size counter since all current queue items will be removed
queueBytes.set(0)
var queueCount = queue.size
while(queueCount > 0 && !queue.isEmpty()) {
--queueCount
val m = queue.poll()
if(m != null) {
m.value = (m.value / sampleRate.get()).roundToInt()
sendQueue.add(m)
}
}
assert(queue.size == 0)
try {
// Json.encodeToString by default does not include default values
// We're using this to leave off the 'log' parameter if unset.
@@ -314,10 +329,12 @@
tags = fullTags
)
val newMetricSize = newMetric.toString().toByteArray().size
synchronized(queue) {
if (queueBytes + newMetricSize <= maxQueueBytes) {
queue.add(newMetric)
queueBytes += newMetricSize
// Avoid a check-then-act race: reserve the bytes with addAndGet first, then roll back if the cap is exceeded.
if (queueBytes.addAndGet(newMetricSize) <= maxQueueBytes) {
queue.add(newMetric)
} else {
if(queueBytes.addAndGet(-newMetricSize) < 0) {
queueBytes.set(0)
}
}
}
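The comment above describes an optimistic reserve-then-revert scheme: the byte count is added before the capacity check, and rolled back (clamped at zero) if the metric does not fit. A standalone sketch of that same pattern, with illustrative names only and not part of this change:

import java.util.concurrent.atomic.AtomicInteger

// Sketch of the "reserve first, roll back on overflow" byte budget used for queueBytes.
class ByteBudget(private val maxBytes: Int) {
    private val used = AtomicInteger(0)

    // Returns true if `size` bytes were reserved under the cap.
    fun tryReserve(size: Int): Boolean {
        if (used.addAndGet(size) <= maxBytes) return true
        // Over budget: return the reservation, clamping at zero in case a
        // concurrent reset already zeroed the counter.
        if (used.addAndGet(-size) < 0) used.set(0)
        return false
    }
}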
@@ -334,7 +351,7 @@
private suspend fun systemUpdate(system: com.segment.analytics.kotlin.core.System) {
system.settings?.let { settings ->
settings.metrics["sampleRate"]?.jsonPrimitive?.double?.let {
sampleRate = it
sampleRate.set(it)
Contributor comment: if we use Sovran, sampleRate does not have to be atomic, since Sovran serializes tasks on the same thread; the same applies to enable and started.
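The suggestion above is to serialize all state mutations onto a single thread so that individual fields no longer need to be atomic. A rough sketch of that idea using a single-threaded coroutine dispatcher (this only illustrates the serialization approach; it does not show Sovran's actual API, and the names are illustrative):

import java.util.concurrent.Executors
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch

// Sketch: every mutation is posted to one single-threaded dispatcher, so the
// plain vars below never race even though none of them is atomic.
object SerializedTelemetryState {
    private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    private val scope = CoroutineScope(SupervisorJob() + dispatcher)

    private var sampleRate: Double = 1.0
    private var enable: Boolean = false
    private var started: Boolean = false

    fun setSampleRate(value: Double) {
        scope.launch { sampleRate = value } // applied on the single thread
    }

    fun setEnable(value: Boolean) {
        scope.launch { enable = value }
    }
}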

// We don't want to start telemetry until two conditions are met:
// Telemetry.enable is set to true
// Settings from the server have adjusted the sampleRate
@@ -345,9 +362,7 @@
}

private fun resetQueue() {
synchronized(queue) {
queue.clear()
queueBytes = 0
}
queue.clear()
queueBytes.set(0)
}
}
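Taken together, the flush path in this change funnels every flush() call through an unbounded Channel that a single coroutine drains, so performFlush() never runs concurrently with itself. A minimal standalone sketch of that producer/consumer pattern (class and function names here are illustrative, not taken from the PR):

import java.util.concurrent.Executors
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

// Sketch: many threads may request a flush, but only one coroutine performs it.
class SingleConsumerFlusher(private val doFlush: () -> Unit) {
    private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    private val scope = CoroutineScope(SupervisorJob() + dispatcher)
    private val requests = Channel<Unit>(Channel.UNLIMITED)

    init {
        scope.launch {
            for (request in requests) {
                doFlush() // serialized: one flush at a time
            }
        }
    }

    // Safe to call from any thread; never suspends because the channel is unbounded.
    fun requestFlush() {
        requests.trySend(Unit)
    }
}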
Changes to the Telemetry test class (TelemetryTest):
@@ -11,6 +11,7 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import kotlin.random.Random

class TelemetryTest {
@@ -31,6 +32,22 @@
queueBytesField.isAccessible = true
return queueBytesField.get(Telemetry) as Int
}
fun TelemetryMaxQueueSize(): Int {
val maxQueueSizeField: Field = Telemetry::class.java.getDeclaredField("maxQueueSize")
maxQueueSizeField.isAccessible = true
return maxQueueSizeField.get(Telemetry) as Int
}
var TelemetrySampleRate: Double
get() {
val sampleRateField: Field = Telemetry::class.java.getDeclaredField("sampleRate")
sampleRateField.isAccessible = true
return (sampleRateField.get(Telemetry) as AtomicReference<Double>).get()
}
set(value) {
val sampleRateField: Field = Telemetry::class.java.getDeclaredField("sampleRate")
sampleRateField.isAccessible = true
(sampleRateField.get(Telemetry) as AtomicReference<Double>).set(value)
}
var TelemetryStarted: AtomicBoolean
get() {
val startedField: Field = Telemetry::class.java.getDeclaredField("started")
@@ -69,20 +86,20 @@
Telemetry.reset()
Telemetry.errorHandler = ::errorHandler
errors.clear()
Telemetry.sampleRate = 1.0
TelemetrySampleRate = 1.0
MockKAnnotations.init(this)
mockTelemetryHTTPClient()
// Telemetry.enable = true <- this will call start(), so don't do it here
}

@Test
fun `Test telemetry start`() {
Telemetry.sampleRate = 0.0
TelemetrySampleRate = 0.0
Telemetry.enable = true
Telemetry.start()
assertEquals(false, TelemetryStarted.get())

Telemetry.sampleRate = 1.0
TelemetrySampleRate = 1.0
Telemetry.start()
assertEquals(true, TelemetryStarted.get())
assertEquals(0,errors.size)
@@ -186,11 +203,11 @@
fun `Test increment and error methods when queue is full`() {
Telemetry.enable = true
Telemetry.start()
for (i in 1..Telemetry.maxQueueSize + 1) {
for (i in 1..TelemetryMaxQueueSize() + 1) {
Telemetry.increment(Telemetry.INVOKE_METRIC) { it["test"] = "test" + i }
Telemetry.error(Telemetry.INVOKE_ERROR_METRIC, "error") { it["error"] = "test" + i }
}
assertEquals(Telemetry.maxQueueSize, TelemetryQueueSize())
assertEquals(TelemetryMaxQueueSize(), TelemetryQueueSize())
}

@Test
@@ -239,6 +256,6 @@
} finally {
executor.shutdown()
}
assertTrue(TelemetryQueueSize() == Telemetry.maxQueueSize)
assertTrue(TelemetryQueueSize() == TelemetryMaxQueueSize())
}
}