diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 337d092d56e36..aed8a6d9533fa 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -443,6 +443,13 @@ pub const Subprocess = struct { .capture => Output.panic("TODO: implement capture support in Stdio readable", .{}), }; } + + if (comptime Environment.isPosix) { + if (stdio == .pipe) { + _ = bun.sys.setNonblocking(result.?); + } + } + return switch (stdio) { .inherit => Readable{ .inherit = {} }, .ignore => Readable{ .ignore = {} }, @@ -1262,6 +1269,13 @@ pub const Subprocess = struct { }, } } + + if (comptime Environment.isPosix) { + if (stdio == .pipe) { + _ = bun.sys.setNonblocking(result.?); + } + } + switch (stdio) { .dup2 => @panic("TODO dup2 stdio"), .pipe => { diff --git a/src/cli/filter_run.zig b/src/cli/filter_run.zig index c9a2c4275a59f..302c299b38def 100644 --- a/src/cli/filter_run.zig +++ b/src/cli/filter_run.zig @@ -92,9 +92,11 @@ pub const ProcessHandle = struct { if (Environment.isPosix) { if (spawned.stdout) |stdout| { + _ = bun.sys.setNonblocking(stdout); try handle.stdout.start(stdout, true).unwrap(); } if (spawned.stderr) |stderr| { + _ = bun.sys.setNonblocking(stderr); try handle.stderr.start(stderr, true).unwrap(); } } else { diff --git a/src/install/lifecycle_script_runner.zig b/src/install/lifecycle_script_runner.zig index 937ec2c223b8f..105a5ce3c52ee 100644 --- a/src/install/lifecycle_script_runner.zig +++ b/src/install/lifecycle_script_runner.zig @@ -196,6 +196,7 @@ pub const LifecycleScriptSubprocess = struct { if (spawned.stdout) |stdout| { if (!spawned.memfds[1]) { this.stdout.setParent(this); + _ = bun.sys.setNonblocking(stdout); this.remaining_fds += 1; try this.stdout.start(stdout, true).unwrap(); } else { @@ -206,6 +207,7 @@ pub const LifecycleScriptSubprocess = struct { if (spawned.stderr) |stderr| { if (!spawned.memfds[2]) { this.stderr.setParent(this); + _ = bun.sys.setNonblocking(stderr); this.remaining_fds += 1; try this.stderr.start(stderr, true).unwrap(); } else { diff --git a/src/sys.zig b/src/sys.zig index 479a319231172..035e9b8bb62ec 100644 --- a/src/sys.zig +++ b/src/sys.zig @@ -592,10 +592,11 @@ pub fn mkdirOSPath(file_path: bun.OSPathSliceZ, flags: bun.Mode) Maybe(void) { }; } -pub fn fcntl(fd: bun.FileDescriptor, cmd: i32, arg: usize) Maybe(usize) { +const fnctl_int = if (Environment.isLinux) usize else c_int; +pub fn fcntl(fd: bun.FileDescriptor, cmd: i32, arg: fnctl_int) Maybe(fnctl_int) { const result = fcntl_symbol(fd.cast(), cmd, arg); - if (Maybe(usize).errnoSys(result, .fcntl)) |err| return err; - return .{ .result = @as(usize, @intCast(result)) }; + if (Maybe(fnctl_int).errnoSys(result, .fcntl)) |err| return err; + return .{ .result = @intCast(result) }; } pub fn getErrno(rc: anytype) bun.C.E { @@ -2277,6 +2278,26 @@ pub fn directoryExistsAt(dir_: anytype, subpath: anytype) JSC.Maybe(bool) { return faccessat(dir_fd, subpath); } +pub fn setNonblocking(fd: bun.FileDescriptor) Maybe(void) { + const flags = switch (bun.sys.fcntl( + fd, + std.os.F.GETFL, + 0, + )) { + .result => |f| f, + .err => |err| return .{ .err = err }, + }; + + const new_flags = flags | std.os.O.NONBLOCK; + + switch (bun.sys.fcntl(fd, std.os.F.SETFL, new_flags)) { + .err => |err| return .{ .err = err }, + .result => {}, + } + + return Maybe(void).success; +} + pub fn existsAt(fd: bun.FileDescriptor, subpath: [:0]const u8) bool { if (comptime Environment.isPosix) { return faccessat(fd, subpath).result; @@ -2556,7 +2577,7 @@ pub fn linkatTmpfile(tmpfd: bun.FileDescriptor, dirfd: bun.FileDescriptor, name: if (Maybe(void).errnoSysFd(rc, .link, tmpfd)) |err| { switch (err.getErrno()) { .INTR => continue, - .NOENT, .OPNOTSUPP, .PERM, .INVAL => { + .ISDIR, .NOENT, .OPNOTSUPP, .PERM, .INVAL => { // CAP_DAC_READ_SEARCH is required to linkat with an empty path. if (current_status == 0) { CAP_DAC_READ_SEARCH.status.store(-1, .Monotonic); diff --git a/test/regression/issue/011297.fixture.ts b/test/regression/issue/011297.fixture.ts new file mode 100644 index 0000000000000..e3205a1f7f3aa --- /dev/null +++ b/test/regression/issue/011297.fixture.ts @@ -0,0 +1,60 @@ +const string = Buffer.alloc(1024 * 1024, "zombo.com\n").toString(); +process.exitCode = 1; +const proc = Bun.spawn({ + cmd: ["cat"], + stdio: ["pipe", "pipe", "inherit"], +}); + +const writer = (async function () { + console.time("Sent " + string.length + " bytes x 10"); + for (let i = 0; i < 10; i += 1) { + // TODO: investigate if the need for this "await" is a bug. + // I believe FileSink should be buffering internally. + // + // To reproduce: + // + // 1. Remove "await" from proc.stdin.write(string) (keep the .end() await) + // 2. Run `hyperfine "bun test/regression/issue/011297.fixture.ts"` (or run this many times on macOS.) + // + await proc.stdin.write(string); + } + await proc.stdin.end(); + console.timeEnd("Sent " + string.length + " bytes x 10"); +})(); + +const reader = (async function () { + console.time("Read " + string.length + " bytes x 10"); + + const chunks = []; + for await (const chunk of proc.stdout) { + chunks.push(chunk); + } + + console.timeEnd("Read " + string.length + " bytes x 10"); + + return chunks; +})(); + +const [chunks, exitCode] = await Promise.all([reader, proc.exited, writer]); +const combined = Buffer.concat(chunks).toString().trim(); +if (combined !== string.repeat(10)) { + await Bun.write("a.txt", string.repeat(10)); + await Bun.write("b.txt", combined); + throw new Error(`string mismatch! + exit code: ${exitCode} + + hash: + input ${Bun.SHA1.hash(string.repeat(10), "hex")} + output: ${Bun.SHA1.hash(combined, "hex")} + length: + input ${string.length * 10} + output: ${combined.length} + +`); +} + +if (exitCode !== 0) { + throw new Error("process exited with non-zero code"); +} +console.timeEnd("Read " + string.length + " bytes x 10"); +process.exitCode = 0; diff --git a/test/regression/issue/011297.test.ts b/test/regression/issue/011297.test.ts new file mode 100644 index 0000000000000..7238ad7e3c488 --- /dev/null +++ b/test/regression/issue/011297.test.ts @@ -0,0 +1,8 @@ +import { test, expect } from "bun:test"; +import { bunExe, isWindows } from "harness"; +import { join } from "path"; +import "harness"; + +test("issue #11297", async () => { + expect([join(import.meta.dir, "./011297.fixture.ts")]).toRun(); +});