Skip to content

Commit

Permalink
Dx 569 (#809)
Browse files Browse the repository at this point in the history
* Add XGROUP commands

* Add XREAD command

* Add XINFO and XREADGROUP command

* Add XPENDING command

* Add XCLAIM command

* Add XAUTOCLAIM command

* Add XACK command

* Fix naming issues

* Fix flaky test
  • Loading branch information
ogzhanolguncu authored Jan 5, 2024
1 parent 0831653 commit 169183f
Show file tree
Hide file tree
Showing 20 changed files with 1,643 additions and 2 deletions.
8 changes: 8 additions & 0 deletions pkg/commands/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,18 @@ export * from "./touch";
export * from "./ttl";
export * from "./type";
export * from "./unlink";
export * from "./xack";
export * from "./xadd";
export * from "./xautoclaim";
export * from "./xclaim";
export * from "./xdel";
export * from "./xgroup";
export * from "./xinfo";
export * from "./xlen";
export * from "./xpending";
export * from "./xrange";
export * from "./xread";
export * from "./xreadgroup";
export * from "./xrevrange";
export * from "./xtrim";
export * from "./zadd";
Expand Down
84 changes: 84 additions & 0 deletions pkg/commands/xack.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { addNewItemToStream, keygen, newHttpClient } from "../test-utils";

import { afterAll, describe, expect, test } from "bun:test";
import { XAddCommand } from "./xadd";
import { XDelCommand } from "./xdel";
import { XRangeCommand } from "./xrange";
import { XGroupCommand } from "./xgroup";
import { XReadGroupCommand } from "./xreadgroup";
import { XAckCommand } from "./xack";

const client = newHttpClient();

const { newKey, cleanup } = keygen();
afterAll(cleanup);

describe("XACK", () => {
test("should acknowledge a message successfully", async () => {
const streamKey1 = newKey();
const group = newKey();
const consumer = newKey();

const { streamId: streamId1 } = await addNewItemToStream(
streamKey1,
client
);
const { streamId: streamId2 } = await addNewItemToStream(
streamKey1,
client
);

await new XGroupCommand([
streamKey1,
{ type: "CREATE", group, id: "0" },
]).exec(client);

(await new XReadGroupCommand([
group,
consumer,
streamKey1,
">",
{ count: 2 },
]).exec(client)) as string[];

const res = await new XAckCommand([
streamKey1,
group,
[streamId1, streamId2],
]).exec(client);
expect(res).toEqual(2);
});

test("should try to re-acknowledge and return 0", async () => {
const streamKey1 = newKey();
const group = newKey();
const consumer = newKey();

const { streamId: streamId1 } = await addNewItemToStream(
streamKey1,
client
);

await new XGroupCommand([
streamKey1,
{ type: "CREATE", group, id: "0", options: { MKSTREAM: true } },
]).exec(client);

(await new XReadGroupCommand([
group,
consumer,
streamKey1,
">",
{ count: 2 },
]).exec(client)) as string[];

const res = await new XAckCommand([streamKey1, group, streamId1]).exec(
client
);
expect(res).toEqual(1);
const res1 = await new XAckCommand([streamKey1, group, streamId1]).exec(
client
);
expect(res1).toEqual(0);
});
});
14 changes: 14 additions & 0 deletions pkg/commands/xack.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Command, CommandOptions } from "./command";

/**
* @see https://redis.io/commands/xack
*/
export class XAckCommand extends Command<number, number> {
constructor(
[key, group, id]: [key: string, group: string, id: string | string[]],
opts?: CommandOptions<number, number>
) {
const ids = Array.isArray(id) ? [...id] : [id];
super(["XACK", key, group, ...ids], opts);
}
}
146 changes: 146 additions & 0 deletions pkg/commands/xautoclaim.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import { addNewItemToStream, keygen, newHttpClient } from "../test-utils";

import { sleep } from "bun";
import { afterAll, describe, expect, test } from "bun:test";
import { XAutoClaim } from "./xautoclaim";
import { XGroupCommand } from "./xgroup";
import { XReadGroupCommand } from "./xreadgroup";

