Skip to content

Commit

Permalink
Merge pull request #229 from lightpanda-io/cancellation
Browse files Browse the repository at this point in the history
Cancellation
  • Loading branch information
krichprollsch authored Sep 19, 2024
2 parents ade128f + 054e20c commit f2a6e94
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 5 deletions.
60 changes: 58 additions & 2 deletions src/loop.zig
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub const IO = @import("tigerbeetle-io").IO;
const public = @import("api.zig");
const JSCallback = public.Callback;

const log = std.log.scoped(.loop);

fn report(comptime fmt: []const u8, args: anytype) void {
const max_len = 200;
var buf: [max_len]u8 = undefined;
Expand Down Expand Up @@ -113,7 +115,13 @@ pub const SingleThreaded = struct {
defer ctx.loop.freeCbk(completion, ctx);

// TODO: return the error to the callback
result catch |err| @panic(@errorName(err));
result catch |err| {
switch (err) {
error.Canceled => {},
else => log.err("timeout callback: {any}", .{err}),
}
return;
};

const old_events_nb = ctx.loop.removeEvent();
if (builtin.is_test) {
Expand All @@ -129,7 +137,7 @@ pub const SingleThreaded = struct {
}
}

pub fn timeout(self: *Self, nanoseconds: u63, js_cbk: ?JSCallback) void {
pub fn timeout(self: *Self, nanoseconds: u63, js_cbk: ?JSCallback) usize {
const completion = self.alloc.create(IO.Completion) catch unreachable;
completion.* = undefined;
const ctx = self.alloc.create(ContextTimeout) catch unreachable;
Expand All @@ -142,6 +150,54 @@ pub const SingleThreaded = struct {
if (builtin.is_test) {
report("start timeout {d} for {d} nanoseconds", .{ old_events_nb + 1, nanoseconds });
}

return @intFromPtr(completion);
}

const ContextCancel = struct {
loop: *Self,
js_cbk: ?JSCallback,
};

fn cancelCallback(
ctx: *ContextCancel,
completion: *IO.Completion,
result: IO.CancelError!void,
) void {
defer ctx.loop.freeCbk(completion, ctx);

// TODO: return the error to the callback
result catch |err| {
log.err("cancel callback: {any}", .{err});
return;
};

const old_events_nb = ctx.loop.removeEvent();
if (builtin.is_test) {
report("timeout done, remaining events: {d}", .{old_events_nb - 1});
}

// js callback
if (ctx.js_cbk) |js_cbk| {
defer js_cbk.deinit(ctx.loop.alloc);
js_cbk.call(null) catch {
ctx.loop.cbk_error = true;
};
}
}

pub fn cancel(self: *Self, id: usize, js_cbk: ?JSCallback) void {
const comp_cancel: *IO.Completion = @ptrFromInt(id);

const completion = self.alloc.create(IO.Completion) catch unreachable;
completion.* = undefined;
const ctx = self.alloc.create(ContextCancel) catch unreachable;
ctx.* = ContextCancel{
.loop = self,
.js_cbk = js_cbk,
};

self.io.cancel(*ContextCancel, ctx, cancelCallback, completion, comp_cancel);
}

// Yield
Expand Down
4 changes: 2 additions & 2 deletions src/tests/cbk_test.zig
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub const Window = struct {
) void {
const n: u63 = @intCast(milliseconds);
// TODO: check this value can be holded in u63
loop.timeout(n * std.time.ns_per_ms, callback);
_ = loop.timeout(n * std.time.ns_per_ms, callback);
}

pub fn _cbkAsyncWithJSArg(
Expand All @@ -64,7 +64,7 @@ pub const Window = struct {
) void {
const n: u63 = @intCast(milliseconds);
// TODO: check this value can be holded in u63
loop.timeout(n * std.time.ns_per_ms, callback);
_ = loop.timeout(n * std.time.ns_per_ms, callback);
}

pub fn _cbkAsyncWithNatArg(_: Window, callback: Callback) !void {
Expand Down
2 changes: 1 addition & 1 deletion vendor/tigerbeetle-io

0 comments on commit f2a6e94

Please sign in to comment.