Skip to content

Commit

Permalink
feat(reindex): add basic synchronize method skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
Mati365 committed Oct 22, 2023
1 parent e40b478 commit f325f8e
Show file tree
Hide file tree
Showing 20 changed files with 315 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import * as TE from 'fp-ts/TaskEither';
import { pipe } from 'fp-ts/function';

import type * as RTE from 'fp-ts/ReaderTaskEither';
import { isTaggedError, type TaggedError } from '../../types';

export const catchTaskEitherTagErrorTE =
<const T extends string, E2, B>(
tag: T,
catchTask: RTE.ReaderTaskEither<TaggedError<T>, E2, B>,
) =>
<E, A>(
task: TE.TaskEither<E & TaggedError<T> extends never ? never : E, A>,
) =>
pipe(
task,
TE.foldW((error): TE.TaskEither<E | E2, A | B> => {
if (isTaggedError(error) && error.tag === tag) {
return catchTask(error);
}

return TE.left(error);
}, TE.of),
) as TE.TaskEither<E | E2, A | B>;
2 changes: 2 additions & 0 deletions packages/core/src/helpers/fp-ts/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export * from './catch-task-either-tagged-error-te';
export * from './tap-task-either-error-te';
export * from './tap-task-either-error';
export * from './tap-task-either';
export * from './try-tagged-error-task';
21 changes: 21 additions & 0 deletions packages/core/src/helpers/fp-ts/tap-task-either-error-te.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import * as TE from 'fp-ts/TaskEither';
import { pipe } from 'fp-ts/function';

export const tapTaskEitherErrorTE =
<A, E, E2 = E>(errorFnTE: (error: E) => TE.TaskEither<E2, unknown>) =>
(task: TE.TaskEither<E, A>): TE.TaskEither<E | E2, A> =>
pipe(
task,
TE.foldW(
error =>
pipe(
error,
errorFnTE,
TE.foldW(
rollbackError => TE.left(rollbackError),
() => TE.left(error),
),
),
TE.right,
),
);
2 changes: 1 addition & 1 deletion packages/core/src/helpers/fp-ts/try-tagged-error-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import type * as T from 'fp-ts/Task';
import type { TaggedError } from '../../types';

export const tryTaggedErrorTask =
<TC extends TaggedError>(TagClass: new (originalStack?: string) => TC) =>
<TC extends TaggedError<any>>(TagClass: new (originalStack?: string) => TC) =>
<R>(task: T.Task<R>): TE.TaskEither<TC, R> =>
TE.tryCatch(task, (exception: any) => new TagClass(exception.stack));
5 changes: 5 additions & 0 deletions packages/core/src/helpers/get-first-obj-key-value.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export const getFirstObjKeyValue = <O extends Record<string, any>>(obj: O) => {
const [key] = Object.keys(obj);

return obj[key];
};
1 change: 1 addition & 0 deletions packages/core/src/helpers/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './fp-ts';
export * from './get-first-obj-key-value';
export * from './nop';
export * from './reject-falsy-values';
13 changes: 9 additions & 4 deletions packages/core/src/types/fp-ts/tagged-error.types.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
export abstract class TaggedError extends Error {
abstract readonly tag: string;
export abstract class TaggedError<T extends string> extends Error {
abstract readonly tag: T;

constructor(readonly originalStack?: string) {
super();
Error.captureStackTrace(this, TaggedError);
}

static ofLiteral = <const S extends string>(tag: S) =>
class TaggedLiteralError extends TaggedError<S> {
readonly tag = tag;
};
}

export const isTaggedError = (error: Error): error is TaggedError =>
'tag' in error;
export const isTaggedError = (error: any): error is TaggedError<any> =>
error && 'tag' in error;
2 changes: 1 addition & 1 deletion packages/core/src/types/fp-ts/tagged-task-either.types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { TaskEither } from 'fp-ts/TaskEither';
import type { TaggedError } from './tagged-error.types';

export type TaggedTaskEither<E extends TaggedError, A> = TaskEither<E, A>;
export type TaggedTaskEither<E extends TaggedError<any>, A> = TaskEither<E, A>;
1 change: 1 addition & 0 deletions packages/core/src/types/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './fp-ts';
export * from './value-object.types';
24 changes: 24 additions & 0 deletions packages/core/src/types/value-object.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
export abstract class ValueObject<T> {
constructor(readonly props: T) {}

static of: <A>(props: A) => ValueObject<A>;

unwrap() {
return this.props;
}

bind<O>(fn: (props: T) => O): O {
return fn(this.props);
}

extend(props: Partial<T>): this {
return new (this.constructor as any)({
...this.props,
...props,
});
}

map(fn: (props: T) => T): this {
return new (this.constructor as any)(this.bind(fn));
}
}
42 changes: 28 additions & 14 deletions packages/logger/src/abstract/abstract-fp-ts-logger.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import { pipe, flow } from 'fp-ts/function';
import { pipe, flow, identity } from 'fp-ts/function';
import * as TE from 'fp-ts/TaskEither';

import type { AbstractLogger } from './abstract-logger.types';
import { nop, rejectFalsyItems, tapTaskEither } from '@searchpunch/core';
import {
nop,
rejectFalsyItems,
tapTaskEither,
tapTaskEitherError,
} from '@searchpunch/core';

type UnsafeErrorMessage = string | Error;

type TaskEitherLoggerAttrs<E, A> = {
onBefore?: () => string;
onRight?: (data: A) => string;
onBefore?: () => UnsafeErrorMessage;
onRight?: (data: A) => UnsafeErrorMessage;
onLeft: (error: E) => UnsafeErrorMessage;
};

Expand All @@ -20,23 +25,32 @@ export class AbstractFpTsLogger {
(task: TE.TaskEither<E, A>): TE.TaskEither<E, A> =>
pipe(
task,
this.logTaskEither({
onLeft,
tapTaskEitherError(error => {
this.tryLogErrorWithStack(onLeft(error))(error);
}),
);

logBeforeTaskEither =
<E, A>(onBefore: () => UnsafeErrorMessage) =>
(task: TE.TaskEither<E, A>): TE.TaskEither<E, A> =>
pipe(
TE.fromIO(onBefore ? flow(onBefore, this.logger.info) : nop),
TE.chain(() => task),
);

logTaskEitherSuccess =
<E, A>(onRight: (data: A) => UnsafeErrorMessage) =>
(task: TE.TaskEither<E, A>): TE.TaskEither<E, A> =>
pipe(task, tapTaskEither(flow(onRight, this.logger.info)));

logTaskEither =
<E, A>({ onBefore, onRight, onLeft }: TaskEitherLoggerAttrs<E, A>) =>
(task: TE.TaskEither<E, A>): TE.TaskEither<E, A> =>
pipe(
TE.fromIO(onBefore ?? nop),
TE.chain(() => task),
tapTaskEither(
onRight ? flow(onRight, this.logger.info) : nop,
error => {
this.tryLogErrorWithStack(onLeft(error))(error);
},
),
task,
onBefore ? this.logBeforeTaskEither(onBefore) : identity,
onLeft ? this.logTaskEitherError(onLeft) : identity,
onRight ? this.logTaskEitherSuccess(onRight) : identity,
);

private readonly tryLogErrorWithStack =
Expand Down
20 changes: 8 additions & 12 deletions packages/reindex/src/+internal/client/es-monadic-client.error.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
import { TaggedError } from '@searchpunch/core';

export class EsUnableSetAliasError extends TaggedError {
readonly tag = 'EsUnableSetAliasError';
}
export class EsUnableSetAliasError extends TaggedError.ofLiteral(
'EsUnableSetAlias',
) {}

export class EsNotFoundError extends TaggedError {
readonly tag = 'EsNotFoundError';
}
export class EsNotFoundError extends TaggedError.ofLiteral('EsNotFound') {}

export class EsConnectionRefused extends TaggedError {
readonly tag = 'EsConnectionRefused';
}
export class EsConnectionRefused extends TaggedError.ofLiteral(
'EsConnectionRefused',
) {}

export class EsInternalError extends TaggedError {
readonly tag = 'EsInternalError';
}
export class EsInternalError extends TaggedError.ofLiteral('EsInternal') {}

export type EsTaggedError =
| EsConnectionRefused
Expand Down
50 changes: 48 additions & 2 deletions packages/reindex/src/+internal/client/es-monadic-client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { pipe } from 'fp-ts/function';
import * as es from '@elastic/elasticsearch';

import { PinoLogger, type AbstractLogger } from '@searchpunch/logger';
import { getFirstObjKeyValue } from '@searchpunch/core';

import type { EsDocId } from './es-monadic-client.types';

Expand Down Expand Up @@ -38,6 +40,50 @@ export class EsMonadicClient {
client: new es.Client(esConnectionOptions),
});

readonly index = {
getMapping: (index: string) =>
pipe(async () => {
const response = await this.rawClient.indices.getMapping({
index,
});

return getFirstObjKeyValue(response.body).mappings;
}, tryEsTask(EsNotFoundError)),

delete: (names: string[]) =>
pipe(
async () =>
this.rawClient.indices.delete({
index: names,
}),
tryEsTask(),
this.logger.fp.logTaskEither({
onBefore: () => `Trying to delete indices: ${names.join(', ')}!`,
onLeft: () =>
`Cannot delete indices with names: ${names.join(', ')}!`,
onRight: () =>
`Indices with names ${names.join(', ')} has been deleted!`,
}),
),

create: (dto: es.estypes.IndicesCreateRequest) =>
pipe(
async () => {
await this.rawClient.indices.create(dto);

return {
index: dto.index,
};
},
tryEsTask(),
this.logger.fp.logTaskEither({
onBefore: () => `Trying to create index with name "${dto.index}"!`,
onLeft: () => `Cannot create index with name "${dto.index}"!`,
onRight: () => `Index with name "${dto.index}" has been created!`,
}),
),
};

readonly record = {
get: ({ id, index }: AttrWithEsIndex<{ id: EsDocId }>) =>
pipe(
Expand Down Expand Up @@ -102,9 +148,9 @@ export class EsMonadicClient {
this.rawClient.indices.existsAlias({
name,
}),
tryEsTask(),
tryEsTask(EsNotFoundError),
this.logger.fp.logTaskEitherError(
() => `Cannot check if "${name}" index exists!"`,
() => `Cannot check if "${name}" index exists!`,
),
),
};
Expand Down
4 changes: 3 additions & 1 deletion packages/reindex/src/+internal/client/try-es-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import {
} from './es-monadic-client.error';

export const tryEsTask =
<TC extends TaggedError>(TagClass?: new (originalStack?: string) => TC) =>
<TC extends TaggedError<any>>(
TagClass?: new (originalStack?: string) => TC,
) =>
<R>(
task: T.Task<R>,
): TE.TaskEither<EsConnectionRefused | EsInternalError | TC, R> =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
const genTimestampSuffix = () => {
const d = new Date();
const time = [
d.getMonth(),
d.getDay(),
d.getHours(),
d.getMinutes(),
d.getSeconds(),
].map(num => num.toString().padStart(2, '0'));

return [
d.getFullYear(),
...time,
d.getMilliseconds().toString().padStart(3, '0'),
].join('');
};

export const createIndexNameWithTimestamp = (prefix: string) =>
`${prefix}-${genTimestampSuffix()}`;
1 change: 1 addition & 0 deletions packages/reindex/src/+internal/helpers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './create-index-name-with-timestamp';
1 change: 1 addition & 0 deletions packages/reindex/src/+internal/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './client';
export * from './helpers';
Loading

0 comments on commit f325f8e

Please sign in to comment.