Skip to content

Commit

Permalink
Fix resend integration headers (#219)
Browse files Browse the repository at this point in the history
* fix: resend integration headers

content-type header wasn't being sent, so resend couldn't process the request. But the integration worked before. So I am suspecting that QStash used to set a default content header but it doesn't anymore. To fix the issue, I added a utility to properly merge the headers and added tests to make sure that we don't miss this behavior.

* fix: add method option to providers
  • Loading branch information
CahidArda authored Dec 10, 2024
1 parent 97f77eb commit 99822e6
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 19 deletions.
2 changes: 2 additions & 0 deletions src/client/api/base.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import type { HTTPMethods } from "../types";
import type { ApiKind, Owner, ProviderInfo } from "./types";

export abstract class BaseProvider<TName extends ApiKind, TOwner = Owner> {
public abstract readonly apiKind: TName;
public abstract readonly method: HTTPMethods;

public readonly baseUrl: string;
public token: string;
Expand Down
76 changes: 76 additions & 0 deletions src/client/api/email.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ describe("email", () => {
const resendToken = nanoid();
const client = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token: qstashToken });

const header = "my-header";
const headerValue = "my-header-value";

test("should use resend", async () => {
await mockQStashServer({
execute: async () => {
Expand All @@ -17,6 +20,9 @@ describe("email", () => {
name: "email",
provider: resend({ token: resendToken }),
},
headers: {
[header]: headerValue,
},
body: {
from: "Acme <[email protected]>",
to: ["[email protected]"],
Expand All @@ -42,6 +48,9 @@ describe("email", () => {
headers: {
authorization: `Bearer ${qstashToken}`,
"upstash-forward-authorization": `Bearer ${resendToken}`,
"content-type": "application/json",
[`upstash-forward-${header}`]: headerValue,
"upstash-method": "POST",
},
},
});
Expand All @@ -55,6 +64,70 @@ describe("email", () => {
name: "email",
provider: resend({ token: resendToken, batch: true }),
},
headers: {
[header]: headerValue,
},
body: [
{
from: "Acme <[email protected]>",
to: ["[email protected]"],
subject: "hello world",
html: "<h1>it works!</h1>",
},
{
from: "Acme <[email protected]>",
to: ["[email protected]"],
subject: "world hello",
html: "<p>it works!</p>",
},
],
});
},
responseFields: {
body: { messageId: "msgId" },
status: 200,
},
receivesRequest: {
method: "POST",
token: qstashToken,
url: "http://localhost:8080/v2/publish/https://api.resend.com/emails/batch",
body: [
{
from: "Acme <[email protected]>",
to: ["[email protected]"],
subject: "hello world",
html: "<h1>it works!</h1>",
},
{
from: "Acme <[email protected]>",
to: ["[email protected]"],
subject: "world hello",
html: "<p>it works!</p>",
},
],
headers: {
authorization: `Bearer ${qstashToken}`,
"upstash-forward-authorization": `Bearer ${resendToken}`,
"content-type": "application/json",
[`upstash-forward-${header}`]: headerValue,
"upstash-method": "POST",
},
},
});
});

test("should be able to overwrite method", async () => {
await mockQStashServer({
execute: async () => {
await client.publishJSON({
api: {
name: "email",
provider: resend({ token: resendToken, batch: true }),
},
headers: {
[header]: headerValue,
},
method: "PUT",
body: [
{
from: "Acme <[email protected]>",
Expand Down Expand Up @@ -96,6 +169,9 @@ describe("email", () => {
headers: {
authorization: `Bearer ${qstashToken}`,
"upstash-forward-authorization": `Bearer ${resendToken}`,
"content-type": "application/json",
[`upstash-forward-${header}`]: headerValue,
"upstash-method": "PUT",
},
},
});
Expand Down
1 change: 1 addition & 0 deletions src/client/api/email.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { EmailOwner, ProviderInfo } from "./types";
export class EmailProvider extends BaseProvider<"email", EmailOwner> {
public readonly apiKind = "email";
public readonly batch: boolean;
public readonly method = "POST";

constructor(baseUrl: string, token: string, owner: EmailOwner, batch: boolean) {
super(baseUrl, token, owner);
Expand Down
1 change: 1 addition & 0 deletions src/client/api/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { updateWithAnalytics } from "./utils";
export class LLMProvider<TOwner extends LLMOwner> extends BaseProvider<"llm", LLMOwner> {
public readonly apiKind = "llm";
public readonly organization?: string;
public readonly method = "POST";

constructor(baseUrl: string, token: string, owner: TOwner, organization?: string) {
super(baseUrl, token, owner);
Expand Down
5 changes: 5 additions & 0 deletions src/client/api/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { HTTPMethods } from "../types";
import type { BaseProvider } from "./base";

export type ProviderInfo = {
Expand All @@ -21,6 +22,10 @@ export type ProviderInfo = {
* provider owner
*/
owner: Owner;
/**
* method to use in the request
*/
method: HTTPMethods;
};

export type ApiKind = "llm" | "email";
Expand Down
53 changes: 41 additions & 12 deletions src/client/api/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { PublishRequest } from "../client";
import type { LLMOptions, ProviderInfo, PublishEmailApi, PublishLLMApi } from "./types";
import { upstash } from "./llm";
import type { HeadersInit } from "../types";

/**
* copies and updates the request by removing the api field and adding url & headers.
Expand Down Expand Up @@ -37,29 +38,63 @@ export const getProviderInfo = (
route: finalProvider.getRoute(),
appendHeaders: finalProvider.getHeaders(parameters),
owner: finalProvider.owner,
method: finalProvider.method,
};

return finalProvider.onFinish(providerInfo, parameters);
};

/**
* joins two header sets. If the same header exists in both headers and record,
* one in headers is used.
*
* The reason why we added this method is because the following doesn't work:
*
* ```ts
* const joined = {
* ...headers,
* ...record
* }
* ```
*
* `headers.toJSON` could have worked, but it exists in bun, and not necessarily in
* other runtimes.
*
* @param headers Headers object
* @param record record
* @returns joined header
*/
const safeJoinHeaders = (headers: Headers, record: Record<string, string>) => {
const joinedHeaders = new Headers(record);
for (const [header, value] of headers.entries()) {
joinedHeaders.set(header, value);
}
return joinedHeaders as HeadersInit;
};

/**
* copies and updates the request by removing the api field and adding url & headers.
*
* if there is no api field, simply returns.
* if there is no api field, simply returns after overwriting headers with the passed headers.
*
* @param request request with api field
* @param headers processed headers. Previously, these headers were assigned to the request
* when the headers were calculated. But PublishRequest.request type (HeadersInit) is broader
* than headers (Headers). PublishRequest.request is harder to work with, so we set them here.
* @param upstashToken used if provider is upstash and token is not set
* @returns updated request
*/
export const processApi = (
request: PublishRequest<unknown>,
headers: Headers,
upstashToken: string
): PublishRequest<unknown> => {
if (!request.api) {
request.headers = headers;
return request;
}

const { url, appendHeaders, owner } = getProviderInfo(request.api, upstashToken);
const { url, appendHeaders, owner, method } = getProviderInfo(request.api, upstashToken);

if (request.api.name === "llm") {
const callback = request.callback;
Expand All @@ -69,23 +104,17 @@ export const processApi = (

return {
...request,
// @ts-expect-error undici header conflict
headers: new Headers({
...request.headers,
...appendHeaders,
}),
method: request.method ?? method,
headers: safeJoinHeaders(headers, appendHeaders),
...(owner === "upstash" && !request.api.analytics
? { api: { name: "llm" }, url: undefined, callback }
: { url, api: undefined }),
};
} else {
return {
...request,
// @ts-expect-error undici header conflict
headers: new Headers({
...request.headers,
...appendHeaders,
}),
method: request.method ?? method,
headers: safeJoinHeaders(headers, appendHeaders),
url,
api: undefined,
};
Expand Down
9 changes: 4 additions & 5 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,10 @@ export class Client {
//@ts-expect-error caused by undici and bunjs type overlap
const headers = prefixHeaders(new Headers(request.headers));
headers.set("Content-Type", "application/json");
request.headers = headers;

//@ts-expect-error hacky way to get bearer token
const upstashToken = String(this.http.authorization).split("Bearer ")[1];
const nonApiRequest = processApi(request, upstashToken);
const nonApiRequest = processApi(request, headers, upstashToken);

// @ts-expect-error it's just internal
const response = await this.publish<TRequest>({
Expand Down Expand Up @@ -436,12 +435,12 @@ export class Client {
if ("body" in message) {
message.body = JSON.stringify(message.body) as unknown as TBody;
}
//@ts-expect-error caused by undici and bunjs type overlap
message.headers = new Headers(message.headers);

//@ts-expect-error hacky way to get bearer token
const upstashToken = String(this.http.authorization).split("Bearer ")[1];
const nonApiMessage = processApi(message, upstashToken);

//@ts-expect-error caused by undici and bunjs type overlap
const nonApiMessage = processApi(message, new Headers(message.headers), upstashToken);

(nonApiMessage.headers as Headers).set("Content-Type", "application/json");

Expand Down
3 changes: 1 addition & 2 deletions src/client/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,10 @@ export class Queue {
//@ts-expect-error caused by undici and bunjs type overlap
const headers = prefixHeaders(new Headers(request.headers));
headers.set("Content-Type", "application/json");
request.headers = headers;

//@ts-expect-error hacky way to get bearer token
const upstashToken = String(this.http.authorization).split("Bearer ")[1];
const nonApiRequest = processApi(request, upstashToken);
const nonApiRequest = processApi(request, headers, upstashToken);

const response = await this.enqueue({
...nonApiRequest,
Expand Down

0 comments on commit 99822e6

Please sign in to comment.