Skip to content

Commit

Permalink
feat: add reindex logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Mati365 committed Feb 18, 2024
1 parent e6ac729 commit 6c444d8
Show file tree
Hide file tree
Showing 25 changed files with 734 additions and 671 deletions.
12 changes: 12 additions & 0 deletions packages/core/src/helpers/fp-ts/delay-task-either.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import * as TE from 'fp-ts/TaskEither';
import { timeout } from '../timeout';

export const timeoutTE = (ms: number) => TE.fromTask(async () => timeout(ms));

export const delayTaskEither =
<TE>(time: number) =>
(task: TE): TE =>
TE.chainFirst(() => async () => {
await timeout(time);
return TE.of(undefined) as any;
})(task as any) as TE;
2 changes: 2 additions & 0 deletions packages/core/src/helpers/fp-ts/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
export * from './catch-task-either-tagged-error-te';
export * from './delay-task-either';
export * from './tap-task-either-error-te';
export * from './tap-task-either-error';
export * from './tap-task-either';
export * from './try-or-throw';
export * from './try-tagged-error-task';
16 changes: 16 additions & 0 deletions packages/core/src/helpers/fp-ts/try-or-throw.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { task as T, either as E } from 'fp-ts';
import { pipe } from 'fp-ts/function';

import type { TaskEither } from 'fp-ts/TaskEither';