const client = newHttpClient();

const { newKey, cleanup } = keygen();
afterAll(cleanup);

describe("XCLAIM", () => {
test("should move ownership to newly created consumer ", async () => {
const streamKey = newKey();
const group = newKey();
const consumer1 = newKey();
const consumer2 = newKey();

await addNewItemToStream(streamKey, client);
await addNewItemToStream(streamKey, client);

await new XGroupCommand([
streamKey,
{ type: "CREATE", group, id: "0" },
]).exec(client);

await new XReadGroupCommand([
group,
consumer1,
[streamKey],
[">"],
{ count: 2 },
]).exec(client);

const res = (await new XAutoClaim([
streamKey,
group,
consumer2,
10,
"0-0",
{ count: 1 },
]).exec(client)) as string[];
expect(res).toBeInstanceOf(Array);
});

test("should try to move ownership and fail due to too high idle time", async () => {
const streamKey = newKey();
const group = newKey();
const consumer1 = newKey();
const consumer2 = newKey();

await addNewItemToStream(streamKey, client);
await addNewItemToStream(streamKey, client);

await new XGroupCommand([
streamKey,
{ type: "CREATE", group, id: "0" },
]).exec(client);

await new XReadGroupCommand([
group,
consumer1,
[streamKey],
[">"],
{ count: 2 },
]).exec(client);
await sleep(2000);
const res = (await new XAutoClaim([
streamKey,
group,
consumer2,
3000,
"0-0",
{ count: 1 },
]).exec(client)) as string[];
expect(res).toEqual(["0-0", []]);
});

test("should successfull move the ownership with idle time", async () => {
const streamKey = newKey();
const group = newKey();
const consumer1 = newKey();
const consumer2 = newKey();

await addNewItemToStream(streamKey, client);
const { streamId } = await addNewItemToStream(streamKey, client);

await new XGroupCommand([
streamKey,
{ type: "CREATE", group, id: "0" },
]).exec(client);

await new XReadGroupCommand([
group,
consumer1,
[streamKey],
[">"],
{ count: 2 },
]).exec(client);
await sleep(2000);
const xclaim = (await new XAutoClaim([
streamKey,
group,
consumer2,
1000,
"0-0",
{ count: 1 },
]).exec(client)) as string[];
expect(xclaim[0]).toEqual(streamId);
});

test("should successfull return justid", async () => {
const streamKey = newKey();
const group = newKey();
const consumer1 = newKey();
const consumer2 = newKey();

await addNewItemToStream(streamKey, client);
const { streamId } = await addNewItemToStream(streamKey, client);

await new XGroupCommand([
streamKey,
{ type: "CREATE", group, id: "0" },
]).exec(client);

await new XReadGroupCommand([
group,
consumer1,
[streamKey],
[">"],
{ count: 2 },
]).exec(client);

const xclaim = (await new XAutoClaim([
streamKey,
group,
consumer2,
0,
"0-0",
{ count: 1, justId: true },
]).exec(client)) as string[];
expect(xclaim[1].length).toBe(1);
});
});
32 changes: 32 additions & 0 deletions pkg/commands/xautoclaim.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { Command, CommandOptions } from "./command";

/**
* @see https://redis.io/commands/xautoclaim
*/
export class XAutoClaim extends Command<unknown[], unknown[]> {
constructor(
[key, group, consumer, minIdleTime, start, options]: [
key: string,
group: string,
consumer: string,
minIdleTime: number,
start: string,
options?: { count?: number; justId?: boolean }
],
opts?: CommandOptions<unknown[], unknown[]>
) {
const commands: unknown[] = [];

if (options?.count) {
commands.push("COUNT", options.count);
}

if (options?.justId) {
commands.push("JUSTID");
}
super(
["XAUTOCLAIM", key, group, consumer, minIdleTime, start, ...commands],
opts
);
}
}
Loading

0 comments on commit 169183f

Please sign in to comment.