-
Notifications
You must be signed in to change notification settings - Fork 28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improving threading safety for telemetry #250
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ import java.util.concurrent.ConcurrentLinkedQueue | |
import java.util.concurrent.Executors | ||
import kotlin.math.min | ||
import kotlin.math.roundToInt | ||
import java.util.concurrent.atomic.AtomicBoolean | ||
|
||
class MetricsRequestFactory : RequestFactory() { | ||
override fun upload(apiHost: String): HttpURLConnection { | ||
|
@@ -76,7 +77,14 @@ object Telemetry: Subscriber { | |
var host: String = Constants.DEFAULT_API_HOST | ||
// 1.0 is 100%, will get set by Segment setting before start() | ||
// Values are adjusted by the sampleRate on send | ||
var sampleRate: Double = 1.0 | ||
@Volatile private var _sampleRate: Double = 1.0 | ||
var sampleRate: Double | ||
get() = _sampleRate | ||
set(value) { | ||
synchronized(this) { | ||
_sampleRate = value | ||
} | ||
} | ||
var flushTimer: Int = 30 * 1000 // 30s | ||
var httpClient: HTTPClient = HTTPClient("", MetricsRequestFactory()) | ||
var sendWriteKeyOnError: Boolean = true | ||
|
@@ -93,9 +101,9 @@ object Telemetry: Subscriber { | |
|
||
private val queue = ConcurrentLinkedQueue<RemoteMetric>() | ||
private var queueBytes = 0 | ||
private var started = false | ||
private var started = AtomicBoolean(false) | ||
private var rateLimitEndTime: Long = 0 | ||
private var flushFirstError = true | ||
private var flushFirstError = AtomicBoolean(true) | ||
private val exceptionHandler = CoroutineExceptionHandler { _, t -> | ||
errorHandler?.let { | ||
it( Exception( | ||
|
@@ -113,8 +121,8 @@ object Telemetry: Subscriber { | |
* Called automatically when Telemetry.enable is set to true and when configuration data is received from Segment. | ||
*/ | ||
fun start() { | ||
if (!enable || started || sampleRate == 0.0) return | ||
started = true | ||
if (!enable || started.get() || sampleRate == 0.0) return | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. if we need atomic on There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. `started` could maybe get away without it as well, but `enable` will in most cases never change, and there are several checks, so if it happens to slip past one check, a later one will still avoid sending. But mainly I thought it was not worth changing the interface and documentation just yet, since `enable` is the only exposed API. |
||
started.set(true) | ||
|
||
// Everything queued was sampled at default 100%, downsample adjustment and send will adjust values | ||
if (Math.random() > sampleRate) { | ||
|
@@ -124,7 +132,7 @@ object Telemetry: Subscriber { | |
telemetryJob = telemetryScope.launch(telemetryDispatcher) { | ||
while (isActive) { | ||
if (!enable) { | ||
started = false | ||
started.set(false) | ||
return@launch | ||
} | ||
try { | ||
|
@@ -148,7 +156,7 @@ object Telemetry: Subscriber { | |
fun reset() { | ||
telemetryJob?.cancel() | ||
resetQueue() | ||
started = false | ||
started.set(false) | ||
rateLimitEndTime = 0 | ||
} | ||
|
||
|
@@ -202,8 +210,8 @@ object Telemetry: Subscriber { | |
|
||
addRemoteMetric(metric, filteredTags, log=logData) | ||
|
||
if(flushFirstError) { | ||
flushFirstError = false | ||
if(flushFirstError.get()) { | ||
flushFirstError.set(false) | ||
flush() | ||
} | ||
} | ||
|
@@ -218,7 +226,6 @@ object Telemetry: Subscriber { | |
|
||
try { | ||
send() | ||
queueBytes = 0 | ||
} catch (error: Throwable) { | ||
errorHandler?.invoke(error) | ||
sampleRate = 0.0 | ||
|
@@ -227,16 +234,14 @@ object Telemetry: Subscriber { | |
|
||
private fun send() { | ||
if (sampleRate == 0.0) return | ||
var queueCount = queue.size | ||
// Reset queue data size counter since all current queue items will be removed | ||
queueBytes = 0 | ||
val sendQueue = mutableListOf<RemoteMetric>() | ||
while (queueCount-- > 0 && !queue.isEmpty()) { | ||
val m = queue.poll() | ||
if(m != null) { | ||
m.value = (m.value / sampleRate).roundToInt() | ||
sendQueue.add(m) | ||
} | ||
val sendQueue: MutableList<RemoteMetric> | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Instead of snapshotting the whole queue, snapshotting the size of the queue and dequeuing just that many items should work, and that avoids using `synchronized`. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That's what the code was doing originally; we can probably just make `queueBytes` atomic and leave it at that. |
||
synchronized(queue) { | ||
sendQueue = queue.toMutableList() | ||
queue.clear() | ||
queueBytes = 0 | ||
} | ||
sendQueue.forEach { m -> | ||
m.value = (m.value / sampleRate).roundToInt() | ||
} | ||
try { | ||
// Json.encodeToString by default does not include default values | ||
|
@@ -309,9 +314,11 @@ object Telemetry: Subscriber { | |
tags = fullTags | ||
) | ||
val newMetricSize = newMetric.toString().toByteArray().size | ||
if (queueBytes + newMetricSize <= maxQueueBytes) { | ||
queue.add(newMetric) | ||
queueBytes += newMetricSize | ||
synchronized(queue) { | ||
if (queueBytes + newMetricSize <= maxQueueBytes) { | ||
queue.add(newMetric) | ||
queueBytes += newMetricSize | ||
} | ||
} | ||
} | ||
|
||
|
@@ -338,7 +345,9 @@ object Telemetry: Subscriber { | |
} | ||
|
||
private fun resetQueue() { | ||
queue.clear() | ||
queueBytes = 0 | ||
synchronized(queue) { | ||
queue.clear() | ||
queueBytes = 0 | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should try our best to avoid using `synchronized`, since our API is coroutine-based. People could use our API inside a coroutine, which would cause a suspend function to block.