diff --git a/pkg/commands/mod.ts b/pkg/commands/mod.ts index d7af8779..3c214137 100644 --- a/pkg/commands/mod.ts +++ b/pkg/commands/mod.ts @@ -18,8 +18,8 @@ export * from "./flushall"; export * from "./flushdb"; export * from "./geo_add"; export * from "./geo_dist"; -export * from "./geo_pos"; export * from "./geo_hash"; +export * from "./geo_pos"; export * from "./geo_search"; export * from "./geo_search_store"; export * from "./get"; @@ -86,9 +86,9 @@ export * from "./msetnx"; export * from "./persist"; export * from "./pexpire"; export * from "./pexpireat"; -export * from './pfadd'; -export * from './pfcount'; -export * from './pfmerge'; +export * from "./pfadd"; +export * from "./pfcount"; +export * from "./pfmerge"; export * from "./ping"; export * from "./psetex"; export * from "./pttl"; @@ -131,7 +131,11 @@ export * from "./ttl"; export * from "./type"; export * from "./unlink"; export * from "./xadd"; +export * from "./xdel"; +export * from "./xlen"; export * from "./xrange"; +export * from "./xrevrange"; +export * from "./xtrim"; export * from "./zadd"; export * from "./zcard"; export * from "./zcount"; diff --git a/pkg/commands/xdel.test.ts b/pkg/commands/xdel.test.ts new file mode 100644 index 00000000..e07b656c --- /dev/null +++ b/pkg/commands/xdel.test.ts @@ -0,0 +1,66 @@ +import { keygen, newHttpClient } from "../test-utils"; + +import { afterAll, describe, expect, test } from "bun:test"; +import { XAddCommand } from "./xadd"; +import { XDelCommand } from "./xdel"; +import { XRangeCommand } from "./xrange"; + +const client = newHttpClient(); + +const { newKey, cleanup } = keygen(); +afterAll(cleanup); + +describe("XDEL", () => { + test("should delete one item from the stream", async () => { + const key = newKey(); + await new XAddCommand([key, "*", { name: "Jane", surname: "Austen" }]).exec( + client + ); + + const res = await new XAddCommand([ + key, + "*", + { name: "Toni", surname: "Morrison" }, + ]).exec(client); + + const xdelRes = await new XDelCommand([key, res]).exec(client); + const xrangeRes = await new XRangeCommand([key, "-", "+", 1]).exec(client); + + expect(Object.keys(xrangeRes).length).toBe(1); + expect(xdelRes).toBe(1); + }); + + test("should delete multiple items from the stream", async () => { + const key = newKey(); + + const id1 = await new XAddCommand([ + key, + "*", + { name: "Jane", surname: "Austen" }, + ]).exec(client); + + const id2 = await new XAddCommand([ + key, + "*", + { name: "Toni", surname: "Morrison" }, + ]).exec(client); + + const id3 = await new XAddCommand([ + key, + "*", + { name: "Agatha", surname: "Christie" }, + ]).exec(client); + + await new XAddCommand([ + key, + "*", + { name: "Ngozi", surname: "Adichie" }, + ]).exec(client); + + const xdelRes = await new XDelCommand([key, [id1, id2, id3]]).exec(client); + const xrangeRes = await new XRangeCommand([key, "-", "+", 1]).exec(client); + + expect(Object.keys(xrangeRes).length).toBe(1); + expect(xdelRes).toBe(3); + }); +}); diff --git a/pkg/commands/xdel.ts b/pkg/commands/xdel.ts new file mode 100644 index 00000000..c9e6fd87 --- /dev/null +++ b/pkg/commands/xdel.ts @@ -0,0 +1,14 @@ +import { Command, CommandOptions } from "./command"; + +/** + * @see https://redis.io/commands/xdel + */ +export class XDelCommand extends Command { + constructor( + [key, ids]: [key: string, ids: string[] | string], + opts?: CommandOptions + ) { + const cmds = Array.isArray(ids) ? [...ids] : [ids]; + super(["XDEL", key, ...cmds], opts); + } +} diff --git a/pkg/commands/xlen.test.ts b/pkg/commands/xlen.test.ts new file mode 100644 index 00000000..6e938071 --- /dev/null +++ b/pkg/commands/xlen.test.ts @@ -0,0 +1,39 @@ +import { keygen, newHttpClient } from "../test-utils"; + +import { afterAll, describe, expect, test } from "bun:test"; +import { XAddCommand } from "./xadd"; +import { XLenCommand } from "./xlen"; + +const client = newHttpClient(); + +const { newKey, cleanup } = keygen(); +afterAll(cleanup); + +describe("XLEN", () => { + test("should give size of the stream", async () => { + const key = newKey(); + await new XAddCommand([key, "*", { name: "Jane", surname: "Austen" }]).exec( + client + ); + await new XAddCommand([ + key, + "*", + { name: "Toni", surname: "Morrison" }, + ]).exec(client); + + await new XAddCommand([ + key, + "*", + { name: "Hezarfen", surname: "----" }, + ]).exec(client); + + const res = await new XLenCommand([key]).exec(client); + + expect(res).toBe(3); + }); + + test("should return 0 when specified key does not exist", async () => { + const res = await new XLenCommand(["missing-key"]).exec(client); + expect(res).toBe(0); + }); +}); diff --git a/pkg/commands/xlen.ts b/pkg/commands/xlen.ts new file mode 100644 index 00000000..01b5a316 --- /dev/null +++ b/pkg/commands/xlen.ts @@ -0,0 +1,10 @@ +import { Command, CommandOptions } from "./command"; + +/** + * @see https://redis.io/commands/xlen + */ +export class XLenCommand extends Command { + constructor(cmd: [key: string], opts?: CommandOptions) { + super(["XLEN", ...cmd], opts); + } +} diff --git a/pkg/commands/xrange.test.ts b/pkg/commands/xrange.test.ts index 33e2f49f..fbb77817 100644 --- a/pkg/commands/xrange.test.ts +++ b/pkg/commands/xrange.test.ts @@ -18,7 +18,11 @@ describe("without options", () => { const field2 = "field2"; const member2 = randomID(); - await new XAddCommand([key, "*", { [field1]: member1, [field2]: member2 }]).exec(client); + await new XAddCommand([ + key, + "*", + { [field1]: member1, [field2]: member2 }, + ]).exec(client); const res = await new XRangeCommand([key, "-", "+"]).exec(client); expect(Object.keys(res).length).toBe(1); @@ -49,7 +53,7 @@ describe("limit", () => { }); }); -test("many fields", () => { +describe("many fields", () => { test("returns all fields", async () => { const key = newKey(); diff --git a/pkg/commands/xrevrange.test.ts b/pkg/commands/xrevrange.test.ts new file mode 100644 index 00000000..19f2c3d3 --- /dev/null +++ b/pkg/commands/xrevrange.test.ts @@ -0,0 +1,62 @@ +import { keygen, newHttpClient } from "../test-utils"; + +import { afterAll, beforeEach, describe, expect, test } from "bun:test"; +import { XAddCommand } from "./xadd"; +import { XRevRangeCommand } from "./xrevrange"; + +const client = newHttpClient(); +const { newKey, cleanup } = keygen(); +const key = newKey(); +afterAll(cleanup); + +beforeEach(async () => { + await new XAddCommand([ + key, + "*", + { name: "Virginia", surname: "Woolf" }, + ]).exec(client); + + await new XAddCommand([key, "*", { name: "Jane", surname: "Austen" }]).exec( + client + ); + + await new XAddCommand([key, "*", { name: "Toni", surname: "Morrison" }]).exec( + client + ); + + await new XAddCommand([ + key, + "*", + { name: "Agatha", surname: "Christie" }, + ]).exec(client); + + await new XAddCommand([key, "*", { name: "Ngozi", surname: "Adichie" }]).exec( + client + ); +}); + +describe("without options", () => { + test("should return stream in a reverse order", async () => { + const res = await new XRevRangeCommand([key, "+", "-"]).exec(client); + + expect(Object.keys(res).length).toBe(5); + expect(Object.values(res)[0]).toEqual({ + name: "Ngozi", + surname: "Adichie", + }); + }); +}); + +describe("LIMIT", () => { + test("should return only last two", async () => { + const res = await new XRevRangeCommand([key, "+", "-", 2]).exec(client); + expect(Object.keys(res).length).toBe(2); + expect(Object.values(res)).toEqual([ + { + name: "Ngozi", + surname: "Adichie", + }, + { name: "Agatha", surname: "Christie" }, + ]); + }); +}); diff --git a/pkg/commands/xrevrange.ts b/pkg/commands/xrevrange.ts new file mode 100644 index 00000000..8ec5c256 --- /dev/null +++ b/pkg/commands/xrevrange.ts @@ -0,0 +1,51 @@ +import { Command, CommandOptions } from "./command"; + +export class XRevRangeCommand< + TData extends Record> +> extends Command { + constructor( + [key, end, start, count]: [ + key: string, + end: string, + start: string, + count?: number + ], + opts?: CommandOptions + ) { + const command: unknown[] = ["XREVRANGE", key, end, start]; + if (typeof count === "number") { + command.push("COUNT", count); + } + super(command, { + deserialize: (result) => deserialize(result as any), + ...opts, + }); + } +} + +function deserialize>>( + result: [string, string[]][] +): TData { + const obj: Record> = {}; + for (const e of result) { + while (e.length >= 2) { + const streamId = e.shift() as string; + const entries = e.shift()!; + + if (!(streamId in obj)) { + obj[streamId] = {}; + } + while (entries.length >= 2) { + const field = (entries as string[]).shift()! as string; + const value = (entries as string[]).shift()! as string; + + try { + obj[streamId][field] = JSON.parse(value); + } catch { + obj[streamId][field] = value; + } + } + } + } + return obj as TData; +} diff --git a/pkg/commands/xtrim.test.ts b/pkg/commands/xtrim.test.ts new file mode 100644 index 00000000..e633bdf1 --- /dev/null +++ b/pkg/commands/xtrim.test.ts @@ -0,0 +1,107 @@ +import { keygen, newHttpClient, randomID } from "../test-utils"; + +import { afterAll, describe, expect, test } from "bun:test"; +import { XAddCommand } from "./xadd"; +import { XLenCommand } from "./xlen"; +import { XTrimCommand } from "./xtrim"; + +const client = newHttpClient(); + +const { newKey, cleanup } = keygen(); +afterAll(cleanup); + +describe("XLEN", () => { + test( + "should approximately trim stream to 300 items", + async () => { + const key = newKey(); + + const promises = []; + for (let i = 1; i <= 10000; i++) { + promises.push( + new XAddCommand([key, "*", { [randomID()]: randomID() }]).exec(client) + ); + } + await Promise.all(promises); + + await new XTrimCommand([ + key, + { strategy: "MAXLEN", threshold: 300, exactness: "~" }, + ]).exec(client); + + const len = await new XLenCommand([key]).exec(client); + + expect(len).toBeGreaterThanOrEqual(290); + expect(len).toBeLessThanOrEqual(310); + }, + { timeout: 1000 * 60 } + ); + + test("should trim with zero threshold and remove everything", async () => { + const key = newKey(); + + const promises = []; + for (let i = 1; i <= 50; i++) { + promises.push( + new XAddCommand([key, "*", { [randomID()]: randomID() }]).exec(client) + ); + } + await Promise.all(promises); + + await new XTrimCommand([ + key, + { strategy: "MAXLEN", threshold: 0, exactness: "=" }, + ]).exec(client); + + const len = await new XLenCommand([key]).exec(client); + expect(len).toBeLessThanOrEqual(1); + }); + + test( + "should trim with MINID and a limit and only remove 10 items that satisfies MINID", + async () => { + const key = newKey(); + const baseTimestamp = Date.now(); + + for (let i = 0; i < 100; i++) { + const id = `${baseTimestamp}-${i}`; + await new XAddCommand([key, id, { data: `value${i}` }]).exec(client); + } + + const midRangeId = `${baseTimestamp}-50`; + + await new XTrimCommand([ + key, + { strategy: "MINID", threshold: midRangeId, limit: 10 }, + ]).exec(client); + + const len = await new XLenCommand([key]).exec(client); + expect(len).toBeLessThanOrEqual(100); + }, + { timeout: 20000 } + ); + + test( + "should trim with MINID and a without limit and delete half of the elements", + async () => { + const key = newKey(); + const baseTimestamp = Date.now(); + + for (let i = 0; i < 100; i++) { + const id = `${baseTimestamp}-${i}`; + await new XAddCommand([key, id, { data: `value${i}` }]).exec(client); + } + + const midRangeId = `${baseTimestamp}-50`; + + await new XTrimCommand([ + key, + { strategy: "MINID", threshold: midRangeId }, + ]).exec(client); + + const len = await new XLenCommand([key]).exec(client); + expect(len).toBeLessThanOrEqual(50); + }, + { timeout: 20000 } + ); +}); diff --git a/pkg/commands/xtrim.ts b/pkg/commands/xtrim.ts new file mode 100644 index 00000000..986c009b --- /dev/null +++ b/pkg/commands/xtrim.ts @@ -0,0 +1,33 @@ +import { Command, CommandOptions } from "./command"; + +/** + * @see https://redis.io/commands/xtrim + */ + +type XTrimOptions = { + strategy: "MAXLEN" | "MINID"; + exactness?: "~" | "="; + threshold: number | string; + limit?: number; +}; + +export class XTrimCommand extends Command { + constructor( + [key, options]: [key: string, options: XTrimOptions], + opts?: CommandOptions + ) { + const { limit, strategy, threshold, exactness = "~" } = options; + + super( + [ + "XTRIM", + key, + strategy, + exactness, + threshold, + ...(limit ? ["LIMIT", limit] : []), + ], + opts + ); + } +} diff --git a/pkg/pipeline.ts b/pkg/pipeline.ts index 3e65f355..a31ad930 100644 --- a/pkg/pipeline.ts +++ b/pkg/pipeline.ts @@ -133,6 +133,12 @@ import { TtlCommand, TypeCommand, UnlinkCommand, + XAddCommand, + XDelCommand, + XLenCommand, + XRangeCommand, + XRevRangeCommand, + XTrimCommand, ZAddCommand, ZAddCommandOptions, ZCardCommand, @@ -992,6 +998,42 @@ export class Pipeline[] = []> { ); }; + /** + * @see https://redis.io/commands/xadd + */ + xadd = (...args: CommandArgs) => + this.chain(new XAddCommand(args, this.commandOptions)); + + /** + * @see https://redis.io/commands/xdel + */ + xdel = (...args: CommandArgs) => + this.chain(new XDelCommand(args, this.commandOptions)); + + /** + * @see https://redis.io/commands/xlen + */ + xlen = (...args: CommandArgs) => + this.chain(new XLenCommand(args, this.commandOptions)); + + /** + * @see https://redis.io/commands/xtrim + */ + xtrim = (...args: CommandArgs) => + this.chain(new XTrimCommand(args, this.commandOptions)); + + /** + * @see https://redis.io/commands/xrange + */ + xrange = (...args: CommandArgs) => + this.chain(new XRangeCommand(args, this.commandOptions)); + + /** + * @see https://redis.io/commands/xrevrange + */ + xrevrange = (...args: CommandArgs) => + this.chain(new XRevRangeCommand(args, this.commandOptions)); + /** * @see https://redis.io/commands/zcard */ diff --git a/pkg/redis.ts b/pkg/redis.ts index 8c297464..930677e2 100644 --- a/pkg/redis.ts +++ b/pkg/redis.ts @@ -88,10 +88,10 @@ import { PExpireCommand, PSetEXCommand, PTtlCommand, + PersistCommand, PfAddCommand, PfCountCommand, PfMergeCommand, - PersistCommand, PingCommand, PublishCommand, RPopCommand, @@ -134,7 +134,11 @@ import { TypeCommand, UnlinkCommand, XAddCommand, + XDelCommand, + XLenCommand, XRangeCommand, + XRevRangeCommand, + XTrimCommand, ZAddCommand, ZAddCommandOptions, ZCardCommand, @@ -1094,12 +1098,36 @@ export class Redis { xadd = (...args: CommandArgs) => new XAddCommand(args, this.opts).exec(this.client); + /** + * @see https://redis.io/commands/xdel + */ + xdel = (...args: CommandArgs) => + new XDelCommand(args, this.opts).exec(this.client); + + /** + * @see https://redis.io/commands/xlen + */ + xlen = (...args: CommandArgs) => + new XLenCommand(args, this.opts).exec(this.client); + + /** + * @see https://redis.io/commands/xtrim + */ + xtrim = (...args: CommandArgs) => + new XTrimCommand(args, this.opts).exec(this.client); + /** * @see https://redis.io/commands/xrange */ xrange = (...args: CommandArgs) => new XRangeCommand(args, this.opts).exec(this.client); + /** + * @see https://redis.io/commands/xrevrange + */ + xrevrange = (...args: CommandArgs) => + new XRevRangeCommand(args, this.opts).exec(this.client); + /** * @see https://redis.io/commands/zadd */