Skip to content

Commit

Permalink
DX 532 (#785)
Browse files Browse the repository at this point in the history
* Add XREVRANGE command and missing commands to pipeline

* Add XDEL command

* Add XLEN command

* Add XTRIM command
  • Loading branch information
ogzhanolguncu authored Dec 15, 2023
1 parent d9c08ac commit fc26789
Show file tree
Hide file tree
Showing 12 changed files with 467 additions and 7 deletions.
12 changes: 8 additions & 4 deletions pkg/commands/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ export * from "./flushall";
export * from "./flushdb";
export * from "./geo_add";
export * from "./geo_dist";
export * from "./geo_pos";
export * from "./geo_hash";
export * from "./geo_pos";
export * from "./geo_search";
export * from "./geo_search_store";
export * from "./get";
Expand Down Expand Up @@ -86,9 +86,9 @@ export * from "./msetnx";
export * from "./persist";
export * from "./pexpire";
export * from "./pexpireat";
export * from './pfadd';
export * from './pfcount';
export * from './pfmerge';
export * from "./pfadd";
export * from "./pfcount";
export * from "./pfmerge";
export * from "./ping";
export * from "./psetex";
export * from "./pttl";
Expand Down Expand Up @@ -131,7 +131,11 @@ export * from "./ttl";
export * from "./type";
export * from "./unlink";
export * from "./xadd";
export * from "./xdel";
export * from "./xlen";
export * from "./xrange";
export * from "./xrevrange";
export * from "./xtrim";
export * from "./zadd";
export * from "./zcard";
export * from "./zcount";
Expand Down
66 changes: 66 additions & 0 deletions pkg/commands/xdel.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { keygen, newHttpClient } from "../test-utils";

import { afterAll, describe, expect, test } from "bun:test";
import { XAddCommand } from "./xadd";
import { XDelCommand } from "./xdel";
import { XRangeCommand } from "./xrange";

const client = newHttpClient();

const { newKey, cleanup } = keygen();
afterAll(cleanup);

describe("XDEL", () => {
test("should delete one item from the stream", async () => {
const key = newKey();
await new XAddCommand([key, "*", { name: "Jane", surname: "Austen" }]).exec(
client
);

const res = await new XAddCommand([
key,
"*",
{ name: "Toni", surname: "Morrison" },
]).exec(client);

const xdelRes = await new XDelCommand([key, res]).exec(client);
const xrangeRes = await new XRangeCommand([key, "-", "+", 1]).exec(client);

expect(Object.keys(xrangeRes).length).toBe(1);
expect(xdelRes).toBe(1);
});

test("should delete multiple items from the stream", async () => {
const key = newKey();

const id1 = await new XAddCommand([
key,
"*",
{ name: "Jane", surname: "Austen" },
]).exec(client);

const id2 = await new XAddCommand([
key,
"*",
{ name: "Toni", surname: "Morrison" },
]).exec(client);

const id3 = await new XAddCommand([
key,
"*",
{ name: "Agatha", surname: "Christie" },
]).exec(client);

await new XAddCommand([
key,
"*",
{ name: "Ngozi", surname: "Adichie" },
]).exec(client);

const xdelRes = await new XDelCommand([key, [id1, id2, id3]]).exec(client);
const xrangeRes = await new XRangeCommand([key, "-", "+", 1]).exec(client);

expect(Object.keys(xrangeRes).length).toBe(1);
expect(xdelRes).toBe(3);
});
});
14 changes: 14 additions & 0 deletions pkg/commands/xdel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Command, CommandOptions } from "./command";

/**
* @see https://redis.io/commands/xdel
*/
export class XDelCommand extends Command<number, number> {
constructor(
[key, ids]: [key: string, ids: string[] | string],
opts?: CommandOptions<number, number>
) {
const cmds = Array.isArray(ids) ? [...ids] : [ids];
super(["XDEL", key, ...cmds], opts);
}
}
39 changes: 39 additions & 0 deletions pkg/commands/xlen.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { keygen, newHttpClient } from "../test-utils";

import { afterAll, describe, expect, test } from "bun:test";
import { XAddCommand } from "./xadd";
import { XLenCommand } from "./xlen";

const client = newHttpClient();

const { newKey, cleanup } = keygen();
afterAll(cleanup);

