Skip to content

Commit

Permalink
object: graph: replace switching-consumer pattern with dedicated grap…
Browse files Browse the repository at this point in the history
…h processing thread

Signed-off-by: Stephen Gutekanst <[email protected]>
  • Loading branch information
slimsag committed Nov 30, 2024
1 parent d72facd commit 60df3e7
Showing 1 changed file with 20 additions and 22 deletions.
42 changes: 20 additions & 22 deletions src/graph.zig
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ const Op = union(enum) {
/// graph as /operations/ enqueued to a lock-free Multi Producer, Single Consumer (MPSC) FIFO queue.
///
/// When an operation is desired (adding a parent to a child, querying the children or parent of a
/// node, etc.) it is enqueued. Then, if the queue contains entries, that thread becomes the
/// consumer of the MPSC queue temporarily and processes all pending operations in the queue.
/// node, etc.) it is enqueued. Then, a background thread processes all pending operations. Atomics
/// are used to wait for reads to complete, and parallel writes are lock-free.
///
/// The graph uses lock-free pools to manage all nodes internally, eliminating runtime allocations
/// during operation processing.
Expand All @@ -130,7 +130,15 @@ pub const Graph = struct {

preallocate_result_list_size: u32,

/// Thread that processes operations from the queue
thread: ?std.Thread = null,

/// Flag to signal the processing thread to stop
should_stop: std.atomic.Value(bool) = .init(false),

/// Initialize the graph with the given pre-allocated space for nodes and operations.
///
/// Spawns a backgroound thread for processing operations to the graph.
pub fn init(
graph: *Graph,
allocator: std.mem.Allocator,
Expand Down Expand Up @@ -173,9 +181,13 @@ pub const Graph = struct {
try list.ensureTotalCapacity(allocator, preallocate.result_list_size);
try graph.result_lists.available.append(allocator, list);
}

graph.thread = try std.Thread.spawn(.{ .allocator = allocator }, processThread, .{ graph, allocator });
}

pub fn deinit(graph: *Graph, allocator: std.mem.Allocator) void {
graph.should_stop.store(true, .release);
graph.thread.?.join();
for (graph.result_lists.available.items) |list| {
list.deinit(allocator);
allocator.destroy(list);
Expand All @@ -193,20 +205,12 @@ pub const Graph = struct {
return graph.id_to_node.map.get(id);
}

/// Tries to take all queued operations to the graph and, if successful, processes them.
///
/// A different thread which calls processQueue() may beat us to acquiring all of the queued
/// operations, in which case this function may return before they are processed.
fn processQueue(graph: *Graph, allocator: std.mem.Allocator) void {
if (graph.queue.takeAll()) |nodes| {
defer graph.queue.releaseAll(nodes);

// Process the entire chain of nodes
var current: ?*Queue(Op).Node = nodes;
while (current) |node| {
graph.processOp(allocator, node.value);
current = node.next;
}
/// The thread that runs continuously in the background to process queue submissions.
fn processThread(graph: *Graph, allocator: std.mem.Allocator) void {
while (!graph.should_stop.load(.acquire)) {
// Process the entire queue
while (graph.queue.pop()) |op| graph.processOp(allocator, op);
std.Thread.yield() catch {};
}
}

Expand Down Expand Up @@ -316,7 +320,6 @@ pub const Graph = struct {
.parent_id = parent_id,
.child_id = child_id,
} });
graph.processQueue(allocator);
}

pub fn removeChild(graph: *Graph, allocator: std.mem.Allocator, parent_id: u64, child_id: u64) Error!void {
Expand All @@ -325,7 +328,6 @@ pub const Graph = struct {
.parent_id = parent_id,
.child_id = child_id,
} });
graph.processQueue(allocator);
}

pub fn setParent(graph: *Graph, allocator: std.mem.Allocator, child_id: u64, parent_id: u64) Error!void {
Expand All @@ -335,14 +337,12 @@ pub const Graph = struct {
.child_id = child_id,
.parent_id = parent_id,
} });
graph.processQueue(allocator);
}

pub fn removeParent(graph: *Graph, allocator: std.mem.Allocator, child_id: u64) Error!void {
try graph.queue.push(allocator, .{ .remove_parent = .{
.child_id = child_id,
} });
graph.processQueue(allocator);
}

const Results = struct {
Expand Down Expand Up @@ -374,7 +374,6 @@ pub const Graph = struct {
} });

while (!done.load(.acquire)) {
graph.processQueue(allocator);
std.Thread.yield() catch {};
}

Expand Down Expand Up @@ -429,7 +428,6 @@ pub const Graph = struct {
} });

while (!done.load(.acquire)) {
graph.processQueue(allocator);
std.Thread.yield() catch {};
}

Expand Down

0 comments on commit 60df3e7

Please sign in to comment.