From ea47230aec71c557c519218886d3a14e91490abf Mon Sep 17 00:00:00 2001 From: Michael Agun Date: Fri, 18 Oct 2024 13:08:17 -0700 Subject: [PATCH] Ringbuffer design updates. Splits the design so callback consumers use the existing libbpf APIs, and mapped memory consumers use the new windows-specific functions. --- docs/RingBuffer.md | 124 +++++++++++++++------------------------------ 1 file changed, 41 insertions(+), 83 deletions(-) diff --git a/docs/RingBuffer.md b/docs/RingBuffer.md index 5b7b55be6e..24e54686c0 100644 --- a/docs/RingBuffer.md +++ b/docs/RingBuffer.md @@ -9,16 +9,15 @@ The new API will support 2 consumer types: callbacks and direct access to the ma Callback consumer: -1. Call `ring_buffer__new` to set up callback -2. Call `ring_buffer__consume` as needed to invoke the callback on any outsanding data that is ready +1. Call `ring_buffer__new` to set up callback. +2. The callback will be invoked for each record written to the ring buffer. Mapped memory consumer: -1. Call `ring_buffer__new` to get a ringbuffer manager. -2. Call `ebpf_ring_buffer_get_buffer` to get pointers to the mapped producer/consumer pages. -3. Call `ebpf_ring_buffer_get_wait_handle` to get the wait handle. -4. Directly read records from the producer pages (and update consumer offset as we read). -5. Call `WaitForSingleObject`/`WaitForMultipleObject` as needed to wait for new data to be available. +1. Call `ebpf_ring_buffer_get_buffer` to get pointers to the mapped producer/consumer pages. +2. Call `ebpf_ring_buffer_get_wait_handle` to get the wait handle. +3. Directly read records from the producer pages (and update consumer offset as we read). +4. Call `WaitForSingleObject`/`WaitForMultipleObject` as needed to wait for new data to be available. 
## API Changes @@ -40,10 +39,15 @@ ebpf_result_t ebpf_ring_buffer_output(ebpf_ring_buffer_t* ring, uint8_t* data, size_t length, size_t flags) ``` -### Updated supported libbpf functions +### Existing libbpf functions for callback consumer -```c +The behaviour of these functions will be unchanged. + +Use the existing `ring_buffer__new()` to set up automatic callbacks for each record. +Call `ebpf_ring_buffer_get_buffer()` ([New eBPF APIs](#new-ebpf-apis-for-mapped-memory-consumer)) +to get direct access to the mapped ringbuffer memory. +```c struct ring_buffer; typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size); @@ -68,17 +72,6 @@ struct ring_buffer * ring_buffer__new(int map_fd, ring_buffer_sample_fn sample_cb, void *ctx, const struct ring_buffer_opts *opts); -/** - * @brief Add extra RINGBUF maps to this ring buffer manager - * - * @param[in] rb Pointer to ring buffer manager. - * @param[in] map_fd File descriptor to ring buffer map. - * @param[in] sample_cb Pointer to ring buffer notification callback function. - * @param[in] ctx Pointer to sample_cb callback function context. - */ -int ring_buffer__add(struct ring_buffer *rb, int map_fd, - ring_buffer_sample_fn sample_cb, void *ctx) - /** * @brief Frees a ring buffer manager. * @@ -86,32 +79,9 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd, * */ void ring_buffer__free(struct ring_buffer *rb); - -/** - * @brief poll ringbuf for new data - * Poll for available data and consume records, if any are available. - * Returns number of records consumed (or INT_MAX, whichever is less), or - * negative number, if any of the registered callbacks returned error. - * - * @param[in] rb Pointer to ring buffer manager. - * @param[in] timeout_ms maximum time to wait for (in milliseconds). 
- * - */ -int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms); - -/** - * @brief catch consumer up to producer by invoking the callback for every available record - * Consume available ring buffer(s) data without event polling. - * Returns number of records consumed across all registered ring buffers (or - * INT_MAX, whichever is less), or negative number if any of the callbacks - * return error. - * - * @param[in] rb Pointer to ring buffer manager. - */ -int ring_buffer__consume(struct ring_buffer *rb); ``` -### New ebpf APIs +### New ebpf APIs for mapped memory consumer ```c /** @@ -120,23 +90,19 @@ int ring_buffer__consume(struct ring_buffer *rb); * @param[out] producer pointer* to start of read-only mapped producer pages * @param[out] consumer pointer* to start of read-write mapped consumer page */ -ebpf_result_t ebpf_ring_buffer_get_buffer(const ebpf_map_t* map, void **producer, void **consumer); +ebpf_result_t ebpf_ring_buffer_get_buffer(fd_t map_fd, void **producer, void **consumer); /** * get the wait handle to use with WaitForSingleObject/WaitForMultipleObject * - * @param[out] producer pointer* to start of read-only mapped producer pages - * @param[out] consumer pointer* to start of read-write mapped consumer page - * * @returns Wait handle */ -HANDLE ebpf_ring_buffer_get_wait_handle(const ebpf_map_t* map); +HANDLE ebpf_ring_buffer_get_wait_handle(fd_t map_fd); ``` ## Ringbuffer consumer -### libbpf mapped memory consumer example - +### mapped memory consumer example This consumer directly accesses the records from the producer memory and directly updates the consumer offset to show the logic. @@ -149,7 +115,7 @@ This consumer directly accesses the records from the producer memory and directl // Ring buffer record is 64 bit header + data. typedef struct _rb_header { - //NOTE: bit fields are not portable, so this is just for simpler example code -- the actual code should use bit masking to perform equivalent operations on the header bits. 
+ //NOTE: bit fields are not portable, so this is just for simpler example code -- the actual code should use bit masking to perform equivalent operations on the header bits, and ReadAcquire to read the header. uint8_t locked : 1; uint8_t discarded : 1; uint32_t length : 30; @@ -166,7 +132,7 @@ typedef struct _rb_record * @brief clear the ringbuffer. */ void rb_flush(uint64_t *cons_offset, const uint64_t *prod_offset) { - *cons_offset = *prod_offset; + WriteRelease64(cons_offset,ReadAcquire64(prod_offset)); } @@ -181,32 +147,28 @@ void *rb_prod; // Pointer to start of read-only producer pages. fd_t map_fd = bpf_obj_get(rb_map_name.c_str()); if (map_fd == ebpf_fd_invalid) return 1; -auto rb = ring_buffer__new( - map_fd, - NULL, //callback function for callback-based consumer - nullptr, nullptr); -if (!rb) return 1; - -// Initialize iocp wait handle. -HANDLE wait_handle = ebpf_ring_buffer_get_wait_handle(rb); +// Initialize wait handle for map. +HANDLE wait_handle = ebpf_ring_buffer_get_wait_handle(map_fd); if (!wait_handle) { // … log error … - goto Cleanup; + goto Exit; } // get pointers to the producer/consumer pages -int err = ebpf_ring_buffer_get_buffer(&rb_prod, &rb_cons); +int err = ebpf_ring_buffer_get_buffer(map_fd, &rb_prod, &rb_cons); if (err) { - goto Cleanup; + goto Exit; } const uint64_t *prod_offset = (const uint64_t*)rb_prod; // Producer offset ptr (r only). uint64_t *cons_offset = (uint64_t*)rb_cons; // Consumer offset ptr (r/w mapped). const uint8_t *rb_data = ((const uint8_t*)rb_prod) + PAGESIZE; // Double-mapped rb data ptr (r only). +uint64_t producer_offset = ReadAcquire64(prod_offset); +uint64_t consumer_offset = *cons_offset; // only one consumer so don't need ReadAcquire. // have_data used to track whether we should wait for notification or just keep reading. 
-bool have_data = *prod_offset > *cons_offset;
+bool have_data = producer_offset > consumer_offset;
 
 void *lp_ctx = NULL;
 OVERLAPPED *overlapped = NULL;
@@ -215,7 +177,7 @@ DWORD bytesTransferred = 0;
 // Now loop until error.
 For(;;) {
   if (!have_data) { // Only wait if we have already caught up.
-    // Wait for rb to notify -- or we could spin/poll on *prod_offset > *cons_offset.
+    // Wait for rb to notify -- or we could spin/poll until *prod_offset > *cons_offset.
     DWORD wait_status = WaitForSingleObject(wait_handle, INFINITE);
     if (wait_status != WAIT_OBJECT_0) {
       // No notification
@@ -224,15 +186,14 @@ For(;;) {
         // … log error …
         break;
       }
-      have_data = *prod_offset > *cons_offset; // It's possible we still have data.
+      producer_offset = ReadAcquire64(prod_offset);
+      have_data = producer_offset > consumer_offset; // It's possible we still have data.
       if (!have_data) continue;
     } else { // We got notified of new data.
       have_data = true;
     }
   }
-  uint64_t prod = *prod_offset;
-  uint64_t cons = *cons_offset;
-  uint64_t remaining = prod - cons;
+  uint64_t remaining = producer_offset - consumer_offset;
 
   if (remaining == 0) {
     have_data = false; // Caught up to producer.
@@ -244,30 +205,27 @@ For(;;) {
   }
 
   // Check header flags first, then read/skip data and update offset.
-  rb_header_t header = *(rb_header_t*)(&rb_data[cons % rb_size]);
-  if (header.locked) { // Next record not ready yet, wait on iocp.
+  rb_header_t header = *(rb_header_t*)(&rb_data[consumer_offset % rb_size]);
+  if (header.locked) { // Next record not ready yet, wait.
    have_data = false;
    continue;
-    // Or we could spin/poll on ((rb_header_t*)(&rb_data[cons % rb_size]))->locked.
+    // Or we could spin/poll on ((rb_header_t*)(&rb_data[consumer_offset % rb_size]))->locked.
   }
   if (!header.discarded) {
-    const rb_record_t *record = *(const rb_record_t*)(&rb_data[cons % rb_size]);
+    const rb_record_t *record = (const rb_record_t*)(&rb_data[consumer_offset % rb_size]);
     // Read data from record->data[0 ... record->length-1].
// … business logic … } // Else it was discarded, skip and continue. // Update consumer offset (and pad record length to multiple of 8). - cons += sizeof(rb_header_t) + (record->length + 7 & ~7); - *cons_offset = cons; + consumer_offset += sizeof(rb_header_t) + (record->length + 7 & ~7); + WriteRelease64(cons_offset, consumer_offset); } -Cleanup: - -// Close ringbuffer. -ring_buffer__free(rb); +Exit: ``` -### Simplified blocking ringbuf consumer +### Simplified polling ringbuf consumer This consumer uses some possible helpers to simplify the above logic (might also want timeout). @@ -285,8 +243,8 @@ for(;;) { // … Do record handling here … } // 3 cases for err: - // 1) Ringbuf empty - Wait with epoll, or poll for !rb__empty(prod,cons). - // 2) Record locked - Wait with epoll, or spin/poll on header lock bit. + // 1) Ringbuf empty - Wait on handle, or poll for !rb__empty(prod,cons). + // 2) Record locked - Wait on handle, or spin/poll on header lock bit. // 3) Corrupt record or consumer offset - Break (could flush to continue reading from next good record). if (err!=E_EMPTY && err!=E_LOCKED) { // … log error …