feat(webhook): Implement webhook functionality for bookmark events (#852)

* feat(webhook): Implement webhook functionality for bookmark events

- Added WebhookWorker to handle webhook requests.
- Integrated webhook triggering in crawlerWorker after video processing.
- Updated main worker initialization to include WebhookWorker.
- Enhanced configuration to support webhook URLs, token, and timeout.
- Documented webhook configuration options in the documentation.
- Introduced zWebhookRequestSchema for validating webhook requests.

* feat(webhook): Update webhook handling and configuration

- Changed webhook operation type from "create" to "crawled" in crawlerWorker and documentation.
- Enhanced webhook retry logic in WebhookWorker to support multiple attempts.
- Updated Docker configuration to include new webhook environment variables.
- Improved validation for webhook configuration in shared config.
- Adjusted zWebhookRequestSchema to reflect the new operation type.
- Updated documentation to clarify webhook configuration options and usage.

* minor modifications

---------

Co-authored-by: Mohamed Bassem <[email protected]>
hanguofeng and MohamedBassem authored Jan 19, 2025
1 parent b323573 commit b9cce5d
Showing 6 changed files with 237 additions and 11 deletions.
4 changes: 4 additions & 0 deletions apps/workers/crawlerWorker.ts
@@ -55,6 +55,7 @@ import {
  OpenAIQueue,
  triggerSearchReindex,
  triggerVideoWorker,
  triggerWebhookWorker,
  zCrawlLinkRequestSchema,
} from "@hoarder/shared/queues";
import { BookmarkTypes } from "@hoarder/shared/types/bookmarks";
@@ -770,6 +771,9 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
  // Trigger a potential download of a video from the URL
  await triggerVideoWorker(bookmarkId, url);

  // Trigger a webhook
  await triggerWebhookWorker(bookmarkId, "crawled");

  // Do the archival as a separate last step as it has the potential for failure
  await archivalLogic();
}
34 changes: 23 additions & 11 deletions apps/workers/index.ts
@@ -13,21 +13,31 @@ import { shutdownPromise } from "./exit";
import { OpenAiWorker } from "./openaiWorker";
import { SearchIndexingWorker } from "./searchWorker";
import { VideoWorker } from "./videoWorker";
import { WebhookWorker } from "./webhookWorker";

async function main() {
  logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
  runQueueDBMigrations();

-  const [crawler, openai, search, tidyAssets, video, feed, assetPreprocessing] =
-    [
-      await CrawlerWorker.build(),
-      OpenAiWorker.build(),
-      SearchIndexingWorker.build(),
-      TidyAssetsWorker.build(),
-      VideoWorker.build(),
-      FeedWorker.build(),
-      AssetPreprocessingWorker.build(),
-    ];
+  const [
+    crawler,
+    openai,
+    search,
+    tidyAssets,
+    video,
+    feed,
+    assetPreprocessing,
+    webhook,
+  ] = [
+    await CrawlerWorker.build(),
+    OpenAiWorker.build(),
+    SearchIndexingWorker.build(),
+    TidyAssetsWorker.build(),
+    VideoWorker.build(),
+    FeedWorker.build(),
+    AssetPreprocessingWorker.build(),
+    WebhookWorker.build(),
+  ];
  FeedRefreshingWorker.start();

  await Promise.any([
@@ -39,11 +49,12 @@ async function main() {
      video.run(),
      feed.run(),
      assetPreprocessing.run(),
      webhook.run(),
    ]),
    shutdownPromise,
  ]);
  logger.info(
-    "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing and search workers ...",
+    "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing, webhook and search workers ...",
  );

  FeedRefreshingWorker.stop();
@@ -54,6 +65,7 @@
  video.stop();
  feed.stop();
  assetPreprocessing.stop();
  webhook.stop();
}

main();
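The startup/shutdown pattern above — racing all long-running workers against a shutdown promise with `Promise.any` — can be sketched in miniature as follows. This is an illustration only, not part of the commit; the names and timings are made up:

```ts
// Two long-running "workers" and a shutdown signal, raced the same way
// index.ts races its workers against shutdownPromise.
const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));

async function runForever(name: string) {
  for (;;) {
    console.log(`${name} polling...`);
    await sleep(1000);
  }
}

const shutdownPromise = sleep(3500); // stand-in for a SIGTERM handler

// Promise.all never settles (the workers loop forever), so Promise.any
// resolves exactly when the shutdown promise does.
await Promise.any([
  Promise.all([runForever("crawler"), runForever("webhook")]),
  shutdownPromise,
]);
console.log("shutting down workers ...");
```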
136 changes: 136 additions & 0 deletions apps/workers/webhookWorker.ts
@@ -0,0 +1,136 @@
import { eq } from "drizzle-orm";
import { DequeuedJob, Runner } from "liteque";
import fetch from "node-fetch";

import { db } from "@hoarder/db";
import { bookmarks } from "@hoarder/db/schema";
import serverConfig from "@hoarder/shared/config";
import logger from "@hoarder/shared/logger";
import {
  WebhookQueue,
  ZWebhookRequest,
  zWebhookRequestSchema,
} from "@hoarder/shared/queues";

export class WebhookWorker {
  static build() {
    logger.info("Starting webhook worker ...");
    const worker = new Runner<ZWebhookRequest>(
      WebhookQueue,
      {
        run: runWebhook,
        onComplete: async (job) => {
          const jobId = job.id;
          logger.info(`[webhook][${jobId}] Completed successfully`);
          return Promise.resolve();
        },
        onError: async (job) => {
          const jobId = job.id;
          logger.error(
            `[webhook][${jobId}] webhook job failed: ${job.error}\n${job.error.stack}`,
          );
          return Promise.resolve();
        },
      },
      {
        concurrency: 1,
        pollIntervalMs: 1000,
        timeoutSecs:
          serverConfig.webhook.timeoutSec *
            (serverConfig.webhook.retryTimes + 1) +
          1, // budget for the initial attempt plus all retries, with one second of slack
        validator: zWebhookRequestSchema,
      },
    );

    return worker;
  }
}

async function fetchBookmark(linkId: string) {
  return await db.query.bookmarks.findFirst({
    where: eq(bookmarks.id, linkId),
    with: {
      link: true,
      text: true,
      asset: true,
    },
  });
}

async function runWebhook(job: DequeuedJob<ZWebhookRequest>) {
  const jobId = job.id;
  const webhookUrls = serverConfig.webhook.urls;
  if (!webhookUrls) {
    logger.info(
      `[webhook][${jobId}] No webhook urls configured. Skipping webhook job.`,
    );
    return;
  }
  const webhookToken = serverConfig.webhook.token;
  const webhookTimeoutSec = serverConfig.webhook.timeoutSec;

  const { bookmarkId } = job.data;
  const bookmark = await fetchBookmark(bookmarkId);
  if (!bookmark) {
    throw new Error(
      `[webhook][${jobId}] bookmark with id ${bookmarkId} was not found`,
    );
  }

  logger.info(
    `[webhook][${jobId}] Starting a webhook job for bookmark with id "${bookmark.id}"`,
  );

  await Promise.allSettled(
    webhookUrls.map(async (url) => {
      const maxRetries = serverConfig.webhook.retryTimes;
      let attempt = 0;
      let success = false;

      // One initial attempt plus up to maxRetries retries, matching the
      // runner-level timeout budget above.
      while (attempt <= maxRetries && !success) {
        try {
          const response = await fetch(url, {
            method: "POST",
            headers: {
              "Content-Type": "application/json",
              ...(webhookToken
                ? {
                    Authorization: `Bearer ${webhookToken}`,
                  }
                : {}),
            },
            body: JSON.stringify({
              jobId,
              bookmarkId,
              userId: bookmark.userId,
              url: bookmark.link ? bookmark.link.url : undefined,
              type: bookmark.type,
              operation: job.data.operation,
            }),
            signal: AbortSignal.timeout(webhookTimeoutSec * 1000),
          });

          if (!response.ok) {
            logger.error(
              `[webhook][${jobId}] Webhook call to ${url} failed with status: ${response.status}`,
            );
          } else {
            logger.info(`[webhook][${jobId}] Webhook call to ${url} succeeded`);
            success = true;
          }
        } catch (error) {
          logger.error(
            `[webhook][${jobId}] Webhook call to ${url} failed: ${error}`,
          );
        }
        attempt++;
        if (!success && attempt <= maxRetries) {
          logger.info(
            `[webhook][${jobId}] Retrying webhook call to ${url}, attempt ${attempt + 1}`,
          );
        }
      }
    }),
  );
}
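Aside: the `timeoutSecs` option above is the runner-level budget for the whole job, sized so that every delivery attempt fits inside it. A minimal sketch of that math with the default configuration values (an illustration, not part of this commit):

```ts
// With the defaults WEBHOOK_TIMEOUT_SEC=5 and WEBHOOK_RETRY_TIMES=3,
// each of the (retryTimes + 1) attempts may take up to timeoutSec,
// and one extra second is left for bookkeeping around the fetches.
const timeoutSec = 5;
const retryTimes = 3;

const runnerTimeoutSecs = timeoutSec * (retryTimes + 1) + 1;
console.log(runnerTimeoutSecs); // => 21 seconds for up to 4 attempts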
33 changes: 33 additions & 0 deletions docs/docs/03-configuration.md
@@ -95,3 +95,36 @@ Hoarder uses [tesseract.js](https://github.com/naptha/tesseract.js) to extract t
| OCR_CACHE_DIR | No | $TEMP_DIR | The dir where tesseract will download its models. By default, those models are not persisted and stored in the OS' temp dir. |
| OCR_LANGS | No | eng | Comma separated list of the language codes that you want tesseract to support. You can find the language codes [here](https://tesseract-ocr.github.io/tessdoc/Data-Files-in-different-versions.html). Set to empty string to disable OCR. |
| OCR_CONFIDENCE_THRESHOLD | No | 50 | A number between 0 and 100 indicating the minimum acceptable confidence from tesseract. If tesseract's confidence is lower than this value, extracted text won't be stored. |

## Webhook Configs

You can use webhooks to trigger actions when bookmarks change (currently, only the _crawled_ event is supported).

| Name | Required | Default | Description |
| ------------------- | -------- | ------- | ---------------------------------------------------------------------------------------------- |
| WEBHOOK_URLS        | No       |         | The URLs of the webhooks to trigger, separated by commas.                                        |
| WEBHOOK_TOKEN       | No       |         | The token to use for authentication. Will appear in the Authorization header as a Bearer token. |
| WEBHOOK_TIMEOUT_SEC | No | 5 | The timeout for the webhook request in seconds. |
| WEBHOOK_RETRY_TIMES | No | 3 | The number of times to retry the webhook request. |

:::info

- When a URL is added to Hoarder, the webhook will be triggered after it has been crawled.
- The WEBHOOK_TOKEN is used for authentication. It will appear in the Authorization header as a Bearer token:
```
Authorization: Bearer <WEBHOOK_TOKEN>
```
- The webhook body is JSON and contains the job id (useful for idempotence), the bookmark id, the bookmark type, the user id, the url, and the operation:

```json
{
"jobId": 123,
"type": "link",
"bookmarkId": "exampleBookmarkId",
"userId": "exampleUserId",
"url": "https://example.com",
"operation": "crawled"
}
```

:::
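For illustration, a minimal receiver for these webhook calls could look like the sketch below. It is not part of this commit, and the port and fallback token are placeholders:

```ts
import { createServer } from "node:http";

// Placeholder: must match the WEBHOOK_TOKEN configured on the Hoarder side.
const EXPECTED_TOKEN = process.env.WEBHOOK_TOKEN ?? "change-me";

createServer((req, res) => {
  if (req.method !== "POST") {
    res.writeHead(405).end();
    return;
  }
  // Hoarder sends the token in the Authorization header as a Bearer token.
  if (req.headers.authorization !== `Bearer ${EXPECTED_TOKEN}`) {
    res.writeHead(401).end();
    return;
  }
  let body = "";
  req.on("data", (chunk) => (body += chunk));
  req.on("end", () => {
    // Body shape as documented above:
    // { jobId, bookmarkId, userId, url, type, operation }
    const event = JSON.parse(body);
    console.log(`bookmark ${event.bookmarkId} was ${event.operation}`);
    res.writeHead(200).end();
  });
}).listen(8080); // placeholder port
```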
14 changes: 14 additions & 0 deletions packages/shared/config.ts
@@ -56,6 +56,14 @@ const allEnv = z.object({
  DATA_DIR: z.string().default(""),
  MAX_ASSET_SIZE_MB: z.coerce.number().default(4),
  INFERENCE_LANG: z.string().default("english"),
  WEBHOOK_URLS: z
    .string()
    .transform((val) => val.split(","))
    .pipe(z.array(z.string().url()))
    .optional(),
  WEBHOOK_TOKEN: z.string().optional(),
  WEBHOOK_TIMEOUT_SEC: z.coerce.number().default(5),
  WEBHOOK_RETRY_TIMES: z.coerce.number().int().min(0).default(3),
  // Build only flag
  SERVER_VERSION: z.string().optional(),
  DISABLE_NEW_RELEASE_CHECK: stringBool("false"),
@@ -134,6 +142,12 @@ const serverConfigSchema = allEnv.transform((val) => {
    serverVersion: val.SERVER_VERSION,
    disableNewReleaseCheck: val.DISABLE_NEW_RELEASE_CHECK,
    usingLegacySeparateContainers: val.USING_LEGACY_SEPARATE_CONTAINERS,
    webhook: {
      urls: val.WEBHOOK_URLS,
      token: val.WEBHOOK_TOKEN,
      timeoutSec: val.WEBHOOK_TIMEOUT_SEC,
      retryTimes: val.WEBHOOK_RETRY_TIMES,
    },
  };
});

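As a standalone sketch (not part of the commit), the `WEBHOOK_URLS` pipeline above turns a comma-separated string into a validated array of URLs, and leaves the value `undefined` when the variable is unset; the endpoints below are hypothetical:

```ts
import { z } from "zod";

// Same shape as the schema above: comma-separated string -> array of URLs.
const webhookUrls = z
  .string()
  .transform((val) => val.split(","))
  .pipe(z.array(z.string().url()))
  .optional();

webhookUrls.parse("https://a.example/hook,https://b.example/hook");
// => ["https://a.example/hook", "https://b.example/hook"]

webhookUrls.parse(undefined); // => undefined (webhooks disabled)

webhookUrls.safeParse("not-a-url").success; // => false (rejected)
```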
27 changes: 27 additions & 0 deletions packages/shared/queues.ts
@@ -158,3 +158,30 @@ export const AssetPreprocessingQueue =
    keepFailedJobs: false,
  },
);

// Webhook worker
export const zWebhookRequestSchema = z.object({
  bookmarkId: z.string(),
  operation: z.enum(["crawled"]),
});
export type ZWebhookRequest = z.infer<typeof zWebhookRequestSchema>;
export const WebhookQueue = new SqliteQueue<ZWebhookRequest>(
  "webhook_queue",
  queueDB,
  {
    defaultJobArgs: {
      numRetries: 3,
    },
    keepFailedJobs: false,
  },
);

export async function triggerWebhookWorker(
  bookmarkId: string,
  operation: "crawled",
) {
  await WebhookQueue.enqueue({
    bookmarkId,
    operation,
  });
}
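A usage sketch of these exports (`exampleBookmarkId` is a hypothetical id; the crawler performs the equivalent call after a successful crawl):

```ts
import {
  triggerWebhookWorker,
  zWebhookRequestSchema,
} from "@hoarder/shared/queues";

// Payloads are validated against zWebhookRequestSchema before the
// worker runs them; parse() here just demonstrates the accepted shape.
const payload = zWebhookRequestSchema.parse({
  bookmarkId: "exampleBookmarkId", // hypothetical id
  operation: "crawled",
});

// Enqueue a webhook job; WebhookWorker delivers it asynchronously.
await triggerWebhookWorker(payload.bookmarkId, payload.operation);
```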
