Skip to content

Commit

Permalink
brotli: make sure stream encode on large inputs works as expected (#1…
Browse files Browse the repository at this point in the history
…0936)

Co-authored-by: nektro <[email protected]>
  • Loading branch information
nektro and nektro committed May 9, 2024
1 parent c378feb commit 93c0a37
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 17 deletions.
22 changes: 5 additions & 17 deletions src/bun.js/api/brotli.zig
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,8 @@ pub const BrotliEncoder = struct {
this.output_lock.lock();
defer this.output_lock.unlock();

if (this.output.items.len > 16 * 1024) {
defer this.output.items = "";
defer this.output.deinit(bun.default_allocator);
return JSC.JSValue.createBuffer(this.globalThis, this.output.items, bun.default_allocator);
} else {
defer this.output.items = "";
defer this.output.deinit(bun.default_allocator);
return JSC.ArrayBuffer.createBuffer(this.globalThis, this.output.items);
}
defer this.output.clearRetainingCapacity();
return JSC.ArrayBuffer.create(this.globalThis, this.output.items, .Buffer);
}

pub fn runFromJSThread(this: *BrotliEncoder) void {
Expand Down Expand Up @@ -293,7 +286,7 @@ pub const BrotliEncoder = struct {
this.input.writeItem(input_to_queue) catch unreachable;
}
task.run();
return if (!is_last) .undefined else this.collectOutputValue();
return if (!is_last and this.output.items.len == 0) .undefined else this.collectOutputValue();
}

pub fn end(this: *BrotliEncoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue {
Expand Down Expand Up @@ -389,13 +382,8 @@ pub const BrotliDecoder = struct {
this.output_lock.lock();
defer this.output_lock.unlock();

if (this.output.items.len > 16 * 1024) {
defer this.output.clearRetainingCapacity();
return JSC.JSValue.createBuffer(this.globalThis, this.output.items, bun.default_allocator);
} else {
defer this.output.clearRetainingCapacity();
return JSC.ArrayBuffer.createBuffer(this.globalThis, this.output.items);
}
defer this.output.clearRetainingCapacity();
return JSC.ArrayBuffer.create(this.globalThis, this.output.items, .Buffer);
}

pub fn runFromJSThread(this: *BrotliDecoder) void {
Expand Down
46 changes: 46 additions & 0 deletions test/js/node/http/node-http.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1970,6 +1970,52 @@ it("can send brotli from Server and receive with fetch", async () => {
}
});

it("can send gzip from Server and receive with fetch", async () => {
try {
var server = createServer((req, res) => {
expect(req.url).toBe("/hello");
res.writeHead(200);
res.setHeader("content-encoding", "gzip");

const inputStream = new stream.Readable();
inputStream.push("Hello World");
inputStream.push(null);

inputStream.pipe(zlib.createGzip()).pipe(res);
});
const url = await listen(server);
const res = await fetch(new URL("/hello", url));
expect(await res.text()).toBe("Hello World");
} catch (e) {
throw e;
} finally {
server.close();
}
});

it("can send deflate from Server and receive with fetch", async () => {
try {
var server = createServer((req, res) => {
expect(req.url).toBe("/hello");
res.writeHead(200);
res.setHeader("content-encoding", "deflate");

const inputStream = new stream.Readable();
inputStream.push("Hello World");
inputStream.push(null);

inputStream.pipe(zlib.createDeflate()).pipe(res);
});
const url = await listen(server);
const res = await fetch(new URL("/hello", url));
expect(await res.text()).toBe("Hello World");
} catch (e) {
throw e;
} finally {
server.close();
}
});

it("can send brotli from Server and receive with Client", async () => {
try {
var server = createServer((req, res) => {
Expand Down
23 changes: 23 additions & 0 deletions test/js/node/zlib/zlib.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import * as buffer from "node:buffer";
import * as util from "node:util";
import { resolve } from "node:path";
import { tmpdirSync } from "harness";
import * as stream from "node:stream";

describe("zlib", () => {
it("should be able to deflate and inflate", () => {
Expand Down Expand Up @@ -180,4 +181,26 @@ describe("zlib.brotli", () => {
expect(actual).toEqual(expected);
}
});

it("streaming encode doesn't wait for entire input", async () => {
const createPRNG = seed => {
let state = seed ?? Math.floor(Math.random() * 0x7fffffff);
return () => (state = (1103515245 * state + 12345) % 0x80000000) / 0x7fffffff;
};
const readStream = new stream.Readable();
const brotliStream = zlib.createBrotliCompress();
const rand = createPRNG(1);
let all = [];

brotliStream.on("data", chunk => all.push(chunk.length));
brotliStream.on("end", () => expect(all).toEqual([11180, 13, 14, 13, 13, 13, 14]));

for (let i = 0; i < 50; i++) {
let buf = Buffer.alloc(1024 * 1024);
for (let j = 0; j < buf.length; j++) buf[j] = (rand() * 256) | 0;
readStream.push(buf);
}
readStream.push(null);
readStream.pipe(brotliStream);
}, 15_000);
});

0 comments on commit 93c0a37

Please sign in to comment.