Lock-free ringbuf output
Signed-off-by: Alan Jowett <[email protected]>
Alan Jowett committed Nov 20, 2024
1 parent 0a14ab3 commit 60e2ad5
Showing 4 changed files with 233 additions and 173 deletions.
27 changes: 20 additions & 7 deletions libs/api/ebpf_api.cpp
@@ -4266,15 +4266,28 @@ _ebpf_ring_buffer_map_async_query_completion(_Inout_ void* completion_context) N
break;
}

int callback_result = subscription->sample_callback(
subscription->sample_callback_context,
const_cast<void*>(reinterpret_cast<const void*>(record->data)),
record->header.length - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data));
if (callback_result != 0) {
break;
long size;
for (;;) {
size = ReadAcquire(&record->size);
if (size & EBPF_RING_BUFFER_RECORD_FLAG_LOCKED) {
// The record is being written to by the producer.
// Wait for the producer to finish writing.
YieldProcessor();
} else {
break;
}
}

consumer += record->header.length;
if (!(size & EBPF_RING_BUFFER_RECORD_FLAG_DISCARDED)) {
int callback_result = subscription->sample_callback(
subscription->sample_callback_context,
const_cast<void*>(reinterpret_cast<const void*>(record->data)),
size - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data));
if (callback_result != 0) {
break;
}
}
consumer += size;
}
}

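For context: the consumer-side change above polls each record's size word with acquire semantics, spins while the producer still holds the LOCKED bit, skips records marked DISCARDED, and advances the consumer cursor by the record length. A minimal sketch of that protocol follows; the record layout and the flag values are assumptions inferred from the names in this diff, not the project's actual definitions.

// Illustrative consumer-side sketch (assumed record layout and flag values).
#include <windows.h>
#include <stddef.h>
#include <stdint.h>

#define RB_FLAG_LOCKED 0x80000000L    // stand-in for EBPF_RING_BUFFER_RECORD_FLAG_LOCKED
#define RB_FLAG_DISCARDED 0x40000000L // stand-in for EBPF_RING_BUFFER_RECORD_FLAG_DISCARDED

typedef struct _rb_record
{
    volatile long size; // record length in bytes (header included) plus flag bits
    uint8_t data[1];
} rb_record_t;

// Consume one record; returns the number of bytes to advance the consumer cursor by.
static long
consume_one_record(rb_record_t* record, void (*on_sample)(const uint8_t* data, long length))
{
    long size;
    // Spin until the producer clears the LOCKED bit it set in reserve.
    for (;;) {
        size = ReadAcquire(&record->size);
        if (!(size & RB_FLAG_LOCKED)) {
            break;
        }
        YieldProcessor();
    }
    if (!(size & RB_FLAG_DISCARDED)) {
        on_sample(record->data, size - (long)offsetof(rb_record_t, data));
    }
    return size & ~RB_FLAG_DISCARDED; // advance past the whole record
}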
271 changes: 114 additions & 157 deletions libs/runtime/ebpf_ring_buffer.c
@@ -8,80 +8,13 @@

typedef struct _ebpf_ring_buffer
{
ebpf_lock_t lock;
size_t length;
size_t consumer_offset;
size_t producer_offset;
int64_t length;
volatile int64_t consumer_offset;
volatile int64_t producer_offset;
uint8_t* shared_buffer;
ebpf_ring_descriptor_t* ring_descriptor;
} ebpf_ring_buffer_t;

inline static size_t
_ring_get_length(_In_ const ebpf_ring_buffer_t* ring)
{
return ring->length;
}

inline static size_t
_ring_get_producer_offset(_In_ const ebpf_ring_buffer_t* ring)
{
return ring->producer_offset % ring->length;
}

inline static size_t
_ring_get_consumer_offset(_In_ const ebpf_ring_buffer_t* ring)
{
return ring->consumer_offset % ring->length;
}

inline static size_t
_ring_get_used_capacity(_In_ const ebpf_ring_buffer_t* ring)
{
ebpf_assert(ring->producer_offset >= ring->consumer_offset);
return ring->producer_offset - ring->consumer_offset;
}

inline static void
_ring_advance_producer_offset(_Inout_ ebpf_ring_buffer_t* ring, size_t length)
{
ring->producer_offset += length;
}

inline static void
_ring_advance_consumer_offset(_Inout_ ebpf_ring_buffer_t* ring, size_t length)
{
ring->consumer_offset += length;
}

inline static _Ret_notnull_ ebpf_ring_buffer_record_t*
_ring_record_at_offset(_In_ const ebpf_ring_buffer_t* ring, size_t offset)
{
return (ebpf_ring_buffer_record_t*)&ring->shared_buffer[offset % ring->length];
}

inline static _Ret_notnull_ ebpf_ring_buffer_record_t*
_ring_next_consumer_record(_In_ const ebpf_ring_buffer_t* ring)
{
return _ring_record_at_offset(ring, _ring_get_consumer_offset(ring));
}

inline static _Ret_maybenull_ ebpf_ring_buffer_record_t*
_ring_buffer_acquire_record(_Inout_ ebpf_ring_buffer_t* ring, size_t requested_length)
{
ebpf_ring_buffer_record_t* record = NULL;
requested_length += EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data);
size_t remaining_space = ring->length - (ring->producer_offset - ring->consumer_offset);

if (remaining_space > requested_length) {
record = _ring_record_at_offset(ring, _ring_get_producer_offset(ring));
_ring_advance_producer_offset(ring, requested_length);
record->header.length = (uint32_t)requested_length;
record->header.locked = 1;
record->header.discarded = 0;
}
return record;
}

_Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_create(_Outptr_ ebpf_ring_buffer_t** ring, size_t capacity)
{
@@ -135,78 +68,68 @@ _Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_output(_Inout_ ebpf_ring_buffer_t* ring, _In_reads_bytes_(length) uint8_t* data, size_t length)
{
ebpf_result_t result;
ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock);
ebpf_ring_buffer_record_t* record = _ring_buffer_acquire_record(ring, length);
uint8_t* buffer;

if (record == NULL) {
result = EBPF_OUT_OF_SPACE;
goto Done;
result = ebpf_ring_buffer_reserve(ring, &buffer, length);
if (result != EBPF_SUCCESS) {
return result;
}

record->header.discarded = 0;
record->header.locked = 0;
memcpy(record->data, data, length);
result = EBPF_SUCCESS;
Done:
ebpf_lock_unlock(&ring->lock, state);
return result;
memcpy(buffer, data, length);

return ebpf_ring_buffer_submit(buffer);
}

void
ebpf_ring_buffer_query(_In_ ebpf_ring_buffer_t* ring, _Out_ size_t* consumer, _Out_ size_t* producer)
{
ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock);
*consumer = ring->consumer_offset;
*producer = ring->producer_offset;
ebpf_lock_unlock(&ring->lock, state);
*consumer = (size_t)ReadAcquire64(&ring->consumer_offset);
*producer = (size_t)ReadAcquire64(&ring->producer_offset);
}

_Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_return(_Inout_ ebpf_ring_buffer_t* ring, size_t length)
{
EBPF_LOG_ENTRY();
ebpf_result_t result;
ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock);
size_t local_length = length;
size_t offset = _ring_get_consumer_offset(ring);

if ((length > _ring_get_length(ring)) || length > _ring_get_used_capacity(ring)) {
EBPF_LOG_MESSAGE_UINT64_UINT64(
EBPF_TRACELOG_LEVEL_ERROR,
EBPF_TRACELOG_KEYWORD_MAP,
"ebpf_ring_buffer_return: Buffer too large",
ring->producer_offset,
ring->consumer_offset);
result = EBPF_INVALID_ARGUMENT;
goto Done;
}
int64_t length_remaining = (int64_t)length;

// Verify count.
while (local_length != 0) {
ebpf_ring_buffer_record_t* record = _ring_record_at_offset(ring, offset);
if (local_length < record->header.length) {
for (;;) {
int64_t producer_offset = ReadAcquire64(&ring->producer_offset);
int64_t consumer_offset = ReadAcquire64(&ring->consumer_offset);

if (length_remaining > (producer_offset - consumer_offset)) {
return EBPF_INVALID_ARGUMENT;
}

if (length_remaining == 0) {
break;
}
offset += record->header.length;
local_length -= record->header.length;
}
// Did it end on a record boundary?
if (local_length != 0) {
EBPF_LOG_MESSAGE_UINT64(
EBPF_TRACELOG_LEVEL_ERROR,
EBPF_TRACELOG_KEYWORD_MAP,
"ebpf_ring_buffer_return: Invalid buffer length",
local_length);
result = EBPF_INVALID_ARGUMENT;
goto Done;
}

_ring_advance_consumer_offset(ring, length);
result = EBPF_SUCCESS;
ebpf_ring_buffer_record_t* record =
(ebpf_ring_buffer_record_t*)(ring->shared_buffer + consumer_offset % ring->length);

Done:
ebpf_lock_unlock(&ring->lock, state);
EBPF_RETURN_RESULT(result);
long size = ReadAcquire(&record->size);

// Can't return a locked record.
if (size & EBPF_RING_BUFFER_RECORD_FLAG_LOCKED) {
continue;
}

long data_size = size - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data);

memset(record->data, 0, data_size);
WriteRelease(&record->size, 0);

length_remaining -= size;

if (consumer_offset + size > producer_offset) {
return EBPF_INVALID_ARGUMENT;
}

WriteRelease64(&ring->consumer_offset, consumer_offset + size);
}

EBPF_RETURN_RESULT(EBPF_SUCCESS);
}
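A hedged usage sketch of this query/return pair from the consumer side (the record walk itself is elided; it would follow the polling protocol shown for ebpf_api.cpp above, and it assumes the declarations in this translation unit):

// Consumer-side usage sketch for ebpf_ring_buffer_query / ebpf_ring_buffer_return.
static ebpf_result_t
drain_ring(ebpf_ring_buffer_t* ring)
{
    size_t consumer_offset;
    size_t producer_offset;
    ebpf_ring_buffer_query(ring, &consumer_offset, &producer_offset);

    size_t bytes_available = producer_offset - consumer_offset;
    if (bytes_available == 0) {
        return EBPF_SUCCESS; // Nothing to consume.
    }

    // ... walk and process the records in [consumer_offset, producer_offset) ...

    // Hand the consumed range back; ebpf_ring_buffer_return zeroes each
    // record's data and advances the consumer offset.
    return ebpf_ring_buffer_return(ring, bytes_available);
}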

_Must_inspect_result_ ebpf_result_t
@@ -224,56 +147,90 @@ _Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_reserve(
_Inout_ ebpf_ring_buffer_t* ring, _Outptr_result_bytebuffer_(length) uint8_t** data, size_t length)
{
ebpf_result_t result;
ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock);
ebpf_ring_buffer_record_t* record = _ring_buffer_acquire_record(ring, length);
if (record == NULL) {
result = EBPF_INVALID_ARGUMENT;
goto Done;
}

record->header.locked = 1;
MemoryBarrier();
for (;;) {
int64_t producer_offset = ReadAcquire64(&ring->producer_offset);
// Record points to the next record to allocate.
ebpf_ring_buffer_record_t* record =
(ebpf_ring_buffer_record_t*)(ring->shared_buffer + producer_offset % ring->length);

int64_t remaining_space = ring->length - (producer_offset - ReadAcquire64(&ring->consumer_offset));
long effective_length = (long)length + 4;
if (remaining_space < effective_length) {
return EBPF_NO_MEMORY;
}

// Check if locked.
if (record->size != 0) {
// If locked, pause then try again.
// Use _mm_pause() on x86 and __yield() on ARM.
#if defined(_M_X64) || defined(_M_IX86)
_mm_pause();
#else
__yield();
#endif
continue;
}

if (InterlockedCompareExchange(&record->size, EBPF_RING_BUFFER_RECORD_FLAG_LOCKED, 0) != 0) {
continue;
}

// Check if the producer offset changed after we read it.
if (ReadAcquire64(&ring->producer_offset) != producer_offset) {
// Clear the lock bit
InterlockedAnd(&record->size, ~(EBPF_RING_BUFFER_RECORD_FLAG_LOCKED));
continue;
}

remaining_space = ring->length - (producer_offset - ReadAcquire64(&ring->consumer_offset));
if (remaining_space < effective_length) {
// Clear the lock bit
InterlockedAnd(&record->size, ~(EBPF_RING_BUFFER_RECORD_FLAG_LOCKED));
return EBPF_NO_MEMORY;
}

// Grab the pointer to the record.
*data = record->data;

*data = record->data;
result = EBPF_SUCCESS;
Done:
ebpf_lock_unlock(&ring->lock, state);
return result;
WriteRelease(&record->size, effective_length | EBPF_RING_BUFFER_RECORD_FLAG_LOCKED);
int64_t new_producer_offset = producer_offset + effective_length;
WriteRelease64(&ring->producer_offset, new_producer_offset);
break;
}

return EBPF_SUCCESS;
}
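The reserve path above is the core of the lock-free design: claim the slot at the current producer offset by compare-exchanging its size word from 0 to the LOCKED flag, re-check that the producer offset did not move while the claim was in flight, and only then publish the new offset with release semantics. Distilled to its essentials, the pattern looks roughly like the sketch below (illustrative only; the helper names and the lock-bit value are assumptions, not this file's code):

// Distilled claim-then-validate-then-publish pattern (not the project's code).
#include <windows.h>
#include <stdint.h>

#define SLOT_LOCKED 0x80000000L // assumed lock bit in a slot's size word

static volatile long*
claim_next_slot(
    volatile int64_t* producer_offset,
    volatile int64_t* consumer_offset,
    uint8_t* ring_data,
    int64_t ring_length,
    long record_size)
{
    for (;;) {
        int64_t producer = ReadAcquire64(producer_offset);
        volatile long* slot = (volatile long*)(ring_data + producer % ring_length);

        if (ring_length - (producer - ReadAcquire64(consumer_offset)) < record_size) {
            return NULL; // Ring is full.
        }
        // 1. Claim: the slot's size word must still be zero (unused).
        if (InterlockedCompareExchange(slot, SLOT_LOCKED, 0) != 0) {
            continue; // Another producer claimed it first.
        }
        // 2. Validate: if the producer offset moved while we were claiming,
        //    we locked a stale slot; release it and retry.
        if (ReadAcquire64(producer_offset) != producer) {
            InterlockedAnd(slot, ~SLOT_LOCKED);
            continue;
        }
        // 3. Publish: advance the producer offset so later producers claim the next slot.
        WriteRelease64(producer_offset, producer + record_size);
        return slot;
    }
}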

_Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_submit(_Frees_ptr_opt_ uint8_t* data)
{
if (!data) {
ebpf_ring_buffer_record_t* record = CONTAINING_RECORD(data, ebpf_ring_buffer_record_t, data);
long size = ReadAcquire(&record->size);

if (!(size & EBPF_RING_BUFFER_RECORD_FLAG_LOCKED)) {
return EBPF_INVALID_ARGUMENT;
}
ebpf_ring_buffer_record_t* record =
(ebpf_ring_buffer_record_t*)(data - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data));

record->header.discarded = 0;
// Place a memory barrier here so that all prior writes to the record are completed before the record
// is unlocked. Caller needs to ensure a MemoryBarrier between reading the record->header.locked and
// the data in the record.
MemoryBarrier();
record->header.locked = 0;

size &= ~EBPF_RING_BUFFER_RECORD_FLAG_LOCKED;

WriteRelease(&record->size, size);
return EBPF_SUCCESS;
}

_Must_inspect_result_ ebpf_result_t
ebpf_ring_buffer_discard(_Frees_ptr_opt_ uint8_t* data)
{
if (!data) {
ebpf_ring_buffer_record_t* record = CONTAINING_RECORD(data, ebpf_ring_buffer_record_t, data);
long size = ReadAcquire(&record->size);

if (!(size & EBPF_RING_BUFFER_RECORD_FLAG_LOCKED)) {
return EBPF_INVALID_ARGUMENT;
}
ebpf_ring_buffer_record_t* record =
(ebpf_ring_buffer_record_t*)(data - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data));

record->header.discarded = 1;
// Place a memory barrier here so that all prior writes to the record are completed before the record
// is unlocked. Caller needs to ensure a MemoryBarrier between reading the record->header.locked and
// the data in the record.
MemoryBarrier();
record->header.locked = 0;

size &= ~EBPF_RING_BUFFER_RECORD_FLAG_LOCKED;
size |= EBPF_RING_BUFFER_RECORD_FLAG_DISCARDED;

WriteRelease(&record->size, size);
return EBPF_SUCCESS;
}
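Finally, a hedged sketch of how a producer path is expected to pair these calls: reserve space, fill it, then either submit or discard so the consumer never spins indefinitely on the LOCKED bit. The event payload here is hypothetical, and the sketch assumes the declarations above plus <string.h> for memcpy.

// Producer-side usage sketch for ebpf_ring_buffer_reserve / submit / discard.
static ebpf_result_t
emit_event(ebpf_ring_buffer_t* ring, const uint8_t* payload, size_t payload_size)
{
    uint8_t* buffer;
    ebpf_result_t result = ebpf_ring_buffer_reserve(ring, &buffer, payload_size);
    if (result != EBPF_SUCCESS) {
        return result; // EBPF_NO_MEMORY when the ring is full.
    }
    if (payload == NULL) {
        // Nothing to write after all: mark the reservation discarded so the
        // consumer skips it instead of waiting on its contents.
        return ebpf_ring_buffer_discard(buffer);
    }
    memcpy(buffer, payload, payload_size);
    // Submit clears the LOCKED bit with release semantics so the consumer
    // observes a fully written payload.
    return ebpf_ring_buffer_submit(buffer);
}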