diff --git a/libs/api/ebpf_api.cpp b/libs/api/ebpf_api.cpp index 366e7b553b..fd2f230baa 100644 --- a/libs/api/ebpf_api.cpp +++ b/libs/api/ebpf_api.cpp @@ -4266,15 +4266,28 @@ _ebpf_ring_buffer_map_async_query_completion(_Inout_ void* completion_context) N break; } - int callback_result = subscription->sample_callback( - subscription->sample_callback_context, - const_cast(reinterpret_cast(record->data)), - record->header.length - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data)); - if (callback_result != 0) { - break; + long size; + for (;;) { + size = ReadAcquire(&record->size); + if (size & EBPF_RING_BUFFER_RECORD_FLAG_LOCKED) { + // The record is being written to by the producer. + // Wait for the producer to finish writing. + YieldProcessor(); + } else { + break; + } } - consumer += record->header.length; + if (!(size & EBPF_RING_BUFFER_RECORD_FLAG_DISCARDED)) { + int callback_result = subscription->sample_callback( + subscription->sample_callback_context, + const_cast(reinterpret_cast(record->data)), + size - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data)); + if (callback_result != 0) { + break; + } + } + consumer += size; } } diff --git a/libs/runtime/ebpf_ring_buffer.c b/libs/runtime/ebpf_ring_buffer.c index 65ffbdb6bc..a6cfe7bb62 100644 --- a/libs/runtime/ebpf_ring_buffer.c +++ b/libs/runtime/ebpf_ring_buffer.c @@ -8,80 +8,13 @@ typedef struct _ebpf_ring_buffer { - ebpf_lock_t lock; - size_t length; - size_t consumer_offset; - size_t producer_offset; + int64_t length; + volatile int64_t consumer_offset; + volatile int64_t producer_offset; uint8_t* shared_buffer; ebpf_ring_descriptor_t* ring_descriptor; } ebpf_ring_buffer_t; -inline static size_t -_ring_get_length(_In_ const ebpf_ring_buffer_t* ring) -{ - return ring->length; -} - -inline static size_t -_ring_get_producer_offset(_In_ const ebpf_ring_buffer_t* ring) -{ - return ring->producer_offset % ring->length; -} - -inline static size_t -_ring_get_consumer_offset(_In_ const ebpf_ring_buffer_t* ring) -{ - return ring->consumer_offset % ring->length; -} - -inline static size_t -_ring_get_used_capacity(_In_ const ebpf_ring_buffer_t* ring) -{ - ebpf_assert(ring->producer_offset >= ring->consumer_offset); - return ring->producer_offset - ring->consumer_offset; -} - -inline static void -_ring_advance_producer_offset(_Inout_ ebpf_ring_buffer_t* ring, size_t length) -{ - ring->producer_offset += length; -} - -inline static void -_ring_advance_consumer_offset(_Inout_ ebpf_ring_buffer_t* ring, size_t length) -{ - ring->consumer_offset += length; -} - -inline static _Ret_notnull_ ebpf_ring_buffer_record_t* -_ring_record_at_offset(_In_ const ebpf_ring_buffer_t* ring, size_t offset) -{ - return (ebpf_ring_buffer_record_t*)&ring->shared_buffer[offset % ring->length]; -} - -inline static _Ret_notnull_ ebpf_ring_buffer_record_t* -_ring_next_consumer_record(_In_ const ebpf_ring_buffer_t* ring) -{ - return _ring_record_at_offset(ring, _ring_get_consumer_offset(ring)); -} - -inline static _Ret_maybenull_ ebpf_ring_buffer_record_t* -_ring_buffer_acquire_record(_Inout_ ebpf_ring_buffer_t* ring, size_t requested_length) -{ - ebpf_ring_buffer_record_t* record = NULL; - requested_length += EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data); - size_t remaining_space = ring->length - (ring->producer_offset - ring->consumer_offset); - - if (remaining_space > requested_length) { - record = _ring_record_at_offset(ring, _ring_get_producer_offset(ring)); - _ring_advance_producer_offset(ring, requested_length); - record->header.length = (uint32_t)requested_length; - record->header.locked = 1; - record->header.discarded = 0; - } - return record; -} - _Must_inspect_result_ ebpf_result_t ebpf_ring_buffer_create(_Outptr_ ebpf_ring_buffer_t** ring, size_t capacity) { @@ -135,78 +68,68 @@ _Must_inspect_result_ ebpf_result_t ebpf_ring_buffer_output(_Inout_ ebpf_ring_buffer_t* ring, _In_reads_bytes_(length) uint8_t* data, size_t length) { ebpf_result_t result; - ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock); - ebpf_ring_buffer_record_t* record = _ring_buffer_acquire_record(ring, length); + uint8_t* buffer; - if (record == NULL) { - result = EBPF_OUT_OF_SPACE; - goto Done; + result = ebpf_ring_buffer_reserve(ring, &buffer, length); + if (result != EBPF_SUCCESS) { + return result; } - record->header.discarded = 0; - record->header.locked = 0; - memcpy(record->data, data, length); - result = EBPF_SUCCESS; -Done: - ebpf_lock_unlock(&ring->lock, state); - return result; + memcpy(buffer, data, length); + + return ebpf_ring_buffer_submit(buffer); } void ebpf_ring_buffer_query(_In_ ebpf_ring_buffer_t* ring, _Out_ size_t* consumer, _Out_ size_t* producer) { - ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock); - *consumer = ring->consumer_offset; - *producer = ring->producer_offset; - ebpf_lock_unlock(&ring->lock, state); + *consumer = (size_t)ReadAcquire64(&ring->consumer_offset); + *producer = (size_t)ReadAcquire64(&ring->producer_offset); } _Must_inspect_result_ ebpf_result_t ebpf_ring_buffer_return(_Inout_ ebpf_ring_buffer_t* ring, size_t length) { EBPF_LOG_ENTRY(); - ebpf_result_t result; - ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock); - size_t local_length = length; - size_t offset = _ring_get_consumer_offset(ring); - - if ((length > _ring_get_length(ring)) || length > _ring_get_used_capacity(ring)) { - EBPF_LOG_MESSAGE_UINT64_UINT64( - EBPF_TRACELOG_LEVEL_ERROR, - EBPF_TRACELOG_KEYWORD_MAP, - "ebpf_ring_buffer_return: Buffer too large", - ring->producer_offset, - ring->consumer_offset); - result = EBPF_INVALID_ARGUMENT; - goto Done; - } + int64_t length_remaining = (int64_t)length; - // Verify count. - while (local_length != 0) { - ebpf_ring_buffer_record_t* record = _ring_record_at_offset(ring, offset); - if (local_length < record->header.length) { + for (;;) { + int64_t producer_offset = ReadAcquire64(&ring->producer_offset); + int64_t consumer_offset = ReadAcquire64(&ring->consumer_offset); + + if (length_remaining > (producer_offset - consumer_offset)) { + return EBPF_INVALID_ARGUMENT; + } + + if (length_remaining == 0) { break; } - offset += record->header.length; - local_length -= record->header.length; - } - // Did it end on a record boundary? - if (local_length != 0) { - EBPF_LOG_MESSAGE_UINT64( - EBPF_TRACELOG_LEVEL_ERROR, - EBPF_TRACELOG_KEYWORD_MAP, - "ebpf_ring_buffer_return: Invalid buffer length", - local_length); - result = EBPF_INVALID_ARGUMENT; - goto Done; - } - _ring_advance_consumer_offset(ring, length); - result = EBPF_SUCCESS; + ebpf_ring_buffer_record_t* record = + (ebpf_ring_buffer_record_t*)(ring->shared_buffer + consumer_offset % ring->length); -Done: - ebpf_lock_unlock(&ring->lock, state); - EBPF_RETURN_RESULT(result); + long size = ReadAcquire(&record->size); + + // Can't return a locked record. + if (size & EBPF_RING_BUFFER_RECORD_FLAG_LOCKED) { + continue; + } + + long data_size = size - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data); + + memset(record->data, 0, data_size); + WriteRelease(&record->size, 0); + + length_remaining -= size; + + if (consumer_offset + size > producer_offset) { + return EBPF_INVALID_ARGUMENT; + } + + WriteRelease64(&ring->consumer_offset, consumer_offset + size); + } + + EBPF_RETURN_RESULT(EBPF_SUCCESS); } _Must_inspect_result_ ebpf_result_t @@ -224,56 +147,90 @@ _Must_inspect_result_ ebpf_result_t ebpf_ring_buffer_reserve( _Inout_ ebpf_ring_buffer_t* ring, _Outptr_result_bytebuffer_(length) uint8_t** data, size_t length) { - ebpf_result_t result; - ebpf_lock_state_t state = ebpf_lock_lock(&ring->lock); - ebpf_ring_buffer_record_t* record = _ring_buffer_acquire_record(ring, length); - if (record == NULL) { - result = EBPF_INVALID_ARGUMENT; - goto Done; - } - record->header.locked = 1; - MemoryBarrier(); + for (;;) { + int64_t producer_offset = ReadAcquire64(&ring->producer_offset); + // Record points to the next record to allocate. + ebpf_ring_buffer_record_t* record = + (ebpf_ring_buffer_record_t*)(ring->shared_buffer + producer_offset % ring->length); + + int64_t remaining_space = ring->length - (producer_offset - ReadAcquire64(&ring->consumer_offset)); + long effective_length = (long)length + 4; + if (remaining_space < effective_length) { + return EBPF_NO_MEMORY; + } + + // Check if locked. + if (record->size != 0) { + // If locked, pause then try again. + // Use _mm_pause() on x86 and __yield() on ARM. +#if defined(_M_X64) || defined(_M_IX86) + _mm_pause(); +#else + __yield(); +#endif + continue; + } + + if (InterlockedCompareExchange(&record->size, EBPF_RING_BUFFER_RECORD_FLAG_LOCKED, 0) != 0) { + continue; + } + + // Check if the producer offset changed after we read it. + if (ReadAcquire64(&ring->producer_offset) != producer_offset) { + // Clear the lock bit + InterlockedAnd(&record->size, ~(EBPF_RING_BUFFER_RECORD_FLAG_LOCKED)); + continue; + } + + remaining_space = ring->length - (producer_offset - ReadAcquire64(&ring->consumer_offset)); + if (remaining_space < effective_length) { + // Clear the lock bit + InterlockedAnd(&record->size, ~(EBPF_RING_BUFFER_RECORD_FLAG_LOCKED)); + return EBPF_NO_MEMORY; + } + + // Grab the pointer to the record. + *data = record->data; - *data = record->data; - result = EBPF_SUCCESS; -Done: - ebpf_lock_unlock(&ring->lock, state); - return result; + WriteRelease(&record->size, effective_length | EBPF_RING_BUFFER_RECORD_FLAG_LOCKED); + int64_t new_producer_offset = producer_offset + effective_length; + WriteRelease64(&ring->producer_offset, new_producer_offset); + break; + } + + return EBPF_SUCCESS; } _Must_inspect_result_ ebpf_result_t ebpf_ring_buffer_submit(_Frees_ptr_opt_ uint8_t* data) { - if (!data) { + ebpf_ring_buffer_record_t* record = CONTAINING_RECORD(data, ebpf_ring_buffer_record_t, data); + long size = ReadAcquire(&record->size); + + if (!(size & EBPF_RING_BUFFER_RECORD_FLAG_LOCKED)) { return EBPF_INVALID_ARGUMENT; } - ebpf_ring_buffer_record_t* record = - (ebpf_ring_buffer_record_t*)(data - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data)); - - record->header.discarded = 0; - // Place a memory barrier here so that all prior writes to the record are completed before the record - // is unlocked. Caller needs to ensure a MemoryBarrier between reading the record->header.locked and - // the data in the record. - MemoryBarrier(); - record->header.locked = 0; + + size &= ~EBPF_RING_BUFFER_RECORD_FLAG_LOCKED; + + WriteRelease(&record->size, size); return EBPF_SUCCESS; } _Must_inspect_result_ ebpf_result_t ebpf_ring_buffer_discard(_Frees_ptr_opt_ uint8_t* data) { - if (!data) { + ebpf_ring_buffer_record_t* record = CONTAINING_RECORD(data, ebpf_ring_buffer_record_t, data); + long size = ReadAcquire(&record->size); + + if (!(size & EBPF_RING_BUFFER_RECORD_FLAG_LOCKED)) { return EBPF_INVALID_ARGUMENT; } - ebpf_ring_buffer_record_t* record = - (ebpf_ring_buffer_record_t*)(data - EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data)); - - record->header.discarded = 1; - // Place a memory barrier here so that all prior writes to the record are completed before the record - // is unlocked. Caller needs to ensure a MemoryBarrier between reading the record->header.locked and - // the data in the record. - MemoryBarrier(); - record->header.locked = 0; + + size &= ~EBPF_RING_BUFFER_RECORD_FLAG_LOCKED; + size |= EBPF_RING_BUFFER_RECORD_FLAG_DISCARDED; + + WriteRelease(&record->size, size); return EBPF_SUCCESS; } diff --git a/libs/runtime/unit/platform_unit_test.cpp b/libs/runtime/unit/platform_unit_test.cpp index 5ec5849237..a7e17b839d 100644 --- a/libs/runtime/unit/platform_unit_test.cpp +++ b/libs/runtime/unit/platform_unit_test.cpp @@ -1062,9 +1062,9 @@ TEST_CASE("ring_buffer_output", "[platform]") auto record = ebpf_ring_buffer_next_record(buffer, size, consumer, producer); REQUIRE(record != nullptr); - REQUIRE(record->header.length == data.size() + EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data)); + REQUIRE(record->size == data.size() + EBPF_OFFSET_OF(ebpf_ring_buffer_record_t, data)); - REQUIRE(ebpf_ring_buffer_return(ring_buffer, record->header.length) == EBPF_SUCCESS); + REQUIRE(ebpf_ring_buffer_return(ring_buffer, record->size) == EBPF_SUCCESS); ebpf_ring_buffer_query(ring_buffer, &consumer, &producer); record = ebpf_ring_buffer_next_record(buffer, size, consumer, producer); @@ -1134,7 +1134,7 @@ TEST_CASE("ring_buffer_reserve_submit_discard", "[platform]") } uint8_t* mem3 = nullptr; - REQUIRE(ebpf_ring_buffer_reserve(ring_buffer, &mem3, size + 1) == EBPF_INVALID_ARGUMENT); + REQUIRE(ebpf_ring_buffer_reserve(ring_buffer, &mem3, size + 1) == EBPF_NO_MEMORY); ebpf_ring_buffer_query(ring_buffer, &consumer, &producer); @@ -1146,6 +1146,98 @@ TEST_CASE("ring_buffer_reserve_submit_discard", "[platform]") ring_buffer = nullptr; } +TEST_CASE("ring_buffer_stress", "[platform]") +{ + _test_helper test_helper; + test_helper.initialize(); + ebpf_ring_buffer_t* ring_buffer; + + uint8_t* buffer; + std::vector data(10); + size_t size = 64 * 1024; + bool bad_record = false; + std::atomic a_records = 0; + std::atomic b_records = 0; + std::atomic stop{false}; + + REQUIRE(ebpf_ring_buffer_create(&ring_buffer, size) == EBPF_SUCCESS); + REQUIRE(ebpf_ring_buffer_map_buffer(ring_buffer, &buffer) == EBPF_SUCCESS); + + auto producer = [&](std::vector& data) { + while (!stop) { + if (ebpf_ring_buffer_output(ring_buffer, data.data(), data.size()) != EBPF_SUCCESS) { + YieldProcessor(); + } + } + }; + + auto consumer = [&]() { + size_t consumer; + size_t producer; + while (!stop) { + ebpf_ring_buffer_query(ring_buffer, &consumer, &producer); + if (consumer != producer) { + auto record = ebpf_ring_buffer_next_record(buffer, size, consumer, producer); + if (record != nullptr) { + volatile long actual_size = ReadAcquire(&record->size); + if (actual_size & EBPF_RING_BUFFER_RECORD_FLAG_LOCKED) { + YieldProcessor(); + continue; + } + switch (actual_size) { + case 17: + a_records++; + break; + case 23: + b_records++; + break; + default: + bad_record = true; + return; + break; + } + if (ebpf_ring_buffer_return(ring_buffer, actual_size) != EBPF_SUCCESS) { + bad_record = true; + break; + } + } + } else { + YieldProcessor(); + } + } + }; + + std::vector threads; + std::vector data1(13, 'a'); + std::vector data2(19, 'b'); + + auto producer_a = [&]() { producer(data1); }; + auto producer_b = [&]() { producer(data2); }; + + // Start consumer thread. + threads.emplace_back(std::thread(consumer)); + + // Start producer threads. + for (size_t i = 0; i < 10; i++) { + threads.emplace_back(std::thread(producer_a)); + threads.emplace_back(std::thread(producer_b)); + } + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + stop = true; + + for (auto& thread : threads) { + thread.join(); + } + + REQUIRE(!bad_record); + REQUIRE(a_records > 0); + // REQUIRE(b_records > 0); + + ebpf_ring_buffer_destroy(ring_buffer); +} + TEST_CASE("error codes", "[platform]") { for (ebpf_result_t result = EBPF_SUCCESS; result < EBPF_RESULT_COUNT; result = (ebpf_result_t)(result + 1)) { diff --git a/libs/shared/ebpf_ring_buffer_record.h b/libs/shared/ebpf_ring_buffer_record.h index 70c28a650a..323504834a 100644 --- a/libs/shared/ebpf_ring_buffer_record.h +++ b/libs/shared/ebpf_ring_buffer_record.h @@ -5,14 +5,12 @@ CXPLAT_EXTERN_C_BEGIN +#define EBPF_RING_BUFFER_RECORD_FLAG_LOCKED (long)(0x1ul << 31) +#define EBPF_RING_BUFFER_RECORD_FLAG_DISCARDED (long)(0x1ul << 30) + typedef struct _ebpf_ring_buffer_record { - struct - { - uint8_t locked : 1; - uint8_t discarded : 1; - uint32_t length : 30; - } header; + long size; uint8_t data[1]; } ebpf_ring_buffer_record_t;