export const tryOrThrow = <E, A>(taskEither: TaskEither<E, A>): T.Task<A> =>
pipe(
taskEither,
T.map(either => {
if (E.isLeft(either)) {
throw either.left;

Check failure on line 11 in packages/core/src/helpers/fp-ts/try-or-throw.ts

View workflow job for this annotation

GitHub Actions / Lint app and run tests

Expected an error object to be thrown
}

return either.right;
}),
);
1 change: 1 addition & 0 deletions packages/core/src/helpers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export * from './fp-ts';
export * from './get-first-obj-key-value';
export * from './nop';
export * from './reject-falsy-values';
export * from './timeout';
4 changes: 4 additions & 0 deletions packages/core/src/helpers/timeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export const timeout = async (ms: number) =>
new Promise(resolve => {
setTimeout(resolve, ms);
});
1 change: 1 addition & 0 deletions packages/core/src/types/can-be/can-be-promise.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export type CanBePromise<T> = T | Promise<T>;
1 change: 1 addition & 0 deletions packages/core/src/types/can-be/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './can-be-promise.types';
1 change: 1 addition & 0 deletions packages/core/src/types/entity.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export type ID = string | number;
2 changes: 1 addition & 1 deletion packages/core/src/types/fp-ts/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export * from './tagged-task-either.types';
export * from './tagged-error.types';
export * from './tagged-task-either.types';
2 changes: 2 additions & 0 deletions packages/core/src/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
export * from './can-be';
export * from './entity.types';
export * from './fp-ts';
export * from './value-object.types';
4 changes: 1 addition & 3 deletions packages/core/src/types/value-object.types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
export abstract class ValueObject<T> {
constructor(readonly props: T) {}

static of: <A>(props: A) => ValueObject<A>;
constructor(readonly props: Readonly<T>) {}

unwrap() {
return this.props;
Expand Down
2 changes: 2 additions & 0 deletions packages/reindex/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
"@elastic/elasticsearch": "^8.12.1",
"@searchpunch/core": "*",
"@searchpunch/logger": "*",
"async-await-queue": "^2.1.4",
"elastic-builder": "^2.24.0",
"fast-deep-equal": "^3.1.3",
"fp-ts": "^2.16.2",
"object-hash": "^3.0.0"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ export class EsUnableSetAliasError extends TaggedError.ofLiteral(

export class EsNotFoundError extends TaggedError.ofLiteral('EsNotFound') {}

export class EsBulkReindexError extends TaggedError.ofLiteral(
'EsCannotBulkReindex',
) {}

export class EsConnectionRefused extends TaggedError.ofLiteral(
'EsConnectionRefused',
) {}
Expand All @@ -16,4 +20,5 @@ export type EsTaggedError =
| EsConnectionRefused
| EsUnableSetAliasError
| EsNotFoundError
| EsBulkReindexError
| EsInternalError;
67 changes: 60 additions & 7 deletions packages/reindex/src/+internal/client/es-monadic-client.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
import { pipe } from 'fp-ts/function';
import * as es from '@elastic/elasticsearch';
import * as esb from 'elastic-builder';

import { PinoLogger, type AbstractLogger } from '@searchpunch/logger';
import { getFirstObjKeyValue } from '@searchpunch/core';

import type { EsDocId } from './es-monadic-client.types';
import type { ID } from '@searchpunch/core';
import type { EsDoc } from './es-monadic-client.types';

import { tryEsTask } from './try-es-task';
import {
EsBulkReindexError,
EsNotFoundError,
EsUnableSetAliasError,
} from './es-monadic-client.error';

type AttrWithEsIndex<F = unknown> = F & {
export type AttrWithEsIndex<F = unknown> = F & {
index: string;
};

Expand All @@ -28,7 +30,7 @@ export class EsMonadicClient {

constructor(options: EsMonadicClientOptions) {
this.rawClient = options.client;
this.logger = options.logger ?? new PinoLogger('EsMonadicDecorator');
this.logger = options.logger ?? new PinoLogger('EsMonadicClient');
}

static ofConnection = (
Expand All @@ -47,7 +49,7 @@ export class EsMonadicClient {
index,
});

return getFirstObjKeyValue(response.body).mappings;
return response.body.mappings;
}, tryEsTask(EsNotFoundError)),

delete: (names: string[]) =>
Expand Down Expand Up @@ -84,8 +86,33 @@ export class EsMonadicClient {
),
};

readonly reindex = {
bulk: <Doc extends EsDoc<unknown>>({
index,
docs,
}: AttrWithEsIndex<{ docs: Doc[] }>) =>
pipe(
async () => {
await this.rawClient.bulk({
refresh: true,
body: docs.flatMap(({ _id, ...doc }) => [
{
index: {
_index: index,
_id: _id.toString(),
},
},
doc,
]),
});
},
tryEsTask(EsBulkReindexError),
this.logger.fp.logTaskEitherError(() => 'Cannot bulk reindex records!'),
),
};

readonly record = {
get: ({ id, index }: AttrWithEsIndex<{ id: EsDocId }>) =>
get: ({ id, index }: AttrWithEsIndex<{ id: ID }>) =>
pipe(
async () => {
const response = await this.rawClient.get({
Expand All @@ -99,7 +126,7 @@ export class EsMonadicClient {
this.logger.fp.logTaskEitherError(() => `Record with ${id} not found!`),
),

delete: ({ id, index }: AttrWithEsIndex<{ id: EsDocId }>) =>
delete: ({ id, index }: AttrWithEsIndex<{ id: ID }>) =>
pipe(
async () =>
this.rawClient.delete({
Expand All @@ -109,6 +136,32 @@ export class EsMonadicClient {
tryEsTask(),
this.logger.fp.logTaskEitherError(() => `Record with ${id} not found!`),
),

deleteByIds: ({ ids, index }: AttrWithEsIndex<{ ids: ID[] }>) =>
pipe(
async () => {
if (!ids.length) {
return;
}

await this.rawClient.deleteByQuery({
index,
body: esb
.requestBodySearch()
.query(
esb.termsQuery(
'id',
ids.map(id => id.toString()),
),
)
.toJSON(),
});
},
tryEsTask(),
this.logger.fp.logTaskEitherError(
() => `Cannot delete records ${ids.join(', ')}!`,
),
),
};

readonly alias = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
export type EsDocId = string | number;
import type { ID } from '@searchpunch/core';

export type EsDoc<I> = I & {
_id: ID;
};
3 changes: 3 additions & 0 deletions packages/reindex/src/+internal/client/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
export * from './es-monadic-client.error';
export * from './es-monadic-client';
export * from './es-monadic-client.types';
export * from './try-es-task';
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,15 @@ export const createIndexSchemaWithMagicMeta = <S extends { mappings?: object }>(
...schema.mappings,
},
});

export const getIndexSchemaMagicMeta = (
mappings: any,
): EsIndexMagicMeta | null => {
const maybeMappings = mappings?._meta;

if (maybeMappings?.tag === MAGIC_ES_SCHEMA_TAG) {
return maybeMappings;
}

return null;
};
100 changes: 0 additions & 100 deletions packages/reindex/src/es-index.ts

This file was deleted.

9 changes: 9 additions & 0 deletions packages/reindex/src/es-index/es-index-db-adapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import type { EsDoc } from '+internal';
import type { CanBePromise, ID } from '@searchpunch/core';

export type EsDbAsyncIdIterator = AsyncIterableIterator<ID[]>;

export type EsIndexDbAdapter<Doc> = {
createAllEntitiesIdsIterator: () => EsDbAsyncIdIterator;
findEntities: (ids: ID[]) => CanBePromise<Array<EsDoc<Doc>>>;
};
13 changes: 13 additions & 0 deletions packages/reindex/src/es-index/es-index.error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { TaggedError } from '@searchpunch/core';

export class EsIndexBulkReindexError extends TaggedError.ofLiteral(
'EsIndexBulkReindexError',
) {}

export class EsIndexStreamReindexError extends TaggedError.ofLiteral(
'EsIndexStreamReindexError',
) {}

export class EsIndexSingleReindexError extends TaggedError.ofLiteral(
'EsIndexSingleReindexError',
) {}
Loading

0 comments on commit 6c444d8

Please sign in to comment.