describe("XLEN", () => {
test("should give size of the stream", async () => {
const key = newKey();
await new XAddCommand([key, "*", { name: "Jane", surname: "Austen" }]).exec(
client
);
await new XAddCommand([
key,
"*",
{ name: "Toni", surname: "Morrison" },
]).exec(client);

await new XAddCommand([
key,
"*",
{ name: "Hezarfen", surname: "----" },
]).exec(client);

const res = await new XLenCommand([key]).exec(client);

expect(res).toBe(3);
});

test("should return 0 when specified key does not exist", async () => {
const res = await new XLenCommand(["missing-key"]).exec(client);
expect(res).toBe(0);
});
});
10 changes: 10 additions & 0 deletions pkg/commands/xlen.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Command, CommandOptions } from "./command";

/**
* @see https://redis.io/commands/xlen
*/
export class XLenCommand extends Command<number, number> {
constructor(cmd: [key: string], opts?: CommandOptions<number, number>) {
super(["XLEN", ...cmd], opts);
}
}
8 changes: 6 additions & 2 deletions pkg/commands/xrange.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ describe("without options", () => {
const field2 = "field2";
const member2 = randomID();

await new XAddCommand([key, "*", { [field1]: member1, [field2]: member2 }]).exec(client);
await new XAddCommand([
key,
"*",
{ [field1]: member1, [field2]: member2 },
]).exec(client);

const res = await new XRangeCommand([key, "-", "+"]).exec(client);
expect(Object.keys(res).length).toBe(1);
Expand Down Expand Up @@ -49,7 +53,7 @@ describe("limit", () => {
});
});

test("many fields", () => {
describe("many fields", () => {
test("returns all fields", async () => {
const key = newKey();

Expand Down
62 changes: 62 additions & 0 deletions pkg/commands/xrevrange.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { keygen, newHttpClient } from "../test-utils";

import { afterAll, beforeEach, describe, expect, test } from "bun:test";
import { XAddCommand } from "./xadd";
import { XRevRangeCommand } from "./xrevrange";

const client = newHttpClient();
const { newKey, cleanup } = keygen();
const key = newKey();
afterAll(cleanup);

beforeEach(async () => {
await new XAddCommand([
key,
"*",
{ name: "Virginia", surname: "Woolf" },
]).exec(client);

await new XAddCommand([key, "*", { name: "Jane", surname: "Austen" }]).exec(
client
);

await new XAddCommand([key, "*", { name: "Toni", surname: "Morrison" }]).exec(
client
);

await new XAddCommand([
key,
"*",
{ name: "Agatha", surname: "Christie" },
]).exec(client);

await new XAddCommand([key, "*", { name: "Ngozi", surname: "Adichie" }]).exec(
client
);
});

describe("without options", () => {
test("should return stream in a reverse order", async () => {
const res = await new XRevRangeCommand([key, "+", "-"]).exec(client);

expect(Object.keys(res).length).toBe(5);
expect(Object.values(res)[0]).toEqual({
name: "Ngozi",
surname: "Adichie",
});
});
});

describe("LIMIT", () => {
test("should return only last two", async () => {
const res = await new XRevRangeCommand([key, "+", "-", 2]).exec(client);
expect(Object.keys(res).length).toBe(2);
expect(Object.values(res)).toEqual([
{
name: "Ngozi",
surname: "Adichie",
},
{ name: "Agatha", surname: "Christie" },
]);
});
});
51 changes: 51 additions & 0 deletions pkg/commands/xrevrange.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { Command, CommandOptions } from "./command";

export class XRevRangeCommand<
TData extends Record<string, Record<string, unknown>>
> extends Command<string[][], TData> {
constructor(
[key, end, start, count]: [
key: string,
end: string,
start: string,
count?: number
],
opts?: CommandOptions<unknown[], TData[]>
) {
const command: unknown[] = ["XREVRANGE", key, end, start];
if (typeof count === "number") {
command.push("COUNT", count);
}
super(command, {
deserialize: (result) => deserialize<any>(result as any),
...opts,
});
}
}

function deserialize<TData extends Record<string, Record<string, unknown>>>(
result: [string, string[]][]
): TData {
const obj: Record<string, Record<string, unknown>> = {};
for (const e of result) {
while (e.length >= 2) {
const streamId = e.shift() as string;
const entries = e.shift()!;

if (!(streamId in obj)) {
obj[streamId] = {};
}
while (entries.length >= 2) {
const field = (entries as string[]).shift()! as string;
const value = (entries as string[]).shift()! as string;

try {
obj[streamId][field] = JSON.parse(value);
} catch {
obj[streamId][field] = value;
}
}
}
}
return obj as TData;
}
Loading

0 comments on commit fc26789

Please sign in to comment.