From f325f8ea4be93f7c5c405ab549fa5e67e192f48c Mon Sep 17 00:00:00 2001 From: Mateusz Baginski Date: Sun, 22 Oct 2023 15:45:48 +0200 Subject: [PATCH] feat(reindex): add basic synchronize method skeleton --- .../catch-task-either-tagged-error-te.ts | 24 +++++ packages/core/src/helpers/fp-ts/index.ts | 2 + .../helpers/fp-ts/tap-task-either-error-te.ts | 21 ++++ .../helpers/fp-ts/try-tagged-error-task.ts | 2 +- .../src/helpers/get-first-obj-key-value.ts | 5 + packages/core/src/helpers/index.ts | 1 + .../src/types/fp-ts/tagged-error.types.ts | 13 ++- .../types/fp-ts/tagged-task-either.types.ts | 2 +- packages/core/src/types/index.ts | 1 + packages/core/src/types/value-object.types.ts | 24 +++++ .../src/abstract/abstract-fp-ts-logger.ts | 42 +++++--- .../client/es-monadic-client.error.ts | 20 ++-- .../src/+internal/client/es-monadic-client.ts | 50 +++++++++- .../src/+internal/client/try-es-task.ts | 4 +- .../create-index-name-with-timestamp.ts | 19 ++++ .../reindex/src/+internal/helpers/index.ts | 1 + packages/reindex/src/+internal/index.ts | 1 + packages/reindex/src/es-index.ts | 99 +++++++++++++++++++ packages/reindex/src/es-indices-registry.ts | 17 ++++ packages/reindex/src/index.ts | 3 +- 20 files changed, 315 insertions(+), 36 deletions(-) create mode 100644 packages/core/src/helpers/fp-ts/catch-task-either-tagged-error-te.ts create mode 100644 packages/core/src/helpers/fp-ts/tap-task-either-error-te.ts create mode 100644 packages/core/src/helpers/get-first-obj-key-value.ts create mode 100644 packages/core/src/types/value-object.types.ts create mode 100644 packages/reindex/src/+internal/helpers/create-index-name-with-timestamp.ts create mode 100644 packages/reindex/src/+internal/helpers/index.ts create mode 100644 packages/reindex/src/es-index.ts create mode 100644 packages/reindex/src/es-indices-registry.ts diff --git a/packages/core/src/helpers/fp-ts/catch-task-either-tagged-error-te.ts b/packages/core/src/helpers/fp-ts/catch-task-either-tagged-error-te.ts new file mode 100644 index 0000000..d0c40b2 --- /dev/null +++ b/packages/core/src/helpers/fp-ts/catch-task-either-tagged-error-te.ts @@ -0,0 +1,24 @@ +import * as TE from 'fp-ts/TaskEither'; +import { pipe } from 'fp-ts/function'; + +import type * as RTE from 'fp-ts/ReaderTaskEither'; +import { isTaggedError, type TaggedError } from '../../types'; + +export const catchTaskEitherTagErrorTE = + ( + tag: T, + catchTask: RTE.ReaderTaskEither, E2, B>, + ) => + ( + task: TE.TaskEither extends never ? never : E, A>, + ) => + pipe( + task, + TE.foldW((error): TE.TaskEither => { + if (isTaggedError(error) && error.tag === tag) { + return catchTask(error); + } + + return TE.left(error); + }, TE.of), + ) as TE.TaskEither; diff --git a/packages/core/src/helpers/fp-ts/index.ts b/packages/core/src/helpers/fp-ts/index.ts index 93af8b0..9ea4207 100644 --- a/packages/core/src/helpers/fp-ts/index.ts +++ b/packages/core/src/helpers/fp-ts/index.ts @@ -1,3 +1,5 @@ +export * from './catch-task-either-tagged-error-te'; +export * from './tap-task-either-error-te'; export * from './tap-task-either-error'; export * from './tap-task-either'; export * from './try-tagged-error-task'; diff --git a/packages/core/src/helpers/fp-ts/tap-task-either-error-te.ts b/packages/core/src/helpers/fp-ts/tap-task-either-error-te.ts new file mode 100644 index 0000000..81e6b0c --- /dev/null +++ b/packages/core/src/helpers/fp-ts/tap-task-either-error-te.ts @@ -0,0 +1,21 @@ +import * as TE from 'fp-ts/TaskEither'; +import { pipe } from 'fp-ts/function'; + +export const tapTaskEitherErrorTE = + (errorFnTE: (error: E) => TE.TaskEither) => + (task: TE.TaskEither): TE.TaskEither => + pipe( + task, + TE.foldW( + error => + pipe( + error, + errorFnTE, + TE.foldW( + rollbackError => TE.left(rollbackError), + () => TE.left(error), + ), + ), + TE.right, + ), + ); diff --git a/packages/core/src/helpers/fp-ts/try-tagged-error-task.ts b/packages/core/src/helpers/fp-ts/try-tagged-error-task.ts index 08c0d6d..96eafb8 100644 --- a/packages/core/src/helpers/fp-ts/try-tagged-error-task.ts +++ b/packages/core/src/helpers/fp-ts/try-tagged-error-task.ts @@ -4,6 +4,6 @@ import type * as T from 'fp-ts/Task'; import type { TaggedError } from '../../types'; export const tryTaggedErrorTask = - (TagClass: new (originalStack?: string) => TC) => + >(TagClass: new (originalStack?: string) => TC) => (task: T.Task): TE.TaskEither => TE.tryCatch(task, (exception: any) => new TagClass(exception.stack)); diff --git a/packages/core/src/helpers/get-first-obj-key-value.ts b/packages/core/src/helpers/get-first-obj-key-value.ts new file mode 100644 index 0000000..577955a --- /dev/null +++ b/packages/core/src/helpers/get-first-obj-key-value.ts @@ -0,0 +1,5 @@ +export const getFirstObjKeyValue = >(obj: O) => { + const [key] = Object.keys(obj); + + return obj[key]; +}; diff --git a/packages/core/src/helpers/index.ts b/packages/core/src/helpers/index.ts index 33efe7a..348d6fc 100644 --- a/packages/core/src/helpers/index.ts +++ b/packages/core/src/helpers/index.ts @@ -1,3 +1,4 @@ export * from './fp-ts'; +export * from './get-first-obj-key-value'; export * from './nop'; export * from './reject-falsy-values'; diff --git a/packages/core/src/types/fp-ts/tagged-error.types.ts b/packages/core/src/types/fp-ts/tagged-error.types.ts index 86c0064..b26ee68 100644 --- a/packages/core/src/types/fp-ts/tagged-error.types.ts +++ b/packages/core/src/types/fp-ts/tagged-error.types.ts @@ -1,11 +1,16 @@ -export abstract class TaggedError extends Error { - abstract readonly tag: string; +export abstract class TaggedError extends Error { + abstract readonly tag: T; constructor(readonly originalStack?: string) { super(); Error.captureStackTrace(this, TaggedError); } + + static ofLiteral = (tag: S) => + class TaggedLiteralError extends TaggedError { + readonly tag = tag; + }; } -export const isTaggedError = (error: Error): error is TaggedError => - 'tag' in error; +export const isTaggedError = (error: any): error is TaggedError => + error && 'tag' in error; diff --git a/packages/core/src/types/fp-ts/tagged-task-either.types.ts b/packages/core/src/types/fp-ts/tagged-task-either.types.ts index 9fdd32c..3b97bd2 100644 --- a/packages/core/src/types/fp-ts/tagged-task-either.types.ts +++ b/packages/core/src/types/fp-ts/tagged-task-either.types.ts @@ -1,4 +1,4 @@ import type { TaskEither } from 'fp-ts/TaskEither'; import type { TaggedError } from './tagged-error.types'; -export type TaggedTaskEither = TaskEither; +export type TaggedTaskEither, A> = TaskEither; diff --git a/packages/core/src/types/index.ts b/packages/core/src/types/index.ts index 9e61717..514e682 100644 --- a/packages/core/src/types/index.ts +++ b/packages/core/src/types/index.ts @@ -1 +1,2 @@ export * from './fp-ts'; +export * from './value-object.types'; diff --git a/packages/core/src/types/value-object.types.ts b/packages/core/src/types/value-object.types.ts new file mode 100644 index 0000000..4d7b0f7 --- /dev/null +++ b/packages/core/src/types/value-object.types.ts @@ -0,0 +1,24 @@ +export abstract class ValueObject { + constructor(readonly props: T) {} + + static of: (props: A) => ValueObject; + + unwrap() { + return this.props; + } + + bind(fn: (props: T) => O): O { + return fn(this.props); + } + + extend(props: Partial): this { + return new (this.constructor as any)({ + ...this.props, + ...props, + }); + } + + map(fn: (props: T) => T): this { + return new (this.constructor as any)(this.bind(fn)); + } +} diff --git a/packages/logger/src/abstract/abstract-fp-ts-logger.ts b/packages/logger/src/abstract/abstract-fp-ts-logger.ts index 9854997..8178a6f 100644 --- a/packages/logger/src/abstract/abstract-fp-ts-logger.ts +++ b/packages/logger/src/abstract/abstract-fp-ts-logger.ts @@ -1,14 +1,19 @@ -import { pipe, flow } from 'fp-ts/function'; +import { pipe, flow, identity } from 'fp-ts/function'; import * as TE from 'fp-ts/TaskEither'; import type { AbstractLogger } from './abstract-logger.types'; -import { nop, rejectFalsyItems, tapTaskEither } from '@searchpunch/core'; +import { + nop, + rejectFalsyItems, + tapTaskEither, + tapTaskEitherError, +} from '@searchpunch/core'; type UnsafeErrorMessage = string | Error; type TaskEitherLoggerAttrs = { - onBefore?: () => string; - onRight?: (data: A) => string; + onBefore?: () => UnsafeErrorMessage; + onRight?: (data: A) => UnsafeErrorMessage; onLeft: (error: E) => UnsafeErrorMessage; }; @@ -20,23 +25,32 @@ export class AbstractFpTsLogger { (task: TE.TaskEither): TE.TaskEither => pipe( task, - this.logTaskEither({ - onLeft, + tapTaskEitherError(error => { + this.tryLogErrorWithStack(onLeft(error))(error); }), ); + logBeforeTaskEither = + (onBefore: () => UnsafeErrorMessage) => + (task: TE.TaskEither): TE.TaskEither => + pipe( + TE.fromIO(onBefore ? flow(onBefore, this.logger.info) : nop), + TE.chain(() => task), + ); + + logTaskEitherSuccess = + (onRight: (data: A) => UnsafeErrorMessage) => + (task: TE.TaskEither): TE.TaskEither => + pipe(task, tapTaskEither(flow(onRight, this.logger.info))); + logTaskEither = ({ onBefore, onRight, onLeft }: TaskEitherLoggerAttrs) => (task: TE.TaskEither): TE.TaskEither => pipe( - TE.fromIO(onBefore ?? nop), - TE.chain(() => task), - tapTaskEither( - onRight ? flow(onRight, this.logger.info) : nop, - error => { - this.tryLogErrorWithStack(onLeft(error))(error); - }, - ), + task, + onBefore ? this.logBeforeTaskEither(onBefore) : identity, + onLeft ? this.logTaskEitherError(onLeft) : identity, + onRight ? this.logTaskEitherSuccess(onRight) : identity, ); private readonly tryLogErrorWithStack = diff --git a/packages/reindex/src/+internal/client/es-monadic-client.error.ts b/packages/reindex/src/+internal/client/es-monadic-client.error.ts index 63fe48c..5e5a5a3 100644 --- a/packages/reindex/src/+internal/client/es-monadic-client.error.ts +++ b/packages/reindex/src/+internal/client/es-monadic-client.error.ts @@ -1,20 +1,16 @@ import { TaggedError } from '@searchpunch/core'; -export class EsUnableSetAliasError extends TaggedError { - readonly tag = 'EsUnableSetAliasError'; -} +export class EsUnableSetAliasError extends TaggedError.ofLiteral( + 'EsUnableSetAlias', +) {} -export class EsNotFoundError extends TaggedError { - readonly tag = 'EsNotFoundError'; -} +export class EsNotFoundError extends TaggedError.ofLiteral('EsNotFound') {} -export class EsConnectionRefused extends TaggedError { - readonly tag = 'EsConnectionRefused'; -} +export class EsConnectionRefused extends TaggedError.ofLiteral( + 'EsConnectionRefused', +) {} -export class EsInternalError extends TaggedError { - readonly tag = 'EsInternalError'; -} +export class EsInternalError extends TaggedError.ofLiteral('EsInternal') {} export type EsTaggedError = | EsConnectionRefused diff --git a/packages/reindex/src/+internal/client/es-monadic-client.ts b/packages/reindex/src/+internal/client/es-monadic-client.ts index ac066b2..efc043e 100644 --- a/packages/reindex/src/+internal/client/es-monadic-client.ts +++ b/packages/reindex/src/+internal/client/es-monadic-client.ts @@ -1,6 +1,8 @@ import { pipe } from 'fp-ts/function'; import * as es from '@elastic/elasticsearch'; + import { PinoLogger, type AbstractLogger } from '@searchpunch/logger'; +import { getFirstObjKeyValue } from '@searchpunch/core'; import type { EsDocId } from './es-monadic-client.types'; @@ -38,6 +40,50 @@ export class EsMonadicClient { client: new es.Client(esConnectionOptions), }); + readonly index = { + getMapping: (index: string) => + pipe(async () => { + const response = await this.rawClient.indices.getMapping({ + index, + }); + + return getFirstObjKeyValue(response.body).mappings; + }, tryEsTask(EsNotFoundError)), + + delete: (names: string[]) => + pipe( + async () => + this.rawClient.indices.delete({ + index: names, + }), + tryEsTask(), + this.logger.fp.logTaskEither({ + onBefore: () => `Trying to delete indices: ${names.join(', ')}!`, + onLeft: () => + `Cannot delete indices with names: ${names.join(', ')}!`, + onRight: () => + `Indices with names ${names.join(', ')} has been deleted!`, + }), + ), + + create: (dto: es.estypes.IndicesCreateRequest) => + pipe( + async () => { + await this.rawClient.indices.create(dto); + + return { + index: dto.index, + }; + }, + tryEsTask(), + this.logger.fp.logTaskEither({ + onBefore: () => `Trying to create index with name "${dto.index}"!`, + onLeft: () => `Cannot create index with name "${dto.index}"!`, + onRight: () => `Index with name "${dto.index}" has been created!`, + }), + ), + }; + readonly record = { get: ({ id, index }: AttrWithEsIndex<{ id: EsDocId }>) => pipe( @@ -102,9 +148,9 @@ export class EsMonadicClient { this.rawClient.indices.existsAlias({ name, }), - tryEsTask(), + tryEsTask(EsNotFoundError), this.logger.fp.logTaskEitherError( - () => `Cannot check if "${name}" index exists!"`, + () => `Cannot check if "${name}" index exists!`, ), ), }; diff --git a/packages/reindex/src/+internal/client/try-es-task.ts b/packages/reindex/src/+internal/client/try-es-task.ts index ca33092..5d839c1 100644 --- a/packages/reindex/src/+internal/client/try-es-task.ts +++ b/packages/reindex/src/+internal/client/try-es-task.ts @@ -9,7 +9,9 @@ import { } from './es-monadic-client.error'; export const tryEsTask = - (TagClass?: new (originalStack?: string) => TC) => + >( + TagClass?: new (originalStack?: string) => TC, + ) => ( task: T.Task, ): TE.TaskEither => diff --git a/packages/reindex/src/+internal/helpers/create-index-name-with-timestamp.ts b/packages/reindex/src/+internal/helpers/create-index-name-with-timestamp.ts new file mode 100644 index 0000000..d32ffcd --- /dev/null +++ b/packages/reindex/src/+internal/helpers/create-index-name-with-timestamp.ts @@ -0,0 +1,19 @@ +const genTimestampSuffix = () => { + const d = new Date(); + const time = [ + d.getMonth(), + d.getDay(), + d.getHours(), + d.getMinutes(), + d.getSeconds(), + ].map(num => num.toString().padStart(2, '0')); + + return [ + d.getFullYear(), + ...time, + d.getMilliseconds().toString().padStart(3, '0'), + ].join(''); +}; + +export const createIndexNameWithTimestamp = (prefix: string) => + `${prefix}-${genTimestampSuffix()}`; diff --git a/packages/reindex/src/+internal/helpers/index.ts b/packages/reindex/src/+internal/helpers/index.ts new file mode 100644 index 0000000..f5a11dd --- /dev/null +++ b/packages/reindex/src/+internal/helpers/index.ts @@ -0,0 +1 @@ +export * from './create-index-name-with-timestamp'; diff --git a/packages/reindex/src/+internal/index.ts b/packages/reindex/src/+internal/index.ts index 4f1cce4..9a15e6f 100644 --- a/packages/reindex/src/+internal/index.ts +++ b/packages/reindex/src/+internal/index.ts @@ -1 +1,2 @@ export * from './client'; +export * from './helpers'; diff --git a/packages/reindex/src/es-index.ts b/packages/reindex/src/es-index.ts new file mode 100644 index 0000000..8e41c21 --- /dev/null +++ b/packages/reindex/src/es-index.ts @@ -0,0 +1,99 @@ +import { pipe, flow } from 'fp-ts/function'; +import * as TE from 'fp-ts/TaskEither'; + +import type * as RTE from 'fp-ts/ReaderTaskEither'; +import type * as es from '@elastic/elasticsearch'; + +import { PinoLogger, type AbstractLogger } from '@searchpunch/logger'; +import { + createIndexNameWithTimestamp, + type EsMonadicClient, +} from './+internal'; + +import { + ValueObject, + catchTaskEitherTagErrorTE, + tapTaskEitherErrorTE, +} from '@searchpunch/core'; + +type EsIndexSchema = Omit; + +type EsIndexProps = { + indexName: string; + schema: EsIndexSchema; + client: EsMonadicClient; + logger?: AbstractLogger; +}; + +export class EsIndex extends ValueObject { + get indexName() { + return this.props.indexName; + } + + get client() { + return this.props.client; + } + + get schema() { + return this.props.schema; + } + + get logger() { + return this.props.logger ?? new PinoLogger('EsIndex'); + } + + synchronize = () => + flow( + this.ensureAliasExists, + this.logger.fp.logTaskEither({ + onBefore: () => `Trying to synchronize "${this.indexName}" index...`, + onRight: () => `"${this.indexName}" index has been synchronized!`, + onLeft: () => `Unable to synchronize "${this.indexName}" index!`, + }), + ); + + private readonly ensureAliasExists = () => { + const createWithAlias = this.executeOnTmpIndexAndSwitchTE(() => + TE.of(undefined), + ); + + return pipe( + this.client.alias.existsOrFail(this.props.indexName), + catchTaskEitherTagErrorTE('EsNotFound', () => createWithAlias), + ); + }; + + private readonly executeOnTmpIndexAndSwitchTE = ( + taskReader: RTE.ReaderTaskEither, + ) => { + const { alias, index } = this.client; + + return pipe( + this.createTmpIndex(), + TE.chain(tmpIndex => + pipe( + TE.Do, + TE.bind('prevIndices', () => + alias.getAllIndicesByAlias(this.indexName), + ), + TE.bindW('taskResult', () => taskReader(tmpIndex.index)), + tapTaskEitherErrorTE(() => + alias.put({ + aliasName: this.indexName, + destinationIndex: tmpIndex.index, + }), + ), + tapTaskEitherErrorTE(() => index.delete([tmpIndex.index])), + TE.chainFirstW(({ prevIndices }) => index.delete(prevIndices)), + TE.map(({ taskResult }) => taskResult), + ), + ), + ); + }; + + private readonly createTmpIndex = () => + this.client.index.create({ + ...this.schema, + index: createIndexNameWithTimestamp(this.indexName), + }); +} diff --git a/packages/reindex/src/es-indices-registry.ts b/packages/reindex/src/es-indices-registry.ts new file mode 100644 index 0000000..3a46f66 --- /dev/null +++ b/packages/reindex/src/es-indices-registry.ts @@ -0,0 +1,17 @@ +import { ValueObject } from '@searchpunch/core'; +import type { EsIndex } from './es-index'; + +type EsIndicesRegistryProps = { + indices: EsIndex[]; +}; + +export class EsIndicesRegistry extends ValueObject { + get indices() { + return this.props.indices; + } + + register = (index: EsIndex): EsIndicesRegistry => + this.extend({ + indices: [...this.indices, index], + }); +} diff --git a/packages/reindex/src/index.ts b/packages/reindex/src/index.ts index 8038a07..23dd27c 100644 --- a/packages/reindex/src/index.ts +++ b/packages/reindex/src/index.ts @@ -1 +1,2 @@ -export * from './+internal'; +export * from './es-index'; +export * from './es-indices-registry';