Skip to content

Commit

Permalink
In auto pipeline, throw errors seperately (#1305)
Browse files Browse the repository at this point in the history
* fix: in auto pipeline, throw errors seperately

* fix: add tests

* fix: overload exec method

* fix: use interface to overload exec

* fix: return type

* fix: check undefined result in error
  • Loading branch information
CahidArda authored Oct 9, 2024
1 parent c5d3b0f commit ef1ca98
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 31 deletions.
45 changes: 45 additions & 0 deletions pkg/auto-pipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,4 +318,49 @@ describe("Auto pipeline", () => {

expect(res).toEqual(["OK", "OK", "bar", { hello: "world" }, 1, null]);
});

test.only("should throw errors granularly", async () => {
// in this test, we have two methods being called parallel. both
// use redis, but one of them has try/catch. when the request in
// try fails, it shouldn't make the request in the parallel request
// fail
const redis = Redis.fromEnv({
enableAutoPipelining: true,
});

const scriptLoadCommand = new ScriptLoadCommand(["redis.call('SET', 'foobar', 'foobar')"]);
const scriptHash = await scriptLoadCommand.exec(client);
await redis.scriptFlush();

const methodOne = async () => {
// method with try catch
try {
await redis.evalsha(scriptHash, [], []);
throw new Error("test should have thrown in the command above");
} catch (error_) {
const error = error_ as Error;

if (error.message.includes("NOSCRIPT")) {
await scriptLoadCommand.exec(client);
await redis.evalsha(scriptHash, [], []);
return true;
} else {
throw new Error("incorrect error was thrown:", error);
}
}
};

const methodTwo = async () => {
await redis.set("barfoo", "barfoo");
return await redis.get("barfoo");
};

const [result1, result2] = await Promise.all([methodOne(), methodTwo()]);
expect(result1).toBeTrue();
expect(result2).toBe("barfoo");

// first method executed correctly
const result = await redis.get("foobar");
expect(result).toBe("foobar");
});
});
12 changes: 9 additions & 3 deletions pkg/auto-pipeline.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import type { Command } from "./commands/command";
import { UpstashError } from "./error";
import type { UpstashResponse } from "./http";
import type { Pipeline } from "./pipeline";
import type { Redis } from "./redis";
import type { CommandArgs } from "./types";
Expand Down Expand Up @@ -86,7 +88,7 @@ class AutoPipelineExecutor {

const pipelineDone = this.deferExecution().then(() => {
if (!this.pipelinePromises.has(pipeline)) {
const pipelinePromise = pipeline.exec();
const pipelinePromise = pipeline.exec({ keepErrors: true });
this.pipelineCounter += 1;

this.pipelinePromises.set(pipeline, pipelinePromise);
Expand All @@ -96,8 +98,12 @@ class AutoPipelineExecutor {
return this.pipelinePromises.get(pipeline)!;
});

const results = await pipelineDone;
return results[index] as T;
const results = (await pipelineDone) as UpstashResponse<T>[];
const commandResult = results[index];
if (commandResult.error) {
throw new UpstashError(`Command failed: ${commandResult.error}`);
}
return commandResult.result as T;
}

private async deferExecution() {
Expand Down
49 changes: 49 additions & 0 deletions pkg/pipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,52 @@ describe("use all the things", () => {
expect(res.length).toEqual(121);
});
});
describe("keep errors", () => {
test("should return results in case of success", async () => {
const p = new Pipeline({ client });
p.set("foo", "1");
p.set("bar", "2");
p.get("foo");
p.get("bar");
const results = await p.exec({ keepErrors: true });

// errors are undefined
for (const { error } of results) {
expect(error).toBeUndefined();
}
expect(results[2].result).toBe(1);
expect(results[3].result).toBe(2);
});

test("should throw without keepErrors", async () => {
const p = new Pipeline({ client });
p.set("foo", "1");
p.set("bar", "2");
p.evalsha("wrong-sha1", [], []);
p.get("foo");
p.get("bar");
expect(() => p.exec()).toThrow(
"Command 3 [ evalsha ] failed: NOSCRIPT No matching script. Please use EVAL."
);
});

test("should return errors with keepErrors", async () => {
const p = new Pipeline({ client });
p.set("foo", "1");
p.set("bar", "2");
p.evalsha("wrong-sha1", [], []);
p.get("foo");
p.get("bar");
const results = await p.exec<[string, string, string, number, number]>({ keepErrors: true });

expect(results[0].error).toBeUndefined();
expect(results[1].error).toBeUndefined();
expect(results[2].error).toBe("NOSCRIPT No matching script. Please use EVAL.");
expect(results[3].error).toBeUndefined();
expect(results[4].error).toBeUndefined();

expect(results[2].result).toBeUndefined();
expect(results[3].result).toBe(1);
expect(results[4].result).toBe(2);
});
});
89 changes: 61 additions & 28 deletions pkg/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,46 @@ type InferResponseData<T extends unknown[]> = {
[K in keyof T]: T[K] extends Command<any, infer TData> ? TData : unknown;
};

interface ExecMethod<TCommands extends Command<any, any>[]> {
/**
* Send the pipeline request to upstash.
*
* Returns an array with the results of all pipelined commands.
*
* If all commands are statically chained from start to finish, types are inferred. You can still define a return type manually if necessary though:
* ```ts
* const p = redis.pipeline()
* p.get("key")
* const result = p.exec<[{ greeting: string }]>()
* ```
*
* If one of the commands get an error, the whole pipeline fails. Alternatively, you can set the keepErrors option to true in order to get the errors individually.
*
* If keepErrors is set to true, a list of objects is returned where each object corresponds to a command and is of type: `{ result: unknown, error?: string }`.
*
* ```ts
* const p = redis.pipeline()
* p.get("key")
*
* const result = await p.exec({ keepErrors: true });
* const getResult = result[0].result
* const getError = result[0].error
* ```
*/
<
TCommandResults extends unknown[] = [] extends TCommands
? unknown[]
: InferResponseData<TCommands>,
>(): Promise<TCommandResults>;
<
TCommandResults extends unknown[] = [] extends TCommands
? unknown[]
: InferResponseData<TCommands>,
>(options: {
keepErrors: true;
}): Promise<{ [K in keyof TCommandResults]: UpstashResponse<TCommandResults[K]> }>;
}

/**
* Upstash REST API supports command pipelining to send multiple commands in
* batch, instead of sending each command one by one and waiting for a response.
Expand Down Expand Up @@ -246,9 +286,11 @@ export class Pipeline<TCommands extends Command<any, any>[] = []> {
TCommandResults extends unknown[] = [] extends TCommands
? unknown[]
: InferResponseData<TCommands>,
>(): Promise<TCommandResults> => {
>(options?: {
keepErrors: true;
}): Promise<TCommandResults> => {
const start = performance.now();
const result = await originalExec();
const result = await (options ? originalExec(options) : originalExec());
const end = performance.now();
const loggerResult = (end - start).toFixed(2);
// eslint-disable-next-line no-console
Expand All @@ -262,23 +304,7 @@ export class Pipeline<TCommands extends Command<any, any>[] = []> {
}
}

/**
* Send the pipeline request to upstash.
*
* Returns an array with the results of all pipelined commands.
*
* If all commands are statically chained from start to finish, types are inferred. You can still define a return type manually if necessary though:
* ```ts
* const p = redis.pipeline()
* p.get("key")
* const result = p.exec<[{ greeting: string }]>()
* ```
*/
exec = async <
TCommandResults extends unknown[] = [] extends TCommands
? unknown[]
: InferResponseData<TCommands>,
>(): Promise<TCommandResults> => {
exec: ExecMethod<TCommands> = async (options?: { keepErrors: true }) => {
if (this.commands.length === 0) {
throw new Error("Pipeline is empty");
}
Expand All @@ -289,15 +315,22 @@ export class Pipeline<TCommands extends Command<any, any>[] = []> {
body: Object.values(this.commands).map((c) => c.command),
})) as UpstashResponse<any>[];

return res.map(({ error, result }, i) => {
if (error) {
throw new UpstashError(
`Command ${i + 1} [ ${this.commands[i].command[0]} ] failed: ${error}`
);
}

return this.commands[i].deserialize(result);
}) as TCommandResults;
return options?.keepErrors
? res.map(({ error, result }, i) => {
return {
error: error,
result: this.commands[i].deserialize(result),
};
})
: res.map(({ error, result }, i) => {
if (error) {
throw new UpstashError(
`Command ${i + 1} [ ${this.commands[i].command[0]} ] failed: ${error}`
);
}

return this.commands[i].deserialize(result);
});
};

/**
Expand Down

0 comments on commit ef1ca98

Please sign in to comment.