Skip to content

Commit

Permalink
linux: add cancel support
Browse files Browse the repository at this point in the history
  • Loading branch information
krichprollsch committed May 21, 2024
1 parent 885fc61 commit 2f13b34
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 0 deletions.
22 changes: 22 additions & 0 deletions io/darwin.zig
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,28 @@ pub const IO = struct {
);
}

pub const CancelError = error{ NotFound, ExpirationInProgress } || os.UnexpectedError;

pub fn cancel(
self: *IO,
comptime Context: type,
context: Context,
comptime callback: fn (
context: Context,
completion: *Completion,
result: CancelError!void,
) void,
completion: *Completion,
cancel_completion: *Completion,
) void {
_ = self;
_ = context;
_ = callback;
_ = completion;
_ = cancel_completion;
// TODO implement cancellation w/ kqueue.
}

pub const TimeoutError = error{Canceled} || os.UnexpectedError;

pub fn timeout(
Expand Down
58 changes: 58 additions & 0 deletions io/linux.zig
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ pub const IO = struct {
op.offset,
);
},
.cancel => |op| {
linux.io_uring_prep_cancel(sqe, op.c, 0);
},
}
sqe.user_data = @intFromPtr(completion);
}
Expand Down Expand Up @@ -468,6 +471,22 @@ pub const IO = struct {
};
completion.callback(completion.context, completion, &result);
},
.cancel => {
const result: CancelError!void = blk: {
if (completion.result < 0) {
const err = switch (@as(os.E, @enumFromInt(-completion.result))) {
.SUCCESS => {},
.NOENT => error.NotFound,
.ALREADY => error.ExpirationInProgress,
else => |errno| os.unexpectedErrno(errno),
};
break :blk err;
} else {
break :blk;
}
};
completion.callback(completion.context, completion, &result);
},
}
}
};
Expand Down Expand Up @@ -507,6 +526,9 @@ pub const IO = struct {
buffer: []const u8,
offset: u64,
},
cancel: struct {
c: u64,
},
};

pub const AcceptError = error{
Expand Down Expand Up @@ -797,6 +819,42 @@ pub const IO = struct {
self.enqueue(completion);
}

pub const CancelError = error{ NotFound, ExpirationInProgress } || os.UnexpectedError;

pub fn cancel(
self: *IO,
comptime Context: type,
context: Context,
comptime callback: fn (
context: Context,
completion: *Completion,
result: CancelError!void,
) void,
completion: *Completion,
cancel_completion: *Completion,
) void {
completion.* = .{
.io = self,
.context = context,
.callback = struct {
fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
callback(
@as(Context, @ptrFromInt(@intFromPtr(ctx))),
comp,
@as(*const CancelError!void, @ptrFromInt(@intFromPtr(res))).*,
);
}
}.wrapper,
.operation = .{
.cancel = .{
.c = @intFromPtr(cancel_completion),
},
},
};

self.enqueue(completion);
}

pub const TimeoutError = error{Canceled} || os.UnexpectedError;

pub fn timeout(
Expand Down
59 changes: 59 additions & 0 deletions io/test.zig
Original file line number Diff line number Diff line change
Expand Up @@ -640,3 +640,62 @@ test "pipe data over socket" {
}
}.run();
}

test "cancel" {
try struct {
const Context = @This();

io: IO,
timeout_res: IO.TimeoutError!void = undefined,
timeout_done: bool = false,
cancel_done: bool = false,

fn run_test() !void {
var self: Context = .{
.io = try IO.init(32, 0),
};
defer self.io.deinit();

var completion: IO.Completion = undefined;
self.io.timeout(
*Context,
&self,
timeout_callback,
&completion,
100 * std.time.ns_per_ms,
);

var cancel_completion: IO.Completion = undefined;
self.io.cancel(
*Context,
&self,
cancel_callback,
&cancel_completion,
&completion,
);
while (!self.cancel_done and !self.timeout_done) try self.io.tick();

try testing.expectEqual(true, self.timeout_done);
try testing.expectEqual(true, self.cancel_done);
try testing.expectError(IO.TimeoutError.Canceled, self.timeout_res);
}

fn timeout_callback(
self: *Context,
_: *IO.Completion,
result: IO.TimeoutError!void,
) void {
self.timeout_res = result;
self.timeout_done = true;
}

fn cancel_callback(
self: *Context,
_: *IO.Completion,
result: IO.CancelError!void,
) void {
result catch |err| std.debug.panic("cancel error: {}", .{err});
self.cancel_done = true;
}
}.run_test();
}

0 comments on commit 2f13b34

Please sign in to comment.