diff --git a/packages/api-dynamodb-to-elasticsearch/src/Operations.ts b/packages/api-dynamodb-to-elasticsearch/src/Operations.ts index d69ec29a1ed..0a52c7cb0e4 100644 --- a/packages/api-dynamodb-to-elasticsearch/src/Operations.ts +++ b/packages/api-dynamodb-to-elasticsearch/src/Operations.ts @@ -13,12 +13,20 @@ export enum OperationType { } export class Operations implements IOperations { - public readonly items: GenericRecord[] = []; + private _items: GenericRecord[] = []; + + public get items(): GenericRecord[] { + return this._items; + } public get total(): number { return this.items.length; } + public clear() { + this._items = []; + } + public insert(params: IInsertOperationParams): void { this.items.push( { diff --git a/packages/api-dynamodb-to-elasticsearch/src/SynchronizationBuilder.ts b/packages/api-dynamodb-to-elasticsearch/src/SynchronizationBuilder.ts new file mode 100644 index 00000000000..81fe68a73dd --- /dev/null +++ b/packages/api-dynamodb-to-elasticsearch/src/SynchronizationBuilder.ts @@ -0,0 +1,72 @@ +import { + Context, + IDeleteOperationParams, + IInsertOperationParams, + IModifyOperationParams, + IOperations +} from "~/types"; +import { Operations } from "~/Operations"; +import { executeWithRetry, IExecuteWithRetryParams } from "~/executeWithRetry"; +import { ITimer } from "@webiny/handler-aws"; + +export type ISynchronizationBuilderExecuteWithRetryParams = Omit< + IExecuteWithRetryParams, + "context" | "timer" | "maxRunningTime" | "operations" +>; + +export interface ISynchronizationBuilder { + insert(params: IInsertOperationParams): void; + delete(params: IDeleteOperationParams): void; + build: () => (params?: ISynchronizationBuilderExecuteWithRetryParams) => Promise; +} + +export interface ISynchronizationBuilderParams { + timer: ITimer; + context: Pick; +} + +export class SynchronizationBuilder implements ISynchronizationBuilder { + private readonly timer: ITimer; + private readonly context: Pick; + private readonly operations: IOperations; + + public constructor(params: ISynchronizationBuilderParams) { + this.timer = params.timer; + this.context = params.context; + this.operations = new Operations(); + } + + public insert(params: IInsertOperationParams): void { + return this.operations.insert(params); + } + + public modify(params: IModifyOperationParams): void { + return this.operations.modify(params); + } + + public delete(params: IDeleteOperationParams): void { + return this.operations.delete(params); + } + + public build() { + return async (params?: ISynchronizationBuilderExecuteWithRetryParams) => { + if (this.operations.total === 0) { + return; + } + await executeWithRetry({ + ...params, + maxRunningTime: this.timer.getRemainingMilliseconds(), + timer: this.timer, + context: this.context, + operations: this.operations + }); + this.operations.clear(); + }; + } +} + +export const createSynchronizationBuilder = ( + params: ISynchronizationBuilderParams +): ISynchronizationBuilder => { + return new SynchronizationBuilder(params); +}; diff --git a/packages/api-dynamodb-to-elasticsearch/src/eventHandler.ts b/packages/api-dynamodb-to-elasticsearch/src/eventHandler.ts index ef4b1e0f58f..c683a72c9a6 100644 --- a/packages/api-dynamodb-to-elasticsearch/src/eventHandler.ts +++ b/packages/api-dynamodb-to-elasticsearch/src/eventHandler.ts @@ -1,15 +1,9 @@ -import { getNumberEnvVariable } from "~/helpers/getNumberEnvVariable"; import { createDynamoDBEventHandler, timerFactory } from "@webiny/handler-aws"; -import { ElasticsearchContext } from "@webiny/api-elasticsearch/types"; +import { Context } from "~/types"; import { Decompressor } from "~/Decompressor"; import { OperationsBuilder } from "~/OperationsBuilder"; import { executeWithRetry } from "~/executeWithRetry"; -const MAX_PROCESSOR_PERCENT = getNumberEnvVariable( - "MAX_ES_PROCESSOR", - process.env.NODE_ENV === "test" ? 101 : 98 -); - /** * Also, we need to set the maximum running time for the Lambda Function. * https://github.com/webiny/webiny-js/blob/f7352d418da2b5ae0b781376be46785aa7ac6ae0/packages/pulumi-aws/src/apps/core/CoreOpenSearch.ts#L232 @@ -20,7 +14,7 @@ const MAX_RUNNING_TIME = 900; export const createEventHandler = () => { return createDynamoDBEventHandler(async ({ event, context: ctx, lambdaContext }) => { const timer = timerFactory(lambdaContext); - const context = ctx as unknown as ElasticsearchContext; + const context = ctx as unknown as Context; if (!context.elasticsearch) { console.error("Missing elasticsearch definition on context."); return null; @@ -49,7 +43,6 @@ export const createEventHandler = () => { await executeWithRetry({ timer, maxRunningTime: MAX_RUNNING_TIME, - maxProcessorPercent: MAX_PROCESSOR_PERCENT, context, operations }); diff --git a/packages/api-dynamodb-to-elasticsearch/src/execute.ts b/packages/api-dynamodb-to-elasticsearch/src/execute.ts index 2bbcd6b5569..a050f6fb691 100644 --- a/packages/api-dynamodb-to-elasticsearch/src/execute.ts +++ b/packages/api-dynamodb-to-elasticsearch/src/execute.ts @@ -5,9 +5,9 @@ import { WaitingHealthyClusterAbortedError } from "@webiny/api-elasticsearch"; import { ITimer } from "@webiny/handler-aws"; -import { ApiResponse, ElasticsearchContext } from "@webiny/api-elasticsearch/types"; +import { ApiResponse } from "@webiny/api-elasticsearch/types"; import { WebinyError } from "@webiny/error"; -import { IOperations } from "./types"; +import { Context, IOperations } from "./types"; export interface BulkOperationsResponseBodyItemIndexError { reason?: string; @@ -30,8 +30,8 @@ export interface IExecuteParams { timer: ITimer; maxRunningTime: number; maxProcessorPercent: number; - context: Pick; - operations: IOperations; + context: Pick; + operations: Pick; } const getError = (item: BulkOperationsResponseBodyItem): string | null => { @@ -67,6 +67,11 @@ const checkErrors = (result?: ApiResponse): void => export const execute = (params: IExecuteParams) => { return async (): Promise => { const { context, timer, maxRunningTime, maxProcessorPercent, operations } = params; + + if (operations.total === 0) { + return; + } + const remainingTime = timer.getRemainingSeconds(); const runningTime = maxRunningTime - remainingTime; const maxWaitingTime = remainingTime - 90; diff --git a/packages/api-dynamodb-to-elasticsearch/src/executeWithRetry.ts b/packages/api-dynamodb-to-elasticsearch/src/executeWithRetry.ts index f9e442fe08d..2411b7a27a9 100644 --- a/packages/api-dynamodb-to-elasticsearch/src/executeWithRetry.ts +++ b/packages/api-dynamodb-to-elasticsearch/src/executeWithRetry.ts @@ -5,11 +5,17 @@ import { getNumberEnvVariable } from "./helpers/getNumberEnvVariable"; const minRemainingSecondsToTimeout = 120; -export interface IExecuteWithRetryParams extends IExecuteParams { +const MAX_PROCESSOR_PERCENT = getNumberEnvVariable( + "MAX_ES_PROCESSOR", + process.env.NODE_ENV === "test" ? 101 : 98 +); + +export interface IExecuteWithRetryParams extends Omit { maxRetryTime?: number; retries?: number; minTimeout?: number; maxTimeout?: number; + maxProcessorPercent?: number; } export const executeWithRetry = async (params: IExecuteWithRetryParams) => { @@ -35,7 +41,7 @@ export const executeWithRetry = async (params: IExecuteWithRetryParams) => { execute({ timer: params.timer, maxRunningTime: params.maxRunningTime, - maxProcessorPercent: params.maxProcessorPercent, + maxProcessorPercent: params.maxProcessorPercent || MAX_PROCESSOR_PERCENT, context: params.context, operations: params.operations }), diff --git a/packages/api-dynamodb-to-elasticsearch/src/index.ts b/packages/api-dynamodb-to-elasticsearch/src/index.ts index 99a39edec60..9c6d27a1d8b 100644 --- a/packages/api-dynamodb-to-elasticsearch/src/index.ts +++ b/packages/api-dynamodb-to-elasticsearch/src/index.ts @@ -6,4 +6,5 @@ export * from "./marshall"; export * from "./NotEnoughRemainingTimeError"; export * from "./Operations"; export * from "./OperationsBuilder"; +export * from "./SynchronizationBuilder"; export * from "./types"; diff --git a/packages/api-dynamodb-to-elasticsearch/src/types.ts b/packages/api-dynamodb-to-elasticsearch/src/types.ts index 2707505d00e..c08e976d72e 100644 --- a/packages/api-dynamodb-to-elasticsearch/src/types.ts +++ b/packages/api-dynamodb-to-elasticsearch/src/types.ts @@ -1,5 +1,8 @@ import { GenericRecord } from "@webiny/cli/types"; import { DynamoDBRecord } from "@webiny/handler-aws/types"; +import { ElasticsearchContext } from "@webiny/api-elasticsearch/types"; + +export type Context = Pick; export interface IOperationsBuilderBuildParams { records: DynamoDBRecord[]; @@ -25,6 +28,7 @@ export interface IDeleteOperationParams { export interface IOperations { items: GenericRecord[]; total: number; + clear(): void; insert(params: IInsertOperationParams): void; modify(params: IModifyOperationParams): void; delete(params: IDeleteOperationParams): void; diff --git a/packages/api-elasticsearch-tasks/__tests__/mocks/store.ts b/packages/api-elasticsearch-tasks/__tests__/mocks/store.ts index 860790ed613..ad4aa8979bc 100644 --- a/packages/api-elasticsearch-tasks/__tests__/mocks/store.ts +++ b/packages/api-elasticsearch-tasks/__tests__/mocks/store.ts @@ -1,5 +1,5 @@ import { TaskManagerStore } from "@webiny/tasks/runner/TaskManagerStore"; -import { Context, ITask, ITaskLog } from "@webiny/tasks/types"; +import { Context, ITask, ITaskDataInput, ITaskLog } from "@webiny/tasks/types"; import { createTaskMock } from "~tests/mocks/task"; import { createContextMock } from "~tests/mocks/context"; import { createTaskLogMock } from "~tests/mocks/log"; @@ -9,11 +9,11 @@ interface Params { task?: ITask; log?: ITaskLog; } -export const createTaskManagerStoreMock = (params?: Params) => { +export const createTaskManagerStoreMock = (params?: Params) => { const context = params?.context || createContextMock(); const task = params?.task || createTaskMock(); const log = params?.log || createTaskLogMock(task); - return new TaskManagerStore({ + return new TaskManagerStore({ context, task, log diff --git a/packages/api-elasticsearch-tasks/__tests__/tasks/dataSynchronization/ElasticsearchToDynamoDbSynchronization.test.ts b/packages/api-elasticsearch-tasks/__tests__/tasks/dataSynchronization/ElasticsearchToDynamoDbSynchronization.test.ts new file mode 100644 index 00000000000..2ff6722df36 --- /dev/null +++ b/packages/api-elasticsearch-tasks/__tests__/tasks/dataSynchronization/ElasticsearchToDynamoDbSynchronization.test.ts @@ -0,0 +1,343 @@ +import { ElasticsearchToDynamoDbSynchronization } from "~/tasks/dataSynchronization/elasticsearch/ElasticsearchToDynamoDbSynchronization"; +import { useHandler } from "~tests/helpers/useHandler"; +import { createManagers } from "./managers"; +import { ElasticsearchFetcher } from "~/tasks/dataSynchronization/elasticsearch/ElasticsearchFetcher"; +import { ElasticsearchSynchronize } from "~/tasks/dataSynchronization/elasticsearch/ElasticsearchSynchronize"; +import { createDataSynchronization, DATA_SYNCHRONIZATION_TASK } from "~/tasks"; +import { Context, SynchronizationBuilder } from "@webiny/api-dynamodb-to-elasticsearch"; +import { ITimer } from "@webiny/handler-aws"; +import { IIndexManager } from "~/settings/types"; +import { createRunner } from "@webiny/project-utils/testing/tasks"; +import { TaskResponseStatus } from "@webiny/tasks"; + +const queryAllRecords = (index: string) => { + return { + index, + body: { + query: { + match_all: {} + }, + size: 10000, + _source: false + } + }; +}; + +interface ICreateSyncBuilderParams { + records: number; + timer: ITimer; + context: Pick; + index: string; +} + +const createRecordsFactory = (params: ICreateSyncBuilderParams) => { + const { timer, context, index, records } = params; + const syncBuilder = new SynchronizationBuilder({ + timer, + context + }); + + for (let i = 0; i < records; i++) { + syncBuilder.insert({ + id: `pkValue${i}:skValue${i}`, + index, + data: { + id: `skValue${i}`, + aText: `myText - ${i}` + } + }); + } + return { + run: () => { + return syncBuilder.build()(); + } + }; +}; + +const getTaskIndex = async (manager: IIndexManager): Promise => { + const indexes = await manager.list(); + const index = indexes.find( + index => index.includes("webinytask") && index.includes("-headless-cms-") + ); + if (!index) { + throw new Error("No index found."); + } + return index; +}; + +describe("ElasticsearchToDynamoDbSynchronization", () => { + it("should run a sync without any indexes and throw an error", async () => { + const handler = useHandler(); + + const context = await handler.rawHandle(); + + const { manager, indexManager } = createManagers({ + context + }); + + const sync = new ElasticsearchToDynamoDbSynchronization({ + manager, + indexManager, + fetcher: new ElasticsearchFetcher({ + client: context.elasticsearch + }), + synchronize: new ElasticsearchSynchronize({ + context, + timer: manager.timer + }) + }); + + try { + const result = await sync.run({ + flow: "elasticsearchToDynamoDb", + skipDryRun: true + }); + expect(result).toEqual("Should not reach this point."); + } catch (ex) { + expect(ex.message).toBe("No Elasticsearch / OpenSearch indexes found."); + } + }); + + it("should run a sync with indexes and return done on dry run", async () => { + const handler = useHandler(); + + const context = await handler.rawHandle(); + + const task = await context.tasks.createTask({ + definitionId: DATA_SYNCHRONIZATION_TASK, + input: { + flow: "elasticsearchToDynamoDb" + }, + name: "Data Sync Mock Task" + }); + + const { manager, indexManager } = createManagers({ + context + }); + + const index = await getTaskIndex(indexManager); + + const totalMockItemsToInsert = 101; + const recordsFactory = createRecordsFactory({ + context, + index, + timer: manager.timer, + records: totalMockItemsToInsert + }); + try { + await recordsFactory.run(); + } catch (ex) { + expect(ex.message).toBe("Should not reach this point."); + } + /** + * Now we need to make sure that the mock data is in the index. + */ + const response = await context.elasticsearch.search(queryAllRecords(index)); + expect(response.body.hits.hits).toHaveLength(totalMockItemsToInsert + 1); + + const runner = createRunner({ + context, + task: createDataSynchronization(), + onContinue: async () => { + return; + } + }); + + const result = await runner({ + webinyTaskId: task.id, + locale: "en-US", + tenant: "root" + }); + + expect(result).toEqual({ + locale: "en-US", + message: "Dry run.", + output: { + keys: [ + "pkValue0:skValue0", + "pkValue1:skValue1", + "pkValue10:skValue10", + "pkValue100:skValue100", + "pkValue11:skValue11", + "pkValue12:skValue12", + "pkValue13:skValue13", + "pkValue14:skValue14", + "pkValue15:skValue15", + "pkValue16:skValue16", + "pkValue17:skValue17", + "pkValue18:skValue18", + "pkValue19:skValue19", + "pkValue2:skValue2", + "pkValue20:skValue20", + "pkValue21:skValue21", + "pkValue22:skValue22", + "pkValue23:skValue23", + "pkValue24:skValue24", + "pkValue25:skValue25", + "pkValue26:skValue26", + "pkValue27:skValue27", + "pkValue28:skValue28", + "pkValue29:skValue29", + "pkValue3:skValue3", + "pkValue30:skValue30", + "pkValue31:skValue31", + "pkValue32:skValue32", + "pkValue33:skValue33", + "pkValue34:skValue34", + "pkValue35:skValue35", + "pkValue36:skValue36", + "pkValue37:skValue37", + "pkValue38:skValue38", + "pkValue39:skValue39", + "pkValue4:skValue4", + "pkValue40:skValue40", + "pkValue41:skValue41", + "pkValue42:skValue42", + "pkValue43:skValue43", + "pkValue44:skValue44", + "pkValue45:skValue45", + "pkValue46:skValue46", + "pkValue47:skValue47", + "pkValue48:skValue48", + "pkValue49:skValue49", + "pkValue5:skValue5", + "pkValue50:skValue50", + "pkValue51:skValue51", + "pkValue52:skValue52", + "pkValue53:skValue53", + "pkValue54:skValue54", + "pkValue55:skValue55", + "pkValue56:skValue56", + "pkValue57:skValue57", + "pkValue58:skValue58", + "pkValue59:skValue59", + "pkValue6:skValue6", + "pkValue60:skValue60", + "pkValue61:skValue61", + "pkValue62:skValue62", + "pkValue63:skValue63", + "pkValue64:skValue64", + "pkValue65:skValue65", + "pkValue66:skValue66", + "pkValue67:skValue67", + "pkValue68:skValue68", + "pkValue69:skValue69", + "pkValue7:skValue7", + "pkValue70:skValue70", + "pkValue71:skValue71", + "pkValue72:skValue72", + "pkValue73:skValue73", + "pkValue74:skValue74", + "pkValue75:skValue75", + "pkValue76:skValue76", + "pkValue77:skValue77", + "pkValue78:skValue78", + "pkValue79:skValue79", + "pkValue8:skValue8", + "pkValue80:skValue80", + "pkValue81:skValue81", + "pkValue82:skValue82", + "pkValue83:skValue83", + "pkValue84:skValue84", + "pkValue85:skValue85", + "pkValue86:skValue86", + "pkValue87:skValue87", + "pkValue88:skValue88", + "pkValue89:skValue89", + "pkValue9:skValue9", + "pkValue90:skValue90", + "pkValue91:skValue91", + "pkValue92:skValue92", + "pkValue93:skValue93", + "pkValue94:skValue94", + "pkValue95:skValue95", + "pkValue96:skValue96", + "pkValue97:skValue97", + "pkValue98:skValue98", + "pkValue99:skValue99" + ] + }, + status: TaskResponseStatus.DONE, + tenant: "root", + webinyTaskDefinitionId: task.definitionId, + webinyTaskId: task.id + }); + }); + + it("should run a sync with indexes and finish", async () => { + const handler = useHandler(); + + const context = await handler.rawHandle(); + + await context.tasks.createTask({ + definitionId: DATA_SYNCHRONIZATION_TASK, + input: { + flow: "elasticsearchToDynamoDb" + }, + name: "Data Sync Mock Task" + }); + + const { manager, indexManager } = createManagers({ + context + }); + + const index = await getTaskIndex(indexManager); + + const totalMockItemsToInsert = 101; + const recordsFactory = createRecordsFactory({ + context, + index, + timer: manager.timer, + records: totalMockItemsToInsert + }); + try { + await recordsFactory.run(); + } catch (ex) { + expect(ex.message).toBe("Should not reach this point."); + } + /** + * Now we need to make sure that the mock data is in the index. + */ + const response = await context.elasticsearch.search(queryAllRecords(index)); + expect(response.body.hits.hits).toHaveLength(totalMockItemsToInsert + 1); + + const sync = new ElasticsearchToDynamoDbSynchronization({ + manager, + indexManager, + fetcher: new ElasticsearchFetcher({ + client: context.elasticsearch + }), + synchronize: new ElasticsearchSynchronize({ + context, + timer: manager.timer + }) + }); + + const result = await sync.run({ + flow: "elasticsearchToDynamoDb", + skipDryRun: true + }); + expect(result).toEqual({ + delay: -1, + input: { + elasticsearchToDynamoDb: { + finished: true + }, + flow: "elasticsearchToDynamoDb", + skipDryRun: true + }, + locale: "en-US", + message: undefined, + status: "continue", + tenant: "root", + wait: undefined, + webinyTaskDefinitionId: "mockDefinitionId", + webinyTaskId: "mockEventId" + }); + /** + * Now we need to make sure that the mock data is not in the index anymore. + */ + const afterRunResponse = await context.elasticsearch.search(queryAllRecords(index)); + expect(afterRunResponse.body.hits.hits).toHaveLength(1); + }); +}); diff --git a/packages/api-elasticsearch-tasks/__tests__/tasks/dataSynchronization/dataSynchronizationTask.test.ts b/packages/api-elasticsearch-tasks/__tests__/tasks/dataSynchronization/dataSynchronizationTask.test.ts new file mode 100644 index 00000000000..ac5130822d8 --- /dev/null +++ b/packages/api-elasticsearch-tasks/__tests__/tasks/dataSynchronization/dataSynchronizationTask.test.ts @@ -0,0 +1,121 @@ +import { createDataSynchronization, DATA_SYNCHRONIZATION_TASK } from "~/tasks"; +import { TaskDefinitionPlugin, TaskResponseStatus } from "@webiny/tasks"; +import { createRunner } from "@webiny/project-utils/testing/tasks"; +import { useHandler } from "~tests/helpers/useHandler"; +import { IDataSynchronizationInput, IFactories } from "~/tasks/dataSynchronization/types"; + +jest.mock("~/tasks/dataSynchronization/createFactories", () => { + return { + createFactories: (): IFactories => { + return { + elasticsearchToDynamoDb: ({ manager }) => { + return { + run: async input => { + return manager.response.continue({ + ...input, + elasticsearchToDynamoDb: { + finished: true + } + }); + } + }; + } + }; + } + }; +}); + +describe("data synchronization - elasticsearch", () => { + it("should create a task definition", async () => { + const result = createDataSynchronization(); + + expect(result).toBeInstanceOf(TaskDefinitionPlugin); + expect(result).toEqual({ + isPrivate: false, + task: { + id: DATA_SYNCHRONIZATION_TASK, + isPrivate: false, + title: "Data Synchronization", + description: "Synchronize data between Elasticsearch and DynamoDB", + maxIterations: 100, + disableDatabaseLogs: true, + fields: [], + run: expect.any(Function), + createInputValidation: expect.any(Function) + } + }); + }); + + it("should run a task and end with error due to invalid flow", async () => { + const handler = useHandler({}); + + const context = await handler.rawHandle(); + + try { + const task = await context.tasks.createTask({ + definitionId: DATA_SYNCHRONIZATION_TASK, + input: { + // @ts-expect-error + flow: "unknownFlow", + skipDryRun: true + }, + name: "Data Sync Mock Task" + }); + expect(task).toEqual("Should not reach this point."); + } catch (ex) { + expect(ex.message).toEqual("Validation failed."); + expect(ex.data).toEqual({ + invalidFields: { + flow: { + code: "invalid_enum_value", + data: { + fatal: undefined, + path: ["flow"] + }, + message: + "Invalid enum value. Expected 'elasticsearchToDynamoDb', received 'unknownFlow'" + } + } + }); + } + }); + + it("should run a task and end with done", async () => { + const handler = useHandler({}); + + const context = await handler.rawHandle(); + + const task = await context.tasks.createTask({ + definitionId: DATA_SYNCHRONIZATION_TASK, + input: { + flow: "elasticsearchToDynamoDb", + skipDryRun: true + }, + name: "Data Sync Mock Task" + }); + + const runner = createRunner({ + context, + task: createDataSynchronization(), + onContinue: async () => { + return; + } + }); + + const result = await runner({ + webinyTaskId: task.id + }); + + expect(result).toEqual({ + status: TaskResponseStatus.DONE, + webinyTaskId: task.id, + webinyTaskDefinitionId: DATA_SYNCHRONIZATION_TASK, + tenant: "root", + locale: "en-US", + message: undefined, + output: undefined + }); + const taskCheck = await context.tasks.getTask(task.id); + expect(taskCheck?.iterations).toEqual(2); + }); +}); diff --git a/packages/api-elasticsearch-tasks/__tests__/tasks/dataSynchronization/managers.ts b/packages/api-elasticsearch-tasks/__tests__/tasks/dataSynchronization/managers.ts new file mode 100644 index 00000000000..1ac6803de3c --- /dev/null +++ b/packages/api-elasticsearch-tasks/__tests__/tasks/dataSynchronization/managers.ts @@ -0,0 +1,40 @@ +import { IndexManager } from "~/settings"; +import { + IDataSynchronizationInput, + IDataSynchronizationManager +} from "~/tasks/dataSynchronization/types"; +import { Context } from "~/types"; +import { Manager } from "~/tasks/Manager"; +import { Response, TaskResponse } from "@webiny/tasks"; +import { createMockEvent } from "~tests/mocks/event"; +import { createTaskManagerStoreMock } from "~tests/mocks/store"; +import { timerFactory } from "@webiny/handler-aws/utils"; + +export interface ICreateManagersParams { + context: Context; +} + +export const createManagers = (params: ICreateManagersParams) => { + const { context } = params; + const manager = new Manager({ + elasticsearchClient: context.elasticsearch, + // @ts-expect-error + documentClient: context.db.driver.documentClient, + response: new TaskResponse(new Response(createMockEvent())), + context, + isAborted: () => { + return false; + }, + isCloseToTimeout: () => { + return false; + }, + timer: timerFactory(), + store: createTaskManagerStoreMock() + }); + + const indexManager = new IndexManager(context.elasticsearch, {}); + return { + manager: manager as unknown as IDataSynchronizationManager, + indexManager + }; +}; diff --git a/packages/api-elasticsearch-tasks/jest.setup.js b/packages/api-elasticsearch-tasks/jest.setup.js index 049095053ae..e9fae563a72 100644 --- a/packages/api-elasticsearch-tasks/jest.setup.js +++ b/packages/api-elasticsearch-tasks/jest.setup.js @@ -1,6 +1,8 @@ const base = require("../../jest.config.base"); const presets = require("@webiny/project-utils/testing/presets")( ["@webiny/api-headless-cms", "storage-operations"], + ["@webiny/api-form-builder", "storage-operations"], + ["@webiny/api-page-builder", "storage-operations"], ["@webiny/api-i18n", "storage-operations"], ["@webiny/api-security", "storage-operations"], ["@webiny/api-tenancy", "storage-operations"] diff --git a/packages/api-elasticsearch-tasks/package.json b/packages/api-elasticsearch-tasks/package.json index 3a0debe2738..9cb842d392c 100644 --- a/packages/api-elasticsearch-tasks/package.json +++ b/packages/api-elasticsearch-tasks/package.json @@ -14,8 +14,10 @@ "dependencies": { "@babel/runtime": "^7.24.0", "@webiny/api": "0.0.0", + "@webiny/api-dynamodb-to-elasticsearch": "0.0.0", "@webiny/api-elasticsearch": "0.0.0", "@webiny/aws-sdk": "0.0.0", + "@webiny/db": "0.0.0", "@webiny/db-dynamodb": "0.0.0", "@webiny/error": "0.0.0", "@webiny/tasks": "0.0.0", @@ -52,5 +54,12 @@ "build": "yarn webiny run build", "watch": "yarn webiny run watch" }, - "gitHead": "8476da73b653c89cc1474d968baf55c1b0ae0e5f" + "gitHead": "8476da73b653c89cc1474d968baf55c1b0ae0e5f", + "adio": { + "ignore": { + "src": [ + "node:util" + ] + } + } } diff --git a/packages/api-elasticsearch-tasks/src/definitions/entry.ts b/packages/api-elasticsearch-tasks/src/definitions/entry.ts index b985c523455..fa3c9bb7afd 100644 --- a/packages/api-elasticsearch-tasks/src/definitions/entry.ts +++ b/packages/api-elasticsearch-tasks/src/definitions/entry.ts @@ -1,7 +1,7 @@ -import { Entity, Table } from "@webiny/db-dynamodb/toolbox"; +import { Entity, TableDef } from "@webiny/db-dynamodb/toolbox"; interface Params { - table: Table; + table: TableDef; entityName: string; } diff --git a/packages/api-elasticsearch-tasks/src/definitions/table.ts b/packages/api-elasticsearch-tasks/src/definitions/table.ts index cc1f5e7a50e..5bd160dfb1e 100644 --- a/packages/api-elasticsearch-tasks/src/definitions/table.ts +++ b/packages/api-elasticsearch-tasks/src/definitions/table.ts @@ -1,11 +1,11 @@ import { DynamoDBDocument } from "@webiny/aws-sdk/client-dynamodb"; -import { Table, TableConstructor } from "@webiny/db-dynamodb/toolbox"; +import { Table, TableConstructor, TableDef } from "@webiny/db-dynamodb/toolbox"; interface Params { documentClient: DynamoDBDocument; } -export const createTable = ({ documentClient }: Params): Table => { +export const createTable = ({ documentClient }: Params): TableDef => { const config: TableConstructor = { name: process.env.DB_TABLE_ELASTICSEARCH as string, partitionKey: "PK", diff --git a/packages/api-elasticsearch-tasks/src/helpers/scan.ts b/packages/api-elasticsearch-tasks/src/helpers/scan.ts index 1f5d4141dbf..2440c0deba5 100644 --- a/packages/api-elasticsearch-tasks/src/helpers/scan.ts +++ b/packages/api-elasticsearch-tasks/src/helpers/scan.ts @@ -1,11 +1,11 @@ import { scan as tableScan, ScanOptions } from "@webiny/db-dynamodb"; -import { Table } from "@webiny/db-dynamodb/toolbox"; +import { TableDef } from "@webiny/db-dynamodb/toolbox"; import { IElasticsearchIndexingTaskValuesKeys } from "~/types"; interface Params { - table: Table; + table: TableDef; keys?: IElasticsearchIndexingTaskValuesKeys; - options?: Pick; + options?: ScanOptions; } export const scan = async (params: Params) => { @@ -13,9 +13,9 @@ export const scan = async (params: Params) => { return tableScan({ table, options: { + ...params.options, startKey: keys, - limit: 200, - ...params.options + limit: params.options?.limit || 200 } }); }; diff --git a/packages/api-elasticsearch-tasks/src/index.ts b/packages/api-elasticsearch-tasks/src/index.ts index e8ca3b86175..12dfab884d1 100644 --- a/packages/api-elasticsearch-tasks/src/index.ts +++ b/packages/api-elasticsearch-tasks/src/index.ts @@ -1,4 +1,5 @@ import { + createDataSynchronization, createElasticsearchReindexingTask, createEnableIndexingTask, createIndexesTaskDefinition @@ -14,7 +15,8 @@ export const createElasticsearchBackgroundTasks = ( return [ createElasticsearchReindexingTask(params), createEnableIndexingTask(params), - createIndexesTaskDefinition(params) + createIndexesTaskDefinition(params), + createDataSynchronization(params) ]; }; diff --git a/packages/api-elasticsearch-tasks/src/settings/IndexManager.ts b/packages/api-elasticsearch-tasks/src/settings/IndexManager.ts index 289e70cd2b9..94a9dca0d32 100644 --- a/packages/api-elasticsearch-tasks/src/settings/IndexManager.ts +++ b/packages/api-elasticsearch-tasks/src/settings/IndexManager.ts @@ -11,6 +11,22 @@ const defaultIndexSettings: IIndexSettingsValues = { refreshInterval: "1s" }; +export interface IListIndicesResponse { + index: string; +} + +const indexPrefix = process.env.ELASTIC_SEARCH_INDEX_PREFIX || ""; +const filterIndex = (item?: string) => { + if (!item) { + return false; + } else if (item.startsWith(".")) { + return false; + } else if (indexPrefix) { + return item.startsWith(indexPrefix); + } + return true; +}; + export class IndexManager implements IIndexManager { private readonly client: Client; private readonly disable: DisableIndexing; @@ -41,13 +57,13 @@ export class IndexManager implements IIndexManager { public async list(): Promise { try { - const response = await this.client.cat.indices({ + const response = await this.client.cat.indices({ format: "json" }); if (!Array.isArray(response.body)) { return []; } - return response.body.map((item: any) => item.index).filter(Boolean); + return response.body.map(item => item.index).filter(filterIndex); } catch (ex) { console.error( JSON.stringify({ diff --git a/packages/api-elasticsearch-tasks/src/tasks/Manager.ts b/packages/api-elasticsearch-tasks/src/tasks/Manager.ts index 1fbebc97913..a62664742a0 100644 --- a/packages/api-elasticsearch-tasks/src/tasks/Manager.ts +++ b/packages/api-elasticsearch-tasks/src/tasks/Manager.ts @@ -1,11 +1,11 @@ import { DynamoDBDocument, getDocumentClient } from "@webiny/aws-sdk/client-dynamodb"; import { Client, createElasticsearchClient } from "@webiny/api-elasticsearch"; import { createTable } from "~/definitions"; -import { Context, IElasticsearchIndexingTaskValues, IManager } from "~/types"; +import { Context, IManager } from "~/types"; import { createEntry } from "~/definitions/entry"; import { Entity } from "@webiny/db-dynamodb/toolbox"; import { ITaskResponse } from "@webiny/tasks/response/abstractions"; -import { ITaskManagerStore } from "@webiny/tasks/runner/abstractions"; +import { IIsCloseToTimeoutCallable, ITaskManagerStore } from "@webiny/tasks/runner/abstractions"; import { batchReadAll, BatchReadItem, @@ -13,30 +13,33 @@ import { BatchWriteItem, BatchWriteResult } from "@webiny/db-dynamodb"; +import { ITimer } from "@webiny/handler-aws/utils"; -export interface ManagerParams { +export interface ManagerParams { context: Context; documentClient?: DynamoDBDocument; elasticsearchClient?: Client; - isCloseToTimeout: () => boolean; + isCloseToTimeout: IIsCloseToTimeoutCallable; isAborted: () => boolean; response: ITaskResponse; - store: ITaskManagerStore; + store: ITaskManagerStore; + timer: ITimer; } -export class Manager implements IManager { +export class Manager implements IManager { public readonly documentClient: DynamoDBDocument; public readonly elasticsearch: Client; public readonly context: Context; public readonly table: ReturnType; - public readonly isCloseToTimeout: () => boolean; + public readonly isCloseToTimeout: IIsCloseToTimeoutCallable; public readonly isAborted: () => boolean; public readonly response: ITaskResponse; - public readonly store: ITaskManagerStore; + public readonly store: ITaskManagerStore; + public readonly timer: ITimer; private readonly entities: Record> = {}; - public constructor(params: ManagerParams) { + public constructor(params: ManagerParams) { this.context = params.context; this.documentClient = params?.documentClient || getDocumentClient(); @@ -58,6 +61,7 @@ export class Manager implements IManager { }; this.response = params.response; this.store = params.store; + this.timer = params.timer; } public getEntity(name: string): Entity { diff --git a/packages/api-elasticsearch-tasks/src/tasks/createIndexes/CreateIndexesTaskRunner.ts b/packages/api-elasticsearch-tasks/src/tasks/createIndexes/CreateIndexesTaskRunner.ts index c9d65ce92e4..62b572e0845 100644 --- a/packages/api-elasticsearch-tasks/src/tasks/createIndexes/CreateIndexesTaskRunner.ts +++ b/packages/api-elasticsearch-tasks/src/tasks/createIndexes/CreateIndexesTaskRunner.ts @@ -6,12 +6,16 @@ import { CreateElasticsearchIndexTaskPluginIndex } from "./CreateElasticsearchIndexTaskPlugin"; import { Context } from "~/types"; +import { IElasticsearchCreateIndexesTaskInput } from "~/tasks/createIndexes/types"; export class CreateIndexesTaskRunner { - private readonly manager: Manager; + private readonly manager: Manager; private readonly indexManager: IndexManager; - public constructor(manager: Manager, indexManager: IndexManager) { + public constructor( + manager: Manager, + indexManager: IndexManager + ) { this.manager = manager; this.indexManager = indexManager; diff --git a/packages/api-elasticsearch-tasks/src/tasks/createIndexes/index.ts b/packages/api-elasticsearch-tasks/src/tasks/createIndexes/index.ts index a84529c4b74..46850d171b2 100644 --- a/packages/api-elasticsearch-tasks/src/tasks/createIndexes/index.ts +++ b/packages/api-elasticsearch-tasks/src/tasks/createIndexes/index.ts @@ -12,23 +12,24 @@ export const createIndexesTaskDefinition = (params?: IElasticsearchTaskConfig) = * No point in having more than 2 runs, as the create index operations should not even take 1 full run, no matter how much indeexs is there to create. */ maxIterations: 2, - run: async ({ response, context, isCloseToTimeout, isAborted, store, input }) => { + run: async ({ response, context, isCloseToTimeout, isAborted, store, input, timer }) => { const { Manager } = await import( - /* webpackChunkName: "ElasticsearchTaskManager" */ + /* webpackChunkName: "Manager" */ "../Manager" ); const { IndexManager } = await import( - /* webpackChunkName: "ElasticsearchTaskSettings" */ "~/settings" + /* webpackChunkName: "IndexManager" */ "~/settings" ); - const manager = new Manager({ + const manager = new Manager({ elasticsearchClient: params?.elasticsearchClient, documentClient: params?.documentClient, response, context, isAborted, isCloseToTimeout, - store + store, + timer }); const indexManager = new IndexManager(manager.elasticsearch, {}); diff --git a/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/DataSynchronizationTaskRunner.ts b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/DataSynchronizationTaskRunner.ts new file mode 100644 index 00000000000..5e3b1231eae --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/DataSynchronizationTaskRunner.ts @@ -0,0 +1,69 @@ +import { + IDataSynchronizationInput, + IDataSynchronizationManager, + IFactories +} from "~/tasks/dataSynchronization/types"; +import { IIndexManager } from "~/settings/types"; +import { ElasticsearchSynchronize } from "~/tasks/dataSynchronization/elasticsearch/ElasticsearchSynchronize"; +import { ElasticsearchFetcher } from "~/tasks/dataSynchronization/elasticsearch/ElasticsearchFetcher"; + +export interface IDataSynchronizationTaskRunnerParams { + manager: IDataSynchronizationManager; + indexManager: IIndexManager; + factories: IFactories; +} + +export class DataSynchronizationTaskRunner { + private readonly manager: IDataSynchronizationManager; + private readonly indexManager: IIndexManager; + private readonly factories: IFactories; + + public constructor(params: IDataSynchronizationTaskRunnerParams) { + this.manager = params.manager; + this.indexManager = params.indexManager; + this.factories = params.factories; + } + + public async run(input: IDataSynchronizationInput) { + this.validateFlow(input); + /** + * Go through the Elasticsearch and delete records which do not exist in the Elasticsearch table. + */ + // + if (input.flow === "elasticsearchToDynamoDb" && !input.elasticsearchToDynamoDb?.finished) { + const sync = this.factories.elasticsearchToDynamoDb({ + manager: this.manager, + indexManager: this.indexManager, + synchronize: new ElasticsearchSynchronize({ + context: this.manager.context, + timer: this.manager.timer + }), + fetcher: new ElasticsearchFetcher({ + client: this.manager.elasticsearch + }) + }); + try { + return await sync.run(input); + } catch (ex) { + return this.manager.response.error(ex); + } + } + /** + * We are done. + */ + return this.manager.response.done(); + } + + private validateFlow(input: IDataSynchronizationInput): void { + if (!input.flow) { + throw new Error(`Missing "flow" in the input.`); + } else if (this.factories[input.flow]) { + return; + } + throw new Error( + `Invalid flow "${input.flow}". Allowed flows: ${Object.keys(this.factories).join( + ", " + )}.` + ); + } +} diff --git a/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/createFactories.ts b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/createFactories.ts new file mode 100644 index 00000000000..dd1e20cecb3 --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/createFactories.ts @@ -0,0 +1,10 @@ +import { IFactories } from "./types"; +import { ElasticsearchToDynamoDbSynchronization } from "./elasticsearch/ElasticsearchToDynamoDbSynchronization"; + +export const createFactories = (): IFactories => { + return { + elasticsearchToDynamoDb: params => { + return new ElasticsearchToDynamoDbSynchronization(params); + } + }; +}; diff --git a/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/ElasticsearchFetcher.ts b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/ElasticsearchFetcher.ts new file mode 100644 index 00000000000..55f58abf601 --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/ElasticsearchFetcher.ts @@ -0,0 +1,107 @@ +import { Client } from "@webiny/api-elasticsearch"; +import { + IElasticsearchFetcher, + IElasticsearchFetcherFetchParams, + IElasticsearchFetcherFetchResponse, + IElasticsearchFetcherFetchResponseItem +} from "./abstractions/ElasticsearchFetcher"; +import { ElasticsearchSearchResponse, PrimitiveValue } from "@webiny/api-elasticsearch/types"; +import { shouldIgnoreEsResponseError } from "./shouldIgnoreEsResponseError"; +import { inspect } from "node:util"; + +export interface IElasticsearchFetcherParams { + client: Client; +} + +export class ElasticsearchFetcher implements IElasticsearchFetcher { + private readonly client: Client; + + public constructor(params: IElasticsearchFetcherParams) { + this.client = params.client; + } + public async fetch({ + index, + cursor, + limit + }: IElasticsearchFetcherFetchParams): Promise { + let response: ElasticsearchSearchResponse; + try { + response = await this.client.search({ + index, + body: { + query: { + match_all: {} + }, + sort: { + "id.keyword": { + order: "asc" + } + }, + size: limit + 1, + track_total_hits: true, + search_after: cursor, + _source: false + } + }); + } catch (ex) { + /** + * If we ignore the error, we can continue with the next index. + */ + if (shouldIgnoreEsResponseError(ex)) { + if (process.env.DEBUG === "true") { + console.error( + inspect(ex, { + depth: 5, + showHidden: true + }) + ); + } + return { + done: true, + totalCount: 0, + items: [] + }; + } + console.error("Failed to fetch data from Elasticsearch.", ex); + throw ex; + } + + const { hits, total } = response.body.hits; + if (hits.length === 0) { + return { + done: true, + cursor: undefined, + totalCount: total.value, + items: [] + }; + } + + const hasMoreItems = hits.length > limit; + let nextCursor: PrimitiveValue[] | undefined; + if (hasMoreItems) { + hits.pop(); + nextCursor = hits.at(-1)?.sort; + } + const items = hits.reduce((collection, hit) => { + const [PK, SK] = hit._id.split(":"); + if (!PK || !SK) { + return collection; + } + collection.push({ + PK, + SK, + _id: hit._id, + index: hit._index + }); + + return collection; + }, []); + + return { + totalCount: total.value, + cursor: nextCursor, + done: !nextCursor, + items + }; + } +} diff --git a/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/ElasticsearchSynchronize.ts b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/ElasticsearchSynchronize.ts new file mode 100644 index 00000000000..10c7e2c3dbe --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/ElasticsearchSynchronize.ts @@ -0,0 +1,114 @@ +import { batchReadAll } from "@webiny/db-dynamodb"; +import { createSynchronizationBuilder } from "@webiny/api-dynamodb-to-elasticsearch"; +import { + getElasticsearchEntity, + getElasticsearchEntityType, + getTable, + IGetElasticsearchEntityTypeParams +} from "~/tasks/dataSynchronization/entities"; +import { ITimer } from "@webiny/handler-aws"; +import { Context } from "~/types"; +import { + IElasticsearchSynchronize, + IElasticsearchSynchronizeExecuteParams, + IElasticsearchSynchronizeExecuteResponse +} from "./abstractions/ElasticsearchSynchronize"; + +export interface IElasticsearchSynchronizeParams { + timer: ITimer; + context: Context; +} + +interface IDynamoDbItem { + PK: string; + SK: string; +} + +export class ElasticsearchSynchronize implements IElasticsearchSynchronize { + private readonly timer: ITimer; + private readonly context: Context; + + public constructor(params: IElasticsearchSynchronizeParams) { + this.timer = params.timer; + this.context = params.context; + } + + public async execute( + params: IElasticsearchSynchronizeExecuteParams + ): Promise { + const { items, done, index, skipDryRun } = params; + if (items.length === 0) { + return { + done: true, + keys: [] + }; + } + + const table = getTable({ + type: "es", + context: this.context + }); + + const readableItems = items.map(item => { + const entity = this.getEntity(item); + return entity.item.getBatch({ + PK: item.PK, + SK: item.SK + }); + }); + + const tableItems = await batchReadAll({ + items: readableItems, + table + }); + + const elasticsearchSyncBuilder = createSynchronizationBuilder({ + timer: this.timer, + context: this.context + }); + const keys: string[] = []; + /** + * We need to find the items we have in the Elasticsearch but not in the DynamoDB-Elasticsearch table. + */ + for (const item of items) { + const exists = tableItems.some(ddbItem => { + return ddbItem.PK === item.PK && ddbItem.SK === item.SK; + }); + if (exists) { + continue; + } + keys.push(item._id); + elasticsearchSyncBuilder.delete({ + index, + id: item._id + }); + } + /** + * If there is dry run, just return the done flag + items which are going to get deleted. + */ + if (skipDryRun === false) { + return { + done, + keys + }; + } + + const executeWithRetry = elasticsearchSyncBuilder.build(); + await executeWithRetry(); + + return { + done, + keys + }; + } + + private getEntity( + params: IGetElasticsearchEntityTypeParams + ): ReturnType { + const type = getElasticsearchEntityType(params); + return getElasticsearchEntity({ + type, + context: this.context + }); + } +} diff --git a/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/ElasticsearchToDynamoDbSynchronization.ts b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/ElasticsearchToDynamoDbSynchronization.ts new file mode 100644 index 00000000000..c7d15a59a28 --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/ElasticsearchToDynamoDbSynchronization.ts @@ -0,0 +1,115 @@ +import { + IDataSynchronizationInput, + IDataSynchronizationManager, + IElasticsearchSyncParams, + ISynchronization, + ISynchronizationRunResult +} from "../types"; +import { IIndexManager } from "~/settings/types"; +import { NonEmptyArray } from "@webiny/api/types"; +import { IElasticsearchSynchronize } from "./abstractions/ElasticsearchSynchronize"; +import { IElasticsearchFetcher } from "./abstractions/ElasticsearchFetcher"; + +export class ElasticsearchToDynamoDbSynchronization implements ISynchronization { + private readonly manager: IDataSynchronizationManager; + private readonly indexManager: IIndexManager; + private readonly synchronize: IElasticsearchSynchronize; + private readonly fetcher: IElasticsearchFetcher; + + public constructor(params: IElasticsearchSyncParams) { + this.manager = params.manager; + this.indexManager = params.indexManager; + this.synchronize = params.synchronize; + this.fetcher = params.fetcher; + } + + public async run(input: IDataSynchronizationInput): Promise { + const lastIndex = input.elasticsearchToDynamoDb?.index; + let cursor = input.elasticsearchToDynamoDb?.cursor; + const indexes = await this.fetchAllIndexes(); + + let next = 0; + if (lastIndex) { + next = indexes.findIndex(index => index === lastIndex); + } + + let currentIndex = indexes[next]; + + const keys: string[] = []; + + while (currentIndex) { + if (this.manager.isAborted()) { + return this.manager.response.aborted(); + } + /** + * We will put 180 seconds because we are writing to the Elasticsearch/OpenSearch directly. + * We want to leave enough time for possible retries. + */ + // + else if (this.manager.isCloseToTimeout(180)) { + /** + * If run is a dru run, we will not continue, we will return keys which are going to get deleted. + */ + if (input.skipDryRun === false) { + return this.manager.response.done("Dry run.", { + keys + }); + } + return this.manager.response.continue({ + ...input, + elasticsearchToDynamoDb: { + ...input.elasticsearchToDynamoDb, + index: currentIndex, + cursor + } + }); + } + + const result = await this.fetcher.fetch({ + index: currentIndex, + cursor, + limit: 100 + }); + + const syncResult = await this.synchronize.execute({ + done: result.done, + index: currentIndex, + items: result.items, + skipDryRun: input.skipDryRun + }); + keys.push(...syncResult.keys); + + if (!syncResult.done && result.cursor) { + cursor = result.cursor; + continue; + } + cursor = undefined; + + const next = indexes.findIndex(index => index === currentIndex) + 1; + currentIndex = indexes[next]; + } + /** + * If run is a dru run, we will not continue, we will return keys which are going to get deleted. + */ + if (input.skipDryRun === false) { + return this.manager.response.done("Dry run.", { + keys + }); + } + + return this.manager.response.continue({ + ...input, + elasticsearchToDynamoDb: { + finished: true + } + }); + } + + private async fetchAllIndexes(): Promise> { + const result = await this.indexManager.list(); + if (result.length > 0) { + return result as NonEmptyArray; + } + throw new Error("No Elasticsearch / OpenSearch indexes found."); + } +} diff --git a/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/abstractions/ElasticsearchFetcher.ts b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/abstractions/ElasticsearchFetcher.ts new file mode 100644 index 00000000000..242bec7f4d8 --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/abstractions/ElasticsearchFetcher.ts @@ -0,0 +1,25 @@ +import { PrimitiveValue } from "@webiny/api-elasticsearch/types"; + +export interface IElasticsearchFetcherFetchResponseItem { + PK: string; + SK: string; + _id: string; + index: string; +} + +export interface IElasticsearchFetcherFetchParams { + index: string; + cursor?: PrimitiveValue[]; + limit: number; +} + +export interface IElasticsearchFetcherFetchResponse { + done: boolean; + totalCount: number; + cursor?: PrimitiveValue[]; + items: IElasticsearchFetcherFetchResponseItem[]; +} + +export interface IElasticsearchFetcher { + fetch(params: IElasticsearchFetcherFetchParams): Promise; +} diff --git a/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/abstractions/ElasticsearchSynchronize.ts b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/abstractions/ElasticsearchSynchronize.ts new file mode 100644 index 00000000000..d136441d4b7 --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/abstractions/ElasticsearchSynchronize.ts @@ -0,0 +1,23 @@ +export interface IElasticsearchSynchronizeExecuteParamsItem { + PK: string; + SK: string; + _id: string; + index: string; +} + +export interface IElasticsearchSynchronizeExecuteParams { + done: boolean; + index: string; + items: IElasticsearchSynchronizeExecuteParamsItem[]; + skipDryRun: boolean; +} + +export interface IElasticsearchSynchronizeExecuteResponse { + done: boolean; + keys: string[]; +} +export interface IElasticsearchSynchronize { + execute( + params: IElasticsearchSynchronizeExecuteParams + ): Promise; +} diff --git a/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/shouldIgnoreEsResponseError.ts b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/shouldIgnoreEsResponseError.ts new file mode 100644 index 00000000000..b4d76ba15ad --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/elasticsearch/shouldIgnoreEsResponseError.ts @@ -0,0 +1,11 @@ +import WebinyError from "@webiny/error"; + +const IGNORED_ES_SEARCH_EXCEPTIONS = [ + "index_not_found_exception", + "search_phase_execution_exception", + "illegal_argument_exception" +]; + +export const shouldIgnoreEsResponseError = (error: WebinyError) => { + return IGNORED_ES_SEARCH_EXCEPTIONS.includes(error.message); +}; diff --git a/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/entities/getElasticsearchEntity.ts b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/entities/getElasticsearchEntity.ts new file mode 100644 index 00000000000..cc20f4061ef --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/entities/getElasticsearchEntity.ts @@ -0,0 +1,52 @@ +import { Entity } from "@webiny/db-dynamodb/toolbox"; +import { NonEmptyArray } from "@webiny/api/types"; +import { IRegistryItem } from "@webiny/db"; +import { EntityType } from "./getElasticsearchEntityType"; +import { Context } from "~/types"; + +export interface IGetElasticsearchEntityParams { + type: EntityType | unknown; + context: Pick; +} + +const createPredicate = (app: string, tags: NonEmptyArray) => { + return (item: IRegistryItem) => { + return item.app === app && tags.every(tag => item.tags.includes(tag)); + }; +}; + +export const getElasticsearchEntity = (params: IGetElasticsearchEntityParams) => { + const { type, context } = params; + + const getByPredicate = (predicate: (item: IRegistryItem) => boolean) => { + return context.db.registry.getOneItem(predicate); + }; + + try { + switch (type) { + case EntityType.CMS: + return getByPredicate(createPredicate("cms", ["es"])); + case EntityType.PAGE_BUILDER: + return getByPredicate(createPredicate("pb", ["es"])); + case EntityType.FORM_BUILDER: + return getByPredicate(createPredicate("fb", ["es"])); + case EntityType.FORM_BUILDER_SUBMISSION: + return getByPredicate(createPredicate("fb", ["es", "form-submission"])); + } + } catch (ex) {} + throw new Error(`Unknown entity type "${type}".`); +}; + +export interface IListElasticsearchEntitiesParams { + context: Pick; +} + +export const listElasticsearchEntities = ( + params: IListElasticsearchEntitiesParams +): IRegistryItem[] => { + const { context } = params; + + return context.db.registry.getItems(item => { + return item.tags.includes("es"); + }); +}; diff --git a/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/entities/getElasticsearchEntityType.ts b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/entities/getElasticsearchEntityType.ts new file mode 100644 index 00000000000..5501193cf75 --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/entities/getElasticsearchEntityType.ts @@ -0,0 +1,27 @@ +export enum EntityType { + CMS = "headless-cms", + PAGE_BUILDER = "page-builder", + FORM_BUILDER = "form-builder", + FORM_BUILDER_SUBMISSION = "form-builder-submission" +} + +export interface IGetElasticsearchEntityTypeParams { + SK: string; + index: string; +} + +export const getElasticsearchEntityType = ( + params: IGetElasticsearchEntityTypeParams +): EntityType => { + if (params.index.includes("-headless-cms-")) { + return EntityType.CMS; + } else if (params.index.endsWith("-page-builder")) { + return EntityType.PAGE_BUILDER; + } else if (params.index.endsWith("-form-builder")) { + if (params.SK.startsWith("FS#")) { + return EntityType.FORM_BUILDER_SUBMISSION; + } + return EntityType.FORM_BUILDER; + } + throw new Error(`Unknown entity type for item "${JSON.stringify(params)}".`); +}; diff --git a/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/entities/getTable.ts b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/entities/getTable.ts new file mode 100644 index 00000000000..bb0e5720b9b --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/entities/getTable.ts @@ -0,0 +1,30 @@ +import { Entity, TableDef } from "@webiny/db-dynamodb/toolbox"; +import { Context } from "~/types"; +import { NonEmptyArray } from "@webiny/api/types"; +import { IRegistryItem } from "@webiny/db"; + +export interface IGetTableParams { + context: Pick; + type: "regular" | "es"; +} + +const createPredicate = (app: string, tags: NonEmptyArray) => { + return (item: IRegistryItem) => { + return item.app === app && tags.every(tag => item.tags.includes(tag)); + }; +}; + +export const getTable = (params: IGetTableParams): TableDef => { + const { context, type } = params; + + const getByPredicate = (predicate: (item: IRegistryItem) => boolean) => { + const item = context.db.registry.getOneItem(predicate); + return item.item; + }; + + const entity = getByPredicate(createPredicate("cms", [type])); + if (!entity) { + throw new Error(`Unknown entity type "${type}".`); + } + return entity.table as TableDef; +}; diff --git a/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/entities/index.ts b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/entities/index.ts new file mode 100644 index 00000000000..938b45e64ab --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/entities/index.ts @@ -0,0 +1,3 @@ +export * from "./getElasticsearchEntity"; +export * from "./getElasticsearchEntityType"; +export * from "./getTable"; diff --git a/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/index.ts b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/index.ts new file mode 100644 index 00000000000..3e0ae05137b --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/index.ts @@ -0,0 +1,81 @@ +import { createTaskDefinition } from "@webiny/tasks"; +import { Context, IElasticsearchTaskConfig } from "~/types"; +import { + IDataSynchronizationInput, + IDataSynchronizationManager, + IDataSynchronizationOutput +} from "~/tasks/dataSynchronization/types"; + +export const DATA_SYNCHRONIZATION_TASK = "dataSynchronization"; + +export const createDataSynchronization = (params?: IElasticsearchTaskConfig) => { + return createTaskDefinition({ + id: DATA_SYNCHRONIZATION_TASK, + isPrivate: false, + title: "Data Synchronization", + description: "Synchronize data between Elasticsearch and DynamoDB", + maxIterations: 100, + disableDatabaseLogs: true, + async run({ context, response, isCloseToTimeout, isAborted, store, input, timer }) { + const { Manager } = await import( + /* webpackChunkName: "Manager" */ + "../Manager" + ); + + const { IndexManager } = await import( + /* webpackChunkName: "IndexManager" */ "~/settings" + ); + + const manager = new Manager({ + elasticsearchClient: params?.elasticsearchClient, + documentClient: params?.documentClient, + response, + context, + isAborted, + isCloseToTimeout, + store, + timer + }); + + const indexManager = new IndexManager(manager.elasticsearch, {}); + + const { DataSynchronizationTaskRunner } = await import( + /* webpackChunkName: "DataSynchronizationTaskRunner" */ "./DataSynchronizationTaskRunner" + ); + + const { createFactories } = await import( + /* webpackChunkName: "createFactories" */ "./createFactories" + ); + + try { + const dataSynchronization = new DataSynchronizationTaskRunner({ + manager: manager as unknown as IDataSynchronizationManager, + indexManager, + factories: createFactories() + }); + return await dataSynchronization.run({ + ...input, + skipDryRun: !!input.skipDryRun + }); + } catch (ex) { + return response.error(ex); + } + }, + createInputValidation({ validator }) { + return { + flow: validator.enum(["elasticsearchToDynamoDb"]), + skipDryRun: validator.boolean().optional().default(false), + elasticsearchToDynamoDb: validator + .object({ + finished: validator.boolean().optional().default(false), + index: validator.string().optional(), + cursor: validator.array(validator.string()).optional() + }) + .optional() + .default({ + finished: false + }) + }; + } + }); +}; diff --git a/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/types.ts b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/types.ts new file mode 100644 index 00000000000..36b621148d9 --- /dev/null +++ b/packages/api-elasticsearch-tasks/src/tasks/dataSynchronization/types.ts @@ -0,0 +1,65 @@ +import { IManager } from "~/types"; +import { PrimitiveValue } from "@webiny/api-elasticsearch/types"; +import { IIndexManager } from "~/settings/types"; +import { + ITaskResponseAbortedResult, + ITaskResponseContinueResult, + ITaskResponseDoneResult, + ITaskResponseDoneResultOutput, + ITaskResponseErrorResult +} from "@webiny/tasks"; +import { IElasticsearchSynchronize } from "~/tasks/dataSynchronization/elasticsearch/abstractions/ElasticsearchSynchronize"; +import { IElasticsearchFetcher } from "~/tasks/dataSynchronization/elasticsearch/abstractions/ElasticsearchFetcher"; + +export interface IDataSynchronizationInputValue { + finished?: boolean; +} + +export interface IDataSynchronizationInputElasticsearchToDynamoDbValue + extends IDataSynchronizationInputValue { + index?: string; + cursor?: PrimitiveValue[]; +} + +export interface IDataSynchronizationInput { + flow: "elasticsearchToDynamoDb"; + skipDryRun: boolean; + elasticsearchToDynamoDb?: IDataSynchronizationInputElasticsearchToDynamoDbValue; +} + +export interface IDataSynchronizationOutput extends ITaskResponseDoneResultOutput { + keys?: string[]; +} + +export type ISynchronizationRunResult = + | ITaskResponseContinueResult + | ITaskResponseDoneResult + | ITaskResponseErrorResult + | ITaskResponseAbortedResult; + +export interface ISynchronization { + run(input: IDataSynchronizationInput): Promise; +} + +export interface IElasticsearchSyncParams { + manager: IDataSynchronizationManager; + indexManager: IIndexManager; + synchronize: IElasticsearchSynchronize; + fetcher: IElasticsearchFetcher; +} + +export interface IElasticsearchSyncFactory { + (params: IElasticsearchSyncParams): ISynchronization; +} + +export interface IFactories { + /** + * Delete all the records which are in the Elasticsearch but not in the Elasticsearch DynamoDB table. + */ + elasticsearchToDynamoDb: IElasticsearchSyncFactory; +} + +export type IDataSynchronizationManager = IManager< + IDataSynchronizationInput, + IDataSynchronizationOutput +>; diff --git a/packages/api-elasticsearch-tasks/src/tasks/enableIndexing/EnableIndexingTaskRunner.ts b/packages/api-elasticsearch-tasks/src/tasks/enableIndexing/EnableIndexingTaskRunner.ts index 938477cbbbf..3dca288e2d4 100644 --- a/packages/api-elasticsearch-tasks/src/tasks/enableIndexing/EnableIndexingTaskRunner.ts +++ b/packages/api-elasticsearch-tasks/src/tasks/enableIndexing/EnableIndexingTaskRunner.ts @@ -2,13 +2,17 @@ import { IManager } from "~/types"; import { ITaskResponse, ITaskResponseResult } from "@webiny/tasks/response/abstractions"; import { IndexManager } from "~/settings"; import { IIndexManager } from "~/settings/types"; +import { IElasticsearchEnableIndexingTaskInput } from "~/tasks/enableIndexing/types"; export class EnableIndexingTaskRunner { - private readonly manager: IManager; + private readonly manager: IManager; private readonly indexManager: IIndexManager; private readonly response: ITaskResponse; - public constructor(manager: IManager, indexManager: IndexManager) { + public constructor( + manager: IManager, + indexManager: IndexManager + ) { this.manager = manager; this.response = manager.response; this.indexManager = indexManager; diff --git a/packages/api-elasticsearch-tasks/src/tasks/enableIndexing/index.ts b/packages/api-elasticsearch-tasks/src/tasks/enableIndexing/index.ts index 6c64ca3c10e..a310ad80db0 100644 --- a/packages/api-elasticsearch-tasks/src/tasks/enableIndexing/index.ts +++ b/packages/api-elasticsearch-tasks/src/tasks/enableIndexing/index.ts @@ -6,27 +6,28 @@ export const createEnableIndexingTask = (params?: IElasticsearchTaskConfig) => { return createTaskDefinition({ id: "elasticsearchEnableIndexing", title: "Enable Indexing on Elasticsearch Indexes", - run: async ({ response, context, isAborted, isCloseToTimeout, input, store }) => { + run: async ({ response, context, isAborted, isCloseToTimeout, input, store, timer }) => { const { Manager } = await import( - /* webpackChunkName: "ElasticsearchTaskManager" */ + /* webpackChunkName: "Manager" */ "../Manager" ); const { IndexManager } = await import( - /* webpackChunkName: "ElasticsearchTaskSettings" */ "~/settings" + /* webpackChunkName: "IndexManager" */ "~/settings" ); const { EnableIndexingTaskRunner } = await import( - /* webpackChunkName: "ElasticsearchEnableIndexingTaskRunner" */ "./EnableIndexingTaskRunner" + /* webpackChunkName: "EnableIndexingTaskRunner" */ "./EnableIndexingTaskRunner" ); - const manager = new Manager({ + const manager = new Manager({ elasticsearchClient: params?.elasticsearchClient, documentClient: params?.documentClient, response, context, isAborted, isCloseToTimeout, - store + store, + timer }); const indexManager = new IndexManager( diff --git a/packages/api-elasticsearch-tasks/src/tasks/index.ts b/packages/api-elasticsearch-tasks/src/tasks/index.ts index 52680e600ca..61972c2d768 100644 --- a/packages/api-elasticsearch-tasks/src/tasks/index.ts +++ b/packages/api-elasticsearch-tasks/src/tasks/index.ts @@ -1,3 +1,4 @@ export * from "./enableIndexing"; +export * from "./dataSynchronization"; export * from "./reindexing"; export * from "./createIndexes"; diff --git a/packages/api-elasticsearch-tasks/src/tasks/reindexing/ReindexingTaskRunner.ts b/packages/api-elasticsearch-tasks/src/tasks/reindexing/ReindexingTaskRunner.ts index 85f974b667f..908d8d911b7 100644 --- a/packages/api-elasticsearch-tasks/src/tasks/reindexing/ReindexingTaskRunner.ts +++ b/packages/api-elasticsearch-tasks/src/tasks/reindexing/ReindexingTaskRunner.ts @@ -1,5 +1,6 @@ import { IDynamoDbElasticsearchRecord, + IElasticsearchIndexingTaskValues, IElasticsearchIndexingTaskValuesKeys, IManager } from "~/types"; @@ -20,13 +21,16 @@ const getKeys = (results: ScanResponse): IElasticsearchIndexingTaskValuesKeys | }; export class ReindexingTaskRunner { - private readonly manager: IManager; + private readonly manager: IManager; private keys?: IElasticsearchIndexingTaskValuesKeys; private readonly indexManager: IIndexManager; private readonly response: ITaskResponse; - public constructor(manager: IManager, indexManager: IndexManager) { + public constructor( + manager: IManager, + indexManager: IndexManager + ) { this.manager = manager; this.response = manager.response; this.indexManager = indexManager; diff --git a/packages/api-elasticsearch-tasks/src/tasks/reindexing/reindexingTaskDefinition.ts b/packages/api-elasticsearch-tasks/src/tasks/reindexing/reindexingTaskDefinition.ts index 6194a69018d..851276356e2 100644 --- a/packages/api-elasticsearch-tasks/src/tasks/reindexing/reindexingTaskDefinition.ts +++ b/packages/api-elasticsearch-tasks/src/tasks/reindexing/reindexingTaskDefinition.ts @@ -5,26 +5,27 @@ export const createElasticsearchReindexingTask = (params?: IElasticsearchTaskCon return createTaskDefinition({ id: "elasticsearchReindexing", title: "Elasticsearch reindexing", - run: async ({ context, isCloseToTimeout, response, input, isAborted, store }) => { + run: async ({ context, isCloseToTimeout, response, input, isAborted, store, timer }) => { const { Manager } = await import( - /* webpackChunkName: "ElasticsearchReindexingManager" */ + /* webpackChunkName: "Manager" */ "../Manager" ); const { IndexManager } = await import( - /* webpackChunkName: "ElasticsearchReindexingSettings" */ "~/settings" + /* webpackChunkName: "IndexManager" */ "~/settings" ); const { ReindexingTaskRunner } = await import( - /* webpackChunkName: "ElasticsearchReindexingTaskRunner" */ "./ReindexingTaskRunner" + /* webpackChunkName: "ReindexingTaskRunner" */ "./ReindexingTaskRunner" ); - const manager = new Manager({ + const manager = new Manager({ elasticsearchClient: params?.elasticsearchClient, documentClient: params?.documentClient, response, context, isAborted, isCloseToTimeout, - store + store, + timer }); const indexManager = new IndexManager(manager.elasticsearch, input.settings || {}); diff --git a/packages/api-elasticsearch-tasks/src/types.ts b/packages/api-elasticsearch-tasks/src/types.ts index 2b832cc6621..b2d5b276551 100644 --- a/packages/api-elasticsearch-tasks/src/types.ts +++ b/packages/api-elasticsearch-tasks/src/types.ts @@ -1,12 +1,17 @@ import { ElasticsearchContext } from "@webiny/api-elasticsearch/types"; import { Entity } from "@webiny/db-dynamodb/toolbox"; -import { Context as TasksContext } from "@webiny/tasks/types"; +import { + Context as TasksContext, + IIsCloseToTimeoutCallable, + ITaskResponseDoneResultOutput +} from "@webiny/tasks/types"; import { DynamoDBDocument } from "@webiny/aws-sdk/client-dynamodb"; import { Client } from "@webiny/api-elasticsearch"; import { createTable } from "~/definitions"; import { ITaskResponse } from "@webiny/tasks/response/abstractions"; import { ITaskManagerStore } from "@webiny/tasks/runner/abstractions"; import { BatchWriteItem, BatchWriteResult } from "@webiny/db-dynamodb"; +import { ITimer } from "@webiny/handler-aws"; export interface Context extends ElasticsearchContext, TasksContext {} @@ -51,15 +56,19 @@ export interface IDynamoDbElasticsearchRecord { modified: string; } -export interface IManager { +export interface IManager< + T, + O extends ITaskResponseDoneResultOutput = ITaskResponseDoneResultOutput +> { readonly documentClient: DynamoDBDocument; readonly elasticsearch: Client; readonly context: Context; readonly table: ReturnType; - readonly isCloseToTimeout: () => boolean; + readonly isCloseToTimeout: IIsCloseToTimeoutCallable; readonly isAborted: () => boolean; - readonly response: ITaskResponse; - readonly store: ITaskManagerStore; + readonly response: ITaskResponse; + readonly store: ITaskManagerStore; + readonly timer: ITimer; getEntity: (name: string) => Entity; diff --git a/packages/api-elasticsearch-tasks/tsconfig.build.json b/packages/api-elasticsearch-tasks/tsconfig.build.json index 014a0b3fcdf..68bbfbb2671 100644 --- a/packages/api-elasticsearch-tasks/tsconfig.build.json +++ b/packages/api-elasticsearch-tasks/tsconfig.build.json @@ -3,8 +3,10 @@ "include": ["src"], "references": [ { "path": "../api/tsconfig.build.json" }, + { "path": "../api-dynamodb-to-elasticsearch/tsconfig.build.json" }, { "path": "../api-elasticsearch/tsconfig.build.json" }, { "path": "../aws-sdk/tsconfig.build.json" }, + { "path": "../db/tsconfig.build.json" }, { "path": "../db-dynamodb/tsconfig.build.json" }, { "path": "../error/tsconfig.build.json" }, { "path": "../tasks/tsconfig.build.json" }, diff --git a/packages/api-elasticsearch-tasks/tsconfig.json b/packages/api-elasticsearch-tasks/tsconfig.json index f2032353ca0..d80ecd6f23f 100644 --- a/packages/api-elasticsearch-tasks/tsconfig.json +++ b/packages/api-elasticsearch-tasks/tsconfig.json @@ -3,8 +3,10 @@ "include": ["src", "__tests__"], "references": [ { "path": "../api" }, + { "path": "../api-dynamodb-to-elasticsearch" }, { "path": "../api-elasticsearch" }, { "path": "../aws-sdk" }, + { "path": "../db" }, { "path": "../db-dynamodb" }, { "path": "../error" }, { "path": "../tasks" }, @@ -29,10 +31,14 @@ "~tests/*": ["./__tests__/*"], "@webiny/api/*": ["../api/src/*"], "@webiny/api": ["../api/src"], + "@webiny/api-dynamodb-to-elasticsearch/*": ["../api-dynamodb-to-elasticsearch/src/*"], + "@webiny/api-dynamodb-to-elasticsearch": ["../api-dynamodb-to-elasticsearch/src"], "@webiny/api-elasticsearch/*": ["../api-elasticsearch/src/*"], "@webiny/api-elasticsearch": ["../api-elasticsearch/src"], "@webiny/aws-sdk/*": ["../aws-sdk/src/*"], "@webiny/aws-sdk": ["../aws-sdk/src"], + "@webiny/db/*": ["../db/src/*"], + "@webiny/db": ["../db/src"], "@webiny/db-dynamodb/*": ["../db-dynamodb/src/*"], "@webiny/db-dynamodb": ["../db-dynamodb/src"], "@webiny/error/*": ["../error/src/*"], diff --git a/packages/api-elasticsearch/src/cursors.ts b/packages/api-elasticsearch/src/cursors.ts index d6c116d1fd4..a6984671259 100644 --- a/packages/api-elasticsearch/src/cursors.ts +++ b/packages/api-elasticsearch/src/cursors.ts @@ -3,12 +3,16 @@ import { PrimitiveValue } from "~/types"; /** * Encode a received cursor value into something that can be passed on to the user. */ -export const encodeCursor = (cursor?: string | string[] | null): string | undefined => { - if (!cursor) { +export const encodeCursor = (input?: PrimitiveValue[]): string | undefined => { + if (!input) { return undefined; } - cursor = Array.isArray(cursor) ? cursor.map(encodeURIComponent) : encodeURIComponent(cursor); + const cursor = Array.isArray(input) + ? input + .filter((item: PrimitiveValue): item is string | number | boolean => item !== null) + .map(item => encodeURIComponent(item)) + : encodeURIComponent(input); try { return Buffer.from(JSON.stringify(cursor)).toString("base64"); @@ -28,7 +32,7 @@ export const decodeCursor = (cursor?: string | null): PrimitiveValue[] | undefin try { const value = JSON.parse(Buffer.from(cursor, "base64").toString("ascii")); if (Array.isArray(value)) { - return value.map(decodeURIComponent); + return value.filter(item => item !== null).map(decodeURIComponent); } const decoded = decodeURIComponent(value); return decoded ? [decoded] : undefined; diff --git a/packages/api-elasticsearch/src/types.ts b/packages/api-elasticsearch/src/types.ts index 8b0ac7c5dde..a57bae522ea 100644 --- a/packages/api-elasticsearch/src/types.ts +++ b/packages/api-elasticsearch/src/types.ts @@ -1,6 +1,6 @@ -import { Client, ApiResponse } from "@elastic/elasticsearch"; -import { BoolQueryConfig as esBoolQueryConfig, Query as esQuery } from "elastic-ts"; -import { Context } from "@webiny/api/types"; +import { ApiResponse, Client } from "@elastic/elasticsearch"; +import { BoolQueryConfig, PrimitiveValue, Query as esQuery } from "elastic-ts"; +import { Context, GenericRecord } from "@webiny/api/types"; /** * Re-export some dep lib types. */ @@ -15,7 +15,7 @@ export interface ElasticsearchContext extends Context { * To simplify our plugins, we say that query contains arrays of objects, not single objects. * And that they all are defined as empty arrays at the start. */ -export interface ElasticsearchBoolQueryConfig extends esBoolQueryConfig { +export interface ElasticsearchBoolQueryConfig extends BoolQueryConfig { must: esQuery[]; filter: esQuery[]; should: esQuery[]; @@ -77,29 +77,40 @@ export interface ElasticsearchQueryBuilderArgsPlugin { * Elasticsearch responses. */ export interface ElasticsearchSearchResponseHit { + _index: string; + _type: string; + _id: string; + _score: number | null; _source: T; - sort: string; + sort: PrimitiveValue[]; } export interface ElasticsearchSearchResponseAggregationBucket { key: T; doc_count: number; } -export interface ElasticsearchSearchResponse { - body: { - hits: { - hits: ElasticsearchSearchResponseHit[]; - total: { - value: number; - }; - }; - aggregations: { - [key: string]: { - buckets: ElasticsearchSearchResponseAggregationBucket[]; - }; - }; + +export interface ElasticsearchSearchResponseBodyHits { + hits: ElasticsearchSearchResponseHit[]; + total: { + value: number; + }; +} + +export interface ElasticsearchSearchResponseBodyAggregations { + [key: string]: { + buckets: ElasticsearchSearchResponseAggregationBucket[]; }; } +export interface ElasticsearchSearchResponseBody { + hits: ElasticsearchSearchResponseBodyHits; + aggregations: ElasticsearchSearchResponseBodyAggregations; +} + +export interface ElasticsearchSearchResponse { + body: ElasticsearchSearchResponseBody; +} + export interface ElasticsearchIndexRequestBodyMappingsDynamicTemplate { [key: string]: { path_match?: string; diff --git a/packages/api-form-builder-so-ddb-es/src/index.ts b/packages/api-form-builder-so-ddb-es/src/index.ts index 1968a95b18d..33773290324 100644 --- a/packages/api-form-builder-so-ddb-es/src/index.ts +++ b/packages/api-form-builder-so-ddb-es/src/index.ts @@ -139,6 +139,27 @@ export const createFormBuilderStorageOperations: FormBuilderStorageOperationsFac return { beforeInit: async (context: FormBuilderContext) => { + context.db.registry.register({ + item: entities.form, + app: "fb", + tags: ["regular", "form", entities.form.name] + }); + context.db.registry.register({ + item: entities.esForm, + app: "fb", + tags: ["es", "form", entities.esForm.name] + }); + context.db.registry.register({ + item: entities.submission, + app: "fb", + tags: ["regular", "form-submission", entities.submission.name] + }); + context.db.registry.register({ + item: entities.esSubmission, + app: "fb", + tags: ["es", "form-submission", entities.esSubmission.name] + }); + const types: string[] = [ // Elasticsearch CompressionPlugin.type, diff --git a/packages/api-form-builder-so-ddb-es/src/types.ts b/packages/api-form-builder-so-ddb-es/src/types.ts index 8fbba21e172..8b9a96a8289 100644 --- a/packages/api-form-builder-so-ddb-es/src/types.ts +++ b/packages/api-form-builder-so-ddb-es/src/types.ts @@ -1,26 +1,24 @@ import { + FormBuilder, + FormBuilderContext as BaseFormBuilderContext, + FormBuilderFormStorageOperations as BaseFormBuilderFormStorageOperations, + FormBuilderSettingsStorageOperations as BaseFormBuilderSettingsStorageOperations, FormBuilderStorageOperations as BaseFormBuilderStorageOperations, - FormBuilderSystemStorageOperations as BaseFormBuilderSystemStorageOperations, FormBuilderSubmissionStorageOperations as BaseFormBuilderSubmissionStorageOperations, - FormBuilderSettingsStorageOperations as BaseFormBuilderSettingsStorageOperations, - FormBuilderFormStorageOperations as BaseFormBuilderFormStorageOperations, - FormBuilderContext + FormBuilderSystemStorageOperations as BaseFormBuilderSystemStorageOperations } from "@webiny/api-form-builder/types"; import { DynamoDBDocument } from "@webiny/aws-sdk/client-dynamodb"; -import { Entity, Table } from "@webiny/db-dynamodb/toolbox"; -import { AttributeDefinition } from "@webiny/db-dynamodb/toolbox"; +import { AttributeDefinition, Entity, Table } from "@webiny/db-dynamodb/toolbox"; import { Client } from "@elastic/elasticsearch"; import { PluginCollection } from "@webiny/plugins/types"; -export { FormBuilderContext }; - export type Attributes = Record; export enum ENTITIES { FORM = "FormBuilderForm", + ES_SUBMISSION = "FormBuilderSubmissionEs", ES_FORM = "FormBuilderFormEs", SUBMISSION = "FormBuilderSubmission", - ES_SUBMISSION = "FormBuilderSubmissionEs", SYSTEM = "FormBuilderSystem", SETTINGS = "FormBuilderSettings" } @@ -96,3 +94,9 @@ export interface FormBuilderStorageOperations export interface FormBuilderStorageOperationsFactory { (params: FormBuilderStorageOperationsFactoryParams): FormBuilderStorageOperations; } + +export interface FormBuilderContext extends BaseFormBuilderContext { + formBuilder: FormBuilder & { + storageOperations: FormBuilderStorageOperations; + }; +} diff --git a/packages/api-headless-cms-ddb-es/src/index.ts b/packages/api-headless-cms-ddb-es/src/index.ts index b4bf759c8c1..7c0dbad00e8 100644 --- a/packages/api-headless-cms-ddb-es/src/index.ts +++ b/packages/api-headless-cms-ddb-es/src/index.ts @@ -124,6 +124,16 @@ export const createStorageOperations: StorageOperationsFactory = params => { return { name: "dynamodb:elasticsearch", beforeInit: async context => { + context.db.registry.register({ + item: entities.entries, + app: "cms", + tags: ["regular", entities.entries.name] + }); + context.db.registry.register({ + item: entities.entriesEs, + app: "cms", + tags: ["es", entities.entriesEs.name] + }); /** * Attach the elasticsearch into context if it is not already attached. */ diff --git a/packages/api-headless-cms-ddb-es/src/types.ts b/packages/api-headless-cms-ddb-es/src/types.ts index 7618bf64df4..e7fae5a605f 100644 --- a/packages/api-headless-cms-ddb-es/src/types.ts +++ b/packages/api-headless-cms-ddb-es/src/types.ts @@ -7,14 +7,14 @@ import { CmsModelField, CmsModelFieldToGraphQLPlugin, CmsModelFieldType, + HeadlessCms, HeadlessCmsStorageOperations as BaseHeadlessCmsStorageOperations } from "@webiny/api-headless-cms/types"; -import { TableConstructor } from "@webiny/db-dynamodb/toolbox"; +import { AttributeDefinition, Entity, Table, TableConstructor } from "@webiny/db-dynamodb/toolbox"; import { DynamoDBDocument } from "@webiny/aws-sdk/client-dynamodb"; -import { AttributeDefinition } from "@webiny/db-dynamodb/toolbox"; import { Client } from "@elastic/elasticsearch"; -import { Entity, Table } from "@webiny/db-dynamodb/toolbox"; import { PluginsContainer } from "@webiny/plugins"; +import { ElasticsearchContext } from "@webiny/api-elasticsearch/types"; /** * A definition of the entry that is being prepared for the Elasticsearch. @@ -167,6 +167,12 @@ export interface StorageOperationsFactoryParams { plugins?: PluginCollection; } +export interface CmsContext extends BaseCmsContext, ElasticsearchContext { + cms: HeadlessCms & { + storageOperations: HeadlessCmsStorageOperations; + }; +} + export interface HeadlessCmsStorageOperations extends BaseHeadlessCmsStorageOperations { getTable: () => Table; getEsTable: () => Table; @@ -180,10 +186,6 @@ export interface StorageOperationsFactory { (params: StorageOperationsFactoryParams): HeadlessCmsStorageOperations; } -export interface CmsContext extends BaseCmsContext { - [key: string]: any; -} - export interface CmsEntryStorageOperations extends BaseCmsEntryStorageOperations { dataLoaders: DataLoadersHandlerInterface; } diff --git a/packages/api-page-builder-so-ddb-es/src/index.ts b/packages/api-page-builder-so-ddb-es/src/index.ts index ff9a54f69f2..73d8f53705d 100644 --- a/packages/api-page-builder-so-ddb-es/src/index.ts +++ b/packages/api-page-builder-so-ddb-es/src/index.ts @@ -208,6 +208,16 @@ export const createStorageOperations: StorageOperationsFactory = params => { return { beforeInit: async (context: PbContext) => { + context.db.registry.register({ + item: entities.pages, + app: "pb", + tags: ["regular", entities.pages.name] + }); + context.db.registry.register({ + item: entities.pagesEs, + app: "pb", + tags: ["es", entities.pagesEs.name] + }); const types: string[] = [ // Elasticsearch CompressionPlugin.type, diff --git a/packages/api-page-builder-so-ddb-es/src/types.ts b/packages/api-page-builder-so-ddb-es/src/types.ts index 6201f33b364..f0721d7cfbd 100644 --- a/packages/api-page-builder-so-ddb-es/src/types.ts +++ b/packages/api-page-builder-so-ddb-es/src/types.ts @@ -2,18 +2,15 @@ import { BlockCategoryStorageOperations as BaseBlockCategoryStorageOperations, CategoryStorageOperations as BaseCategoryStorageOperations, PageBlockStorageOperations as BasePageBlockStorageOperations, + PageBuilderContextObject, PageBuilderStorageOperations as BasePageBuilderStorageOperations, PageTemplateStorageOperations as BasePageTemplateStorageOperations, - PbContext + PbContext as BasePbContext } from "@webiny/api-page-builder/types"; -import { Entity, Table } from "@webiny/db-dynamodb/toolbox"; +import { AttributeDefinition, Entity, Table, TableConstructor } from "@webiny/db-dynamodb/toolbox"; import { DynamoDBDocument } from "@webiny/aws-sdk/client-dynamodb"; import { Client } from "@elastic/elasticsearch"; import { PluginCollection } from "@webiny/plugins/types"; -import { TableConstructor } from "@webiny/db-dynamodb/toolbox"; -import { AttributeDefinition } from "@webiny/db-dynamodb/toolbox"; - -export { PbContext }; export type Attributes = Record; @@ -52,6 +49,12 @@ export interface PageBuilderStorageOperations extends BasePageBuilderStorageOper >; } +export interface PbContext extends BasePbContext { + pageBuilder: PageBuilderContextObject & { + storageOperations: PageBuilderStorageOperations; + }; +} + export interface StorageOperationsFactoryParams { documentClient: DynamoDBDocument; elasticsearch: Client; diff --git a/packages/db-dynamodb/src/DynamoDbDriver.ts b/packages/db-dynamodb/src/DynamoDbDriver.ts index 45b5a13d2a5..7e3ccf28ec9 100644 --- a/packages/db-dynamodb/src/DynamoDbDriver.ts +++ b/packages/db-dynamodb/src/DynamoDbDriver.ts @@ -1,48 +1,19 @@ import { DynamoDBDocument } from "@webiny/aws-sdk/client-dynamodb"; -import { DbDriver, Result } from "@webiny/db"; +import { DbDriver } from "@webiny/db"; interface ConstructorArgs { documentClient: DynamoDBDocument; } -class DynamoDbDriver implements DbDriver { - batchProcesses: Record; - documentClient: DynamoDBDocument; +class DynamoDbDriver implements DbDriver { + public readonly documentClient: DynamoDBDocument; constructor({ documentClient }: ConstructorArgs) { - this.batchProcesses = {}; this.documentClient = documentClient; } getClient() { return this.documentClient; } - async create(): Promise { - return [true, {}]; - } - - async update(): Promise { - return [true, {}]; - } - - async delete(): Promise { - return [true, {}]; - } - - async read(): Promise> { - return [[], {}]; - } - - async createLog(): Promise { - return [true, {}]; - } - - async readLogs() { - return this.read(); - } - - getBatchProcess() { - // not empty - } } export default DynamoDbDriver; diff --git a/packages/db-dynamodb/src/types.ts b/packages/db-dynamodb/src/types.ts index 7462e21df3a..83d721d7b99 100644 --- a/packages/db-dynamodb/src/types.ts +++ b/packages/db-dynamodb/src/types.ts @@ -1,27 +1,3 @@ -export interface OperatorArgs { - expression: string; - attributeNames: Record; - attributeValues: Record; -} - -interface CanProcessArgs { - key: string; - value: any; - args: OperatorArgs; -} - -interface ProcessArgs { - key: string; - value: any; - args: OperatorArgs; - processStatement: any; -} - -export interface Operator { - canProcess: ({ key }: CanProcessArgs) => boolean; - process: ({ key, value, args }: ProcessArgs) => void; -} - /** * We use this definition to search for a value in any given field that was passed. * It works as an "OR" condition. diff --git a/packages/db-dynamodb/src/utils/batchRead.ts b/packages/db-dynamodb/src/utils/batchRead.ts index 1324297674c..74a51e5a220 100644 --- a/packages/db-dynamodb/src/utils/batchRead.ts +++ b/packages/db-dynamodb/src/utils/batchRead.ts @@ -1,6 +1,7 @@ import lodashChunk from "lodash/chunk"; import WebinyError from "@webiny/error"; import { TableDef } from "~/toolbox"; +import { GenericRecord } from "@webiny/api/types"; export interface BatchReadItem { Table?: TableDef; @@ -57,7 +58,7 @@ const batchReadAllChunk = async (params: BatchReadAllChunkParams): Prom * This helper function is meant to be used to batch read from one table. * It will fetch all results, as there is a next() method call built in. */ -export const batchReadAll = async ( +export const batchReadAll = async ( params: BatchReadParams, maxChunk = MAX_BATCH_ITEMS ): Promise => { @@ -75,7 +76,7 @@ export const batchReadAll = async ( const records: T[] = []; - const chunkItemsList: BatchReadItem[][] = lodashChunk(params.items, maxChunk); + const chunkItemsList = lodashChunk(params.items, maxChunk); for (const chunkItems of chunkItemsList) { const results = await batchReadAllChunk({ diff --git a/packages/db/package.json b/packages/db/package.json index 21347f1ea1a..54500711e6c 100644 --- a/packages/db/package.json +++ b/packages/db/package.json @@ -12,6 +12,9 @@ "access": "public", "directory": "dist" }, + "dependencies": { + "@webiny/api": "0.0.0" + }, "devDependencies": { "@babel/cli": "^7.23.9", "@babel/core": "^7.24.0", diff --git a/packages/db/src/DbRegistry.ts b/packages/db/src/DbRegistry.ts new file mode 100644 index 00000000000..3bbbf8fc7e1 --- /dev/null +++ b/packages/db/src/DbRegistry.ts @@ -0,0 +1,49 @@ +import { IRegistry, IRegistryItem, IRegistryRegisterParams } from "./types"; +import { GenericRecord } from "@webiny/api/types"; + +export class DbRegistry implements IRegistry { + private readonly items: GenericRecord = {}; + + public register(input: IRegistryRegisterParams): void { + const key = `${input.app}-${input.tags.sort().join("-")}`; + + if (this.items[key]) { + throw new Error( + `Item with app "${input.app}" and tags "${input.tags.join( + ", " + )}" is already registered.` + ); + } + this.items[key] = input; + } + + public getOneItem(cb: (item: IRegistryItem) => boolean): IRegistryItem { + const item = this.getItem(cb); + if (!item) { + throw new Error("Item not found."); + } + return item; + } + + public getItem(cb: (item: IRegistryItem) => boolean): IRegistryItem | null { + const items = this.getItems(cb); + if (items.length === 0) { + return null; + } else if (items.length > 1) { + throw new Error("More than one item found with the provided criteria."); + } + return items[0]; + } + + public getItems(cb: (item: IRegistryItem) => boolean): IRegistryItem[] { + const results: IRegistryItem[] = []; + for (const key in this.items) { + const item = this.items[key] as IRegistryItem; + if (cb(item)) { + results.push(item); + } + } + + return results; + } +} diff --git a/packages/db/src/index.ts b/packages/db/src/index.ts index e06d9da2d2b..01a56af9175 100644 --- a/packages/db/src/index.ts +++ b/packages/db/src/index.ts @@ -1,254 +1,26 @@ -/** - * TODO Remove when moved all packages to standalone storage opts. - */ -interface KeyField { - name: string; -} +import { DbRegistry } from "~/DbRegistry"; -export interface Key { - primary?: boolean; - unique?: boolean; - name: string; - fields: KeyField[]; -} +export * from "./types"; -export interface ArgsBatch { - instance: Batch; - operation: Operation; -} -export interface Args { - __batch?: ArgsBatch; - table?: string; - meta?: boolean; - limit?: number; - sort?: Record; - data?: Record; - query?: Record; - keys?: Key[]; +export interface DbDriver { + getClient(): T; } -export type Result = [T, Record]; - -export interface DbDriver { - create: (args: Args) => Promise>; - read: >(args: Args) => Promise>; - update: (args: Args) => Promise>; - delete: (args: Args) => Promise>; - - // Logging functions. - createLog: (args: { - operation: string; - data: Args; - table: string; - id: string; - }) => Promise>; - readLogs: >(args: { table: string }) => Promise>; -} - -export type OperationType = "create" | "read" | "update" | "delete"; -export type Operation = [OperationType, Args]; - -export type ConstructorArgs = { - driver: DbDriver; +export interface ConstructorArgs { + driver: DbDriver; table?: string; - logTable?: string; -}; - -// Generates a short and sortable ID, e.g. "1607677774994.tfz58m". -const shortId = () => { - const time = new Date().getTime(); - const uniqueId = Math.random().toString(36).slice(-6); - - return `${time}.${uniqueId}`; -}; - -interface LogDataBatch { - id: string; - type: string; -} -interface LogData - extends Pick { - batch: LogDataBatch | null; } -// Picks necessary data from received args, ready to be stored in the log table. -const getCreateLogData = (args: Args): LogData => { - const { table, meta, limit, sort, data, query, keys } = args; +class Db { + public driver: DbDriver; + public readonly table?: string; - return { - table, - meta, - limit, - sort, - data, - query, - keys, - batch: args.__batch - ? { - id: args.__batch.instance.id, - type: args.__batch.instance.type - } - : null - }; -}; + public readonly registry = new DbRegistry(); -class Db { - public driver: DbDriver; - public table: string; - public logTable?: string; - - constructor({ driver, table, logTable }: ConstructorArgs) { - this.driver = driver; - // @ts-expect-error + constructor({ driver, table }: ConstructorArgs) { this.table = table; - this.logTable = logTable; - } - - public async create(args: Args): Promise> { - const createArgs = { ...args, table: args.table || this.table }; - await this.createLog("create", createArgs); - return this.driver.create(createArgs); - } - - public async read>(args: Args): Promise> { - const readArgs = { ...args, table: args.table || this.table }; - await this.createLog("read", readArgs); - return this.driver.read(readArgs); - } - - public async update(args: Args): Promise> { - const updateArgs = { ...args, table: args.table || this.table }; - await this.createLog("update", updateArgs); - return this.driver.update(updateArgs); - } - - public async delete(args: Args): Promise> { - const deleteArgs = { ...args, table: args.table || this.table }; - await this.createLog("delete", deleteArgs); - return this.driver.delete(deleteArgs); - } - - // Logging functions. - public async createLog(operation: string, args: Args): Promise | null> { - if (!this.logTable) { - return null; - } - - const data = getCreateLogData(args); - return this.driver.createLog({ operation, data, table: this.logTable, id: shortId() }); - } - - public async readLogs>(): Promise | null> { - if (!this.logTable) { - return null; - } - - return this.driver.readLogs({ - table: this.logTable - }); - } - - public batch< - T0 = any, - T1 = any, - T2 = any, - T3 = any, - T4 = any, - T5 = any, - T6 = any, - T7 = any, - T8 = any, - T9 = any - >(): Batch { - return new Batch(this); - } -} - -class Batch< - T0 = any, - T1 = any, - T2 = any, - T3 = any, - T4 = any, - T5 = any, - T6 = any, - T7 = any, - T8 = any, - T9 = any -> { - db: Db; - type: "batch" | "transaction"; - id: string; - meta: Record; - operations: Operation[]; - - constructor(db: Db) { - this.db = db; - this.type = "batch"; - this.id = shortId(); - - this.meta = {}; - this.operations = []; - } - - push(...operations: Operation[]) { - for (let i = 0; i < operations.length; i++) { - const item = operations[i]; - this.operations.push(item); - } - return this; - } - - create(...args: Args[]) { - for (let i = 0; i < args.length; i++) { - this.push(["create", args[i]]); - } - return this; - } - - read(...args: Args[]) { - for (let i = 0; i < args.length; i++) { - this.push(["read", args[i]]); - } - return this; - } - - update(...args: Args[]) { - for (let i = 0; i < args.length; i++) { - this.push(["update", args[i]]); - } - return this; - } - - delete(...args: Args[]) { - for (let i = 0; i < args.length; i++) { - this.push(["delete", args[i]]); - } - return this; - } - - async execute(): Promise<[T0?, T1?, T2?, T3?, T4?, T5?, T6?, T7?, T8?, T9?]> { - /** - * TODO: figure out which exact type to use instead of any. - */ - const promises: Promise[] = []; - for (let i = 0; i < this.operations.length; i++) { - const [operation, args] = this.operations[i]; - promises.push( - this.db[operation]({ - ...args, - __batch: { - instance: this, - operation: this.operations[i] - } - }) - ); - } - - const result = Promise.all(promises); - - return result as Promise<[T0?, T1?, T2?, T3?, T4?, T5?, T6?, T7?, T8?, T9?]>; + this.driver = driver; } } -export { Batch, Db }; +export { Db }; diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts new file mode 100644 index 00000000000..eac650c993b --- /dev/null +++ b/packages/db/src/types.ts @@ -0,0 +1,26 @@ +import { NonEmptyArray } from "@webiny/api/types"; + +export interface IRegistryRegisterParams { + item: T; + app: string; + tags: NonEmptyArray; +} + +export interface IRegistryItem { + item: T; + app: string; + tags: NonEmptyArray; +} + +export interface IRegistry { + register(params: IRegistryRegisterParams): void; + /** + * Throws an error if more than one item is found or there is no item found. + */ + getOneItem(cb: (item: IRegistryItem) => boolean): IRegistryItem; + /** + * Throws an error if more than one item is found. + */ + getItem(cb: (item: IRegistryItem) => boolean): IRegistryItem | null; + getItems(cb: (item: IRegistryItem) => boolean): IRegistryItem[]; +} diff --git a/packages/db/tsconfig.build.json b/packages/db/tsconfig.build.json index 5e7843d3c8c..097b4c0b400 100644 --- a/packages/db/tsconfig.build.json +++ b/packages/db/tsconfig.build.json @@ -1,7 +1,7 @@ { "extends": "../../tsconfig.build.json", "include": ["src"], - "references": [], + "references": [{ "path": "../api/tsconfig.build.json" }], "compilerOptions": { "rootDir": "./src", "outDir": "./dist", diff --git a/packages/db/tsconfig.json b/packages/db/tsconfig.json index 6ca26f3c929..306a82a42a7 100644 --- a/packages/db/tsconfig.json +++ b/packages/db/tsconfig.json @@ -1,12 +1,17 @@ { "extends": "../../tsconfig.json", "include": ["src", "__tests__"], - "references": [], + "references": [{ "path": "../api" }], "compilerOptions": { "rootDirs": ["./src", "./__tests__"], "outDir": "./dist", "declarationDir": "./dist", - "paths": { "~/*": ["./src/*"], "~tests/*": ["./__tests__/*"] }, + "paths": { + "~/*": ["./src/*"], + "~tests/*": ["./__tests__/*"], + "@webiny/api/*": ["../api/src/*"], + "@webiny/api": ["../api/src"] + }, "baseUrl": "." } } diff --git a/packages/handler-db/src/index.ts b/packages/handler-db/src/index.ts index af1cbeb82b5..eaafb0aed5f 100644 --- a/packages/handler-db/src/index.ts +++ b/packages/handler-db/src/index.ts @@ -1,17 +1,14 @@ -import { Db } from "@webiny/db"; +import { ConstructorArgs, Db } from "@webiny/db"; import { ContextPlugin } from "@webiny/api"; import { DbContext } from "./types"; -/** - * TODO: remove this package. - */ -export default (args: any) => { +export default (args: ConstructorArgs) => { return [ new ContextPlugin(context => { if (context.db) { return; } - context.db = new Db(args); + context.db = new Db(args); }) ]; }; diff --git a/packages/handler-db/src/types.ts b/packages/handler-db/src/types.ts index 6937b8bd4ca..bfa9b7fb0e6 100644 --- a/packages/handler-db/src/types.ts +++ b/packages/handler-db/src/types.ts @@ -2,5 +2,5 @@ import { Db } from "@webiny/db"; import { Context } from "@webiny/api/types"; export interface DbContext extends Context { - db: Db; + db: Db; } diff --git a/packages/tasks/src/runner/TaskManager.ts b/packages/tasks/src/runner/TaskManager.ts index 0cb6732b7ed..e9ee12c642b 100644 --- a/packages/tasks/src/runner/TaskManager.ts +++ b/packages/tasks/src/runner/TaskManager.ts @@ -16,15 +16,17 @@ import { } from "~/response/abstractions"; import { getErrorProperties } from "~/utils/getErrorProperties"; +type ITaskManagerRunner = Pick; + export class TaskManager implements ITaskManager { - private readonly runner: Pick; + private readonly runner: ITaskManagerRunner; private readonly context: Context; private readonly response: IResponse; private readonly taskResponse: ITaskResponse; private readonly store: ITaskManagerStorePrivate; public constructor( - runner: Pick, + runner: ITaskManagerRunner, context: Context, response: IResponse, taskResponse: ITaskResponse, @@ -134,7 +136,8 @@ export class TaskManager implements ITaskManager { ...params, parent: this.store.getTask() }); - } + }, + timer: this.runner.timer }); }); } catch (ex) { diff --git a/packages/tasks/src/runner/TaskRunner.ts b/packages/tasks/src/runner/TaskRunner.ts index 13db64c6741..f758aecf108 100644 --- a/packages/tasks/src/runner/TaskRunner.ts +++ b/packages/tasks/src/runner/TaskRunner.ts @@ -23,8 +23,8 @@ export class TaskRunner implements ITaskRunner { * Follow the same example for the rest of the properties. */ public readonly context: C; + public readonly timer: ITimer; private readonly validation: ITaskEventValidation; - private readonly timer: ITimer; /** * We take all required variables separately because they will get injected via DI - so less refactoring is required in the future. diff --git a/packages/tasks/src/runner/abstractions/TaskRunner.ts b/packages/tasks/src/runner/abstractions/TaskRunner.ts index da4f8bca9fb..3d399826041 100644 --- a/packages/tasks/src/runner/abstractions/TaskRunner.ts +++ b/packages/tasks/src/runner/abstractions/TaskRunner.ts @@ -1,6 +1,7 @@ import { Context } from "~/types"; import { ITaskEvent } from "~/handler/types"; import { IResponseResult } from "~/response/abstractions"; +import { ITimer } from "@webiny/handler-aws"; export interface IIsCloseToTimeoutCallable { (seconds?: number): boolean; @@ -9,5 +10,6 @@ export interface IIsCloseToTimeoutCallable { export interface ITaskRunner { context: C; isCloseToTimeout: IIsCloseToTimeoutCallable; + timer: ITimer; run(event: ITaskEvent): Promise; } diff --git a/packages/tasks/src/types.ts b/packages/tasks/src/types.ts index e3619ca6d8d..b7447934182 100644 --- a/packages/tasks/src/types.ts +++ b/packages/tasks/src/types.ts @@ -17,6 +17,7 @@ import { IIsCloseToTimeoutCallable, ITaskManagerStore } from "./runner/abstracti import { SecurityPermission } from "@webiny/api-security/types"; import { GenericRecord } from "@webiny/api/types"; import { IStepFunctionServiceFetchResult } from "~/service/StepFunctionServicePlugin"; +import { ITimer } from "@webiny/handler-aws"; import type zod from "zod"; @@ -334,6 +335,7 @@ export interface ITaskRunParams< trigger( params: Omit, "parent"> ): Promise>; + timer: ITimer; } export interface ITaskOnSuccessParams< diff --git a/yarn.lock b/yarn.lock index 2b89b21d867..cfbe168e186 100644 --- a/yarn.lock +++ b/yarn.lock @@ -13857,6 +13857,7 @@ __metadata: "@babel/preset-typescript": ^7.23.3 "@babel/runtime": ^7.24.0 "@webiny/api": 0.0.0 + "@webiny/api-dynamodb-to-elasticsearch": 0.0.0 "@webiny/api-elasticsearch": 0.0.0 "@webiny/api-headless-cms": 0.0.0 "@webiny/api-i18n": 0.0.0 @@ -13865,6 +13866,7 @@ __metadata: "@webiny/api-wcp": 0.0.0 "@webiny/aws-sdk": 0.0.0 "@webiny/cli": 0.0.0 + "@webiny/db": 0.0.0 "@webiny/db-dynamodb": 0.0.0 "@webiny/error": 0.0.0 "@webiny/handler": 0.0.0 @@ -17320,6 +17322,7 @@ __metadata: dependencies: "@babel/cli": ^7.23.9 "@babel/core": ^7.24.0 + "@webiny/api": 0.0.0 "@webiny/cli": 0.0.0 "@webiny/project-utils": 0.0.0 rimraf: ^5.0.5