Skip to content

Commit

Permalink
refactor: use SharedWorker (closes #13)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronshaf committed Nov 25, 2024
1 parent 7fd079a commit d64da15
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 94 deletions.
4 changes: 2 additions & 2 deletions packages/idb-cache/src/encryptionTasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
} from "./errors";

/**
* Encrypts a chunk of data using the worker.
* Encrypts a chunk of data using the SharedWorker.
* @param port - The MessagePort instance.
* @param value - The plaintext string to encrypt.
* @param pendingRequests - Map of pending requests awaiting responses.
Expand Down Expand Up @@ -50,7 +50,7 @@ export async function encryptChunk(
}

/**
* Decrypts a chunk of data using the worker.
* Decrypts a chunk of data using the SharedWorker.
* @param port - The MessagePort instance.
* @param iv - The Initialization Vector used during encryption.
* @param ciphertext - The encrypted data.
Expand Down
134 changes: 69 additions & 65 deletions packages/idb-cache/src/encryptionWorkerFn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

import type { WorkerMessage } from "./types";

declare const self: SharedWorkerGlobalScope;

export function encryptionWorkerFunction() {
let cacheKey: Uint8Array | null = null;
const derivedKeyCache: Map<string, CryptoKey> = new Map();
let pbkdf2Iterations = 100000;
let port: MessagePort | null = null;
let fixedSalt: ArrayBuffer | null = null;

async function getKeyFromCacheKey(
Expand Down Expand Up @@ -59,12 +60,16 @@ export function encryptionWorkerFunction() {
throw new Error("Cache key not provided for encryption worker");
}
try {
port?.postMessage({ type: "ready" });
for (const port of ports) {
port.postMessage({ type: "ready" });
}
} catch (error: unknown) {
console.error("Worker: Failed to initialize AES key:", error);
const errorMessage =
error instanceof Error ? error.message : "Unknown initialization error";
port?.postMessage({ type: "initError", error: errorMessage });
for (const port of ports) {
port.postMessage({ type: "initError", error: errorMessage });
}
}
}

Expand Down Expand Up @@ -158,7 +163,7 @@ export function encryptionWorkerFunction() {
}
}

function handleEncrypt(requestId: string, value: string) {
function handleEncrypt(requestId: string, value: string, port: MessagePort) {
enqueueTask(async () => {
try {
const encrypted = await encrypt(value);
Expand Down Expand Up @@ -186,7 +191,8 @@ export function encryptionWorkerFunction() {
function handleDecrypt(
requestId: string,
iv: ArrayBuffer,
ciphertext: ArrayBuffer
ciphertext: ArrayBuffer,
port: MessagePort
) {
enqueueTask(async () => {
try {
Expand All @@ -209,73 +215,71 @@ export function encryptionWorkerFunction() {
});
}

async function onMessage(e: MessageEvent<WorkerMessage>) {
const { type, payload, requestId } = e.data;

switch (type) {
case "initialize":
{
const {
cacheKey: incomingCacheKey,
pbkdf2Iterations: incomingIterations,
cacheBuster,
} = payload;
cacheKey = new TextEncoder().encode(incomingCacheKey);
pbkdf2Iterations = incomingIterations || 100000;
fixedSalt = new TextEncoder().encode(cacheBuster).buffer;
await initializeKey();
}
break;

case "encrypt":
{
const { value } = payload;
await handleEncrypt(requestId, value);
}
break;
const ports: MessagePort[] = [];

case "decrypt":
{
const { iv, ciphertext } = payload;
await handleDecrypt(requestId, iv, ciphertext);
}
break;
function onConnect(e: MessageEvent) {
const port = e.ports[0];
ports.push(port);
port.onmessage = (event: MessageEvent<WorkerMessage>) => {
const { type, payload, requestId } = event.data;

case "destroy":
{
if (cacheKey) {
cacheKey.fill(0);
cacheKey = null;
}
if (fixedSalt) {
const saltArray = new Uint8Array(fixedSalt);
saltArray.fill(0);
fixedSalt = null;
switch (type) {
case "initialize":
{
const {
cacheKey: incomingCacheKey,
pbkdf2Iterations: incomingIterations,
cacheBuster,
} = payload;
cacheKey = new TextEncoder().encode(incomingCacheKey);
pbkdf2Iterations = incomingIterations || 100000;
fixedSalt = new TextEncoder().encode(cacheBuster).buffer;
initializeKey().catch((error) => {
console.error("Worker: Initialization failed:", error);
});
}
if (port) {
port.close();
port = null;
break;

case "encrypt":
{
const { value } = payload;
handleEncrypt(requestId, value, port);
}
self.close();
}
break;
break;

default:
console.warn(
`Worker: Unknown message type received: ${type}. Ignoring the message.`
);
}
}
case "decrypt":
{
const { iv, ciphertext } = payload;
handleDecrypt(requestId, iv, ciphertext, port);
}
break;

function handleInit(e: MessageEvent) {
const { type } = e.data;
case "destroy":
{
if (cacheKey) {
cacheKey.fill(0);
cacheKey = null;
}
if (fixedSalt) {
const saltArray = new Uint8Array(fixedSalt);
saltArray.fill(0);
fixedSalt = null;
}
for (const p of ports) {
p.close();
}
self.close();
}
break;

if (type === "init" && e.ports && e.ports.length > 0) {
port = e.ports[0];
port.onmessage = onMessage;
port.start();
}
default:
console.warn(
`Worker: Unknown message type received: ${type}. Ignoring the message.`
);
}
};
port.start();
}

self.onmessage = handleInit;
self.onconnect = onConnect;
}
10 changes: 5 additions & 5 deletions packages/idb-cache/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ const isSubtleCryptoSupported = crypto?.subtle;
export class IDBCache implements IDBCacheInterface {
dbReadyPromise: Promise<import("idb").IDBPDatabase<IDBCacheSchema>>;
private storeName: STORE;
private worker: Worker | null = null;
private worker: SharedWorker | null = null;
private port: MessagePort | null = null;
private pendingRequests: Map<
string,
Expand Down Expand Up @@ -183,7 +183,7 @@ export class IDBCache implements IDBCacheInterface {
}

/**
* Initializes the worker by creating it, setting up communication, and handling initialization.
* Initializes the SharedWorker by creating it, setting up communication, and handling initialization.
* @param cacheKey - The cache key used for encryption/decryption.
* @param cacheBuster - The cacheBuster used as a fixed salt.
* @throws {WorkerInitializationError} If the worker fails to initialize.
Expand Down Expand Up @@ -324,7 +324,7 @@ export class IDBCache implements IDBCacheInterface {

// Define key range for this baseKey
const lowerBound = `${baseKey}-chunk-000000-`;
const upperBound = `${baseKey}-chunk-999999\uffff`;
const upperBound = `${baseKey}-chunk-999999`;
const range = IDBKeyRange.bound(
lowerBound,
upperBound,
Expand Down Expand Up @@ -800,7 +800,7 @@ export class IDBCache implements IDBCacheInterface {
}

/**
* Destroys the IDBCache instance by clearing data (optional), releasing resources, and terminating the worker.
* Destroys the IDBCache instance by clearing data (optional), releasing resources, and terminating the SharedWorker.
* @param options - Configuration options for destruction.
* @param options.clearData - Whether to clear all cached data before destruction.
* @throws {DatabaseError} If there is an issue accessing the database during data clearing.
Expand Down Expand Up @@ -831,7 +831,7 @@ export class IDBCache implements IDBCacheInterface {
}

if (this.worker) {
this.worker.terminate();
this.worker.port.close();
this.worker = null;
}

Expand Down
45 changes: 23 additions & 22 deletions packages/idb-cache/src/workerUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import {
IDBCacheError,
} from "./errors";

/**
* Utility type guards for Worker responses
*/
function isReadyResponse(
message: WorkerResponse
): message is { type: "ready" } {
Expand Down Expand Up @@ -51,7 +54,7 @@ function isErrorResponse(
}

/**
* Creates a worker from a given function and sets up initial communication.
* Creates a SharedWorker from a given function and sets up initial communication.
* @param fn - The worker function to execute.
* @param rejectAll - Function to call to reject all pending requests in case of failure.
* @returns An object containing the worker instance and its message port.
Expand All @@ -60,38 +63,34 @@ export function createWorkerFromFunction(
fn: () => void,
rejectAll: (errorMessage: string) => void
): {
worker: Worker;
worker: SharedWorker;
port: MessagePort;
} {
const blob = new Blob([`(${fn.toString()})()`], {
type: "application/javascript",
});
const url = URL.createObjectURL(blob);
const worker = new Worker(url);

const channel = new MessageChannel();
const worker = new SharedWorker(url);

worker.postMessage({ type: "init" }, [channel.port2]);
const port = worker.port;

worker.onmessage = () => {
URL.revokeObjectURL(url);
};
port.start();

worker.onerror = (event) => {
console.error("Worker encountered an error:", event.message);
rejectAll("Worker encountered an error and was terminated.");
worker.terminate();
console.error("SharedWorker encountered an error:", event.message);
rejectAll("SharedWorker encountered an error and was terminated.");
worker.port.close();
};

channel.port1.onmessageerror = () => {
port.onmessageerror = () => {
console.warn(
"MessagePort encountered a message error. Worker may have been terminated."
"MessagePort encountered a message error. SharedWorker may have been terminated."
);
rejectAll("Worker was terminated unexpectedly.");
channel.port1.close();
rejectAll("SharedWorker was terminated unexpectedly.");
port.close();
};

return { worker, port: channel.port1 };
return { worker, port };
}

/**
Expand Down Expand Up @@ -176,12 +175,14 @@ export function initializeWorker(
};

port.onmessageerror = (e: MessageEvent) => {
console.error("Worker encountered a message error:", e);
const error = new WorkerInitializationError("Worker failed to initialize");
console.error("SharedWorker encountered a message error:", e);
const error = new WorkerInitializationError(
"SharedWorker failed to communicate properly."
);
rejectReady(error);
rejectAllPendingRequests(
pendingRequests,
"Worker encountered an error and was terminated."
"SharedWorker encountered an error and was terminated."
);
port.close();
};
Expand Down Expand Up @@ -229,13 +230,13 @@ export async function sendMessageToWorker<T extends WorkerMessage["type"]>(
port.postMessage(message);
}
} catch (error) {
console.error("Failed to post message to worker:", error);
console.error("Failed to post message to SharedWorker:", error);
const pending = pendingRequests.get(requestId);
if (pending) {
clearTimeout(pending.timer);
pending.reject(
new WorkerInitializationError(
"Failed to communicate with the worker."
"Failed to communicate with the SharedWorker."
)
);
pendingRequests.delete(requestId);
Expand Down

0 comments on commit d64da15

Please sign in to comment.