Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(databases): Add support for fast bulk operations #2945

Open
wants to merge 1 commit into
base: dove
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/api/databases/common.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ Disabling or changing the default pagination is not available in the client. Onl

</BlockQuote>

## Bulk updates

Some database adapters allow to set the `params.bulk` option to perform fast `create`, `patch` or `remove` operations for a large amount of data. Setting `params.bulk = true` will always return no data (an empty array `[]`) and not send any real-time events.

```ts
const manyTodos = await readCSVFile('todos.csv')

await app.service('todos').create(manyTodos, {
bulk: true
}) // -> []
```

## Extending Adapters

There are two ways to extend existing database adapters. Either by extending the base class or by adding functionality through hooks.
Expand Down
1 change: 1 addition & 0 deletions packages/adapter-commons/src/declarations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ export interface AdapterParams<
> extends Params<Q> {
adapter?: A
paginate?: PaginationParams
bulk?: boolean
}

/**
Expand Down
4 changes: 4 additions & 0 deletions packages/adapter-commons/src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ export abstract class AdapterBase<
* @returns Wether or not multiple updates are allowed.
*/
allowsMulti(method: string, params: ServiceParams = {} as ServiceParams) {
if (params.bulk) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can update allowMulti to accept context.id and have a centralized place to throw an error whenever params.bulk === true and

  • For create: data is an object
  • For patch and remove: context.id is null

return true
}

const always = alwaysMulti[method]

if (typeof always !== 'undefined') {
Expand Down
3 changes: 3 additions & 0 deletions packages/adapter-tests/src/declarations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export type AdapterMethodsTestName =
| '.remove'
| '.remove + $select'
| '.remove + id + query'
| '.remove bulk'
| '.remove + multi'
| '.remove + multi no pagination'
| '.remove + id + query id'
Expand All @@ -49,12 +50,14 @@ export type AdapterMethodsTestName =
| '.patch multiple no pagination'
| '.patch multi query same'
| '.patch multi query changed'
| '.patch bulk'
| '.patch + NotFound'
| '.patch + query + NotFound'
| '.patch + id + query id'
| '.create'
| '.create + $select'
| '.create multi'
| '.create bulk'
| 'internal .find'
| 'internal .get'
| 'internal .create'
Expand Down
85 changes: 85 additions & 0 deletions packages/adapter-tests/src/methods.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,28 @@ export default (test: AdapterMethodsTest, app: any, _errors: any, serviceName: s
}
})

test('.remove bulk', async () => {
await service.create({ name: 'Dave', age: 29, created: true })
await service.create({
name: 'David',
age: 3,
created: true
})

const data = await service.remove(null, {
query: { created: true },
bulk: true
})

assert.deepStrictEqual(data, [])

const found = await service.find({
query: { created: true }
})

assert.strictEqual(found.length, 0)
})

test('.remove + multi', async () => {
try {
await service.remove(null)
Expand Down Expand Up @@ -398,6 +420,39 @@ export default (test: AdapterMethodsTest, app: any, _errors: any, serviceName: s
await service.remove(david[idProp])
})

test('.patch bulk', async () => {
const dave = await service.create({
name: 'Dave',
age: 29,
created: true
})
const david = await service.create({
name: 'David',
age: 3,
created: true
})

const data = await service.patch(
null,
{
age: 2
},
{
query: { created: true },
bulk: true
}
)

assert.deepStrictEqual(data, [])

const daveAfter = await service.get(dave[idProp])

assert.strictEqual(daveAfter.age, 2, 'Dave age was updated')

await service.remove(dave[idProp])
await service.remove(david[idProp])
})

test('.patch multiple no pagination', async () => {
try {
await service.remove(doug[idProp])
Expand Down Expand Up @@ -643,6 +698,36 @@ export default (test: AdapterMethodsTest, app: any, _errors: any, serviceName: s
await service.remove(data[0][idProp])
await service.remove(data[1][idProp])
})

test('.create bulk', async () => {
const items = [
{
name: 'Gerald',
age: 18
},
{
name: 'Herald',
age: 18
}
]

const data = await service.create(items, {
bulk: true
})

assert.deepStrictEqual(data, [])

const foundItems = await service.find({
query: { age: 18 }
})

assert.strictEqual(foundItems.length, 2)

await service.remove(null, {
query: { age: 18 },
bulk: true
})
})
})

describe("doesn't call public methods internally", () => {
Expand Down
20 changes: 20 additions & 0 deletions packages/knex/src/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,15 @@ export class KnexAdapter<
): Promise<Result | Result[]> {
const data = _data as any

if (params.bulk) {
const res = await this.db(params)
.insert(data)
.then(() => [])
.catch(errorHandler)

return res as Result[]
}

if (Array.isArray(data)) {
return Promise.all(data.map((current) => this._create(current, params)))
}
Expand Down Expand Up @@ -252,6 +261,12 @@ export class KnexAdapter<
}

const data = _.omit(raw, this.id)

if (params.bulk) {
await this.createQuery(params).update(data)
return []
}

const results = await this._findOrGet(id, {
...params,
query: {
Expand Down Expand Up @@ -313,6 +328,11 @@ export class KnexAdapter<
throw new MethodNotAllowed('Can not remove multiple entries')
}

if (params.bulk) {
await this.createQuery(params).del().catch(errorHandler)
return []
}

const items = await this._findOrGet(id, params)
const { query } = this.filterQuery(params)
const q = this.db(params)
Expand Down
3 changes: 3 additions & 0 deletions packages/knex/test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const testSuite = adapterTests([
'.remove + id + query',
'.remove + multi',
'.remove + multi no pagination',
'.remove bulk',
'.remove + id + query id',
'.update',
'.update + $select',
Expand All @@ -38,6 +39,7 @@ const testSuite = adapterTests([
'.patch + $select',
'.patch + id + query',
'.patch multiple',
'.patch bulk',
'.patch multiple no pagination',
'.patch multi query same',
'.patch multi query changed',
Expand All @@ -47,6 +49,7 @@ const testSuite = adapterTests([
'.create',
'.create + $select',
'.create multi',
'.create bulk',
'internal .find',
'internal .get',
'internal .create',
Expand Down
32 changes: 19 additions & 13 deletions packages/memory/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import {
AdapterBase,
AdapterServiceOptions,
PaginationOptions,
AdapterParams
AdapterParams,
AdapterQuery
} from '@feathersjs/adapter-commons'
import sift from 'sift'
import { NullableId, Id, Params, Paginated } from '@feathersjs/feathers'
Expand All @@ -28,10 +29,12 @@ const _select = (data: any, params: any, ...args: any[]) => {
return base(JSON.parse(JSON.stringify(data)))
}

export type MemoryAdapterParams<Q = AdapterQuery> = AdapterParams<Q, Partial<MemoryServiceOptions>>

export class MemoryAdapter<
Result = any,
Data = Partial<Result>,
ServiceParams extends Params = Params,
ServiceParams extends MemoryAdapterParams = MemoryAdapterParams,
PatchData = Partial<Data>
> extends AdapterBase<Result, Data, PatchData, ServiceParams, MemoryServiceOptions<Result>> {
store: MemoryServiceStore<Result>
Expand Down Expand Up @@ -145,18 +148,18 @@ export class MemoryAdapter<
async _create(data: Partial<Data>[], params?: ServiceParams): Promise<Result[]>
async _create(data: Partial<Data> | Partial<Data>[], _params?: ServiceParams): Promise<Result | Result[]>
async _create(
data: Partial<Data> | Partial<Data>[],
_data: Partial<Data> | Partial<Data>[],
params: ServiceParams = {} as ServiceParams
): Promise<Result | Result[]> {
if (Array.isArray(data)) {
return Promise.all(data.map((current) => this._create(current, params)))
}
const payload = Array.isArray(_data) ? _data : [_data]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To skip the extra processing for large arrays, we can add another check for params.bulk before running .map:

const payload = Array.isArray(_data) ? _data : [_data]
const results = (params.bulk ? [] : payload).map((value) => {
  const id = (value as any)[this.id] || this._uId++
  const current = _.extend({}, value, { [this.id]: id })

  return _select((this.store[id] = current), params, this.id)
})

return params.bulk ? [] : Array.isArray(_data) ? results : results[0]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we also need to address the scenario where we create a single record and also pass params.bulk:

const david = await service.create(
  {
    name: 'David',
    age: 3,
    created: true
  }, 
  { bulk: true }
)

We don't want to return an empty array for the above scenario since that breaks the request-to-response plurality mapping of the service interface. So we probably ought to throw an error:

if (!Array.isArray(_data) && params.bulk) {
  throw new BadRequest()
}

const results = payload.map((value) => {
const id = (value as any)[this.id] || this._uId++
const current = _.extend({}, value, { [this.id]: id })

const id = (data as any)[this.id] || this._uId++
const current = _.extend({}, data, { [this.id]: id })
const result = (this.store[id] = current)
return _select((this.store[id] = current), params, this.id)
})

return _select(result, params, this.id) as Result
return params.bulk ? [] : Array.isArray(_data) ? results : results[0]
}

async _update(id: Id, data: Data, params: ServiceParams = {} as ServiceParams): Promise<Result> {
Expand Down Expand Up @@ -202,11 +205,12 @@ export class MemoryAdapter<
...params,
query
})
const results = entries.map(patchEntry)

return entries.map(patchEntry)
return params.bulk ? [] : results
}

return patchEntry(await this._get(id, params)) // Will throw an error if not found
return params.bulk ? [] : patchEntry(await this._get(id, params)) // Will throw an error if not found
}

async _remove(id: null, params?: ServiceParams): Promise<Result[]>
Expand All @@ -225,7 +229,9 @@ export class MemoryAdapter<
query
})

return Promise.all(entries.map((current: any) => this._remove(current[this.id] as Id, params)))
entries.forEach((current: any) => delete this.store[(current as any)[this.id]])

return params.bulk ? [] : entries
}

const entry = await this._get(id, params)
Expand Down
3 changes: 3 additions & 0 deletions packages/memory/test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const testSuite = adapterTests([
'.remove + id + query',
'.remove + multi',
'.remove + multi no pagination',
'.remove bulk',
'.remove + id + query id',
'.update',
'.update + $select',
Expand All @@ -40,12 +41,14 @@ const testSuite = adapterTests([
'.patch multiple no pagination',
'.patch multi query same',
'.patch multi query changed',
'.patch bulk',
'.patch + query + NotFound',
'.patch + NotFound',
'.patch + id + query id',
'.create',
'.create + $select',
'.create multi',
'.create bulk',
'internal .find',
'internal .get',
'internal .create',
Expand Down
31 changes: 19 additions & 12 deletions packages/mongodb/src/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ export class MongoDbAdapter<
data: Data | Data[],
params: ServiceParams = {} as ServiceParams
): Promise<Result | Result[]> {
const writeOptions = params.mongodb
const model = await this.getModel(params)
const setId = (item: any) => {
const entry = Object.assign({}, item)
Expand All @@ -279,14 +278,14 @@ export class MongoDbAdapter<

const promise = Array.isArray(data)
? model
.insertMany(data.map(setId), writeOptions)
.insertMany(data.map(setId), params.mongodb)
.then(async (result) =>
Promise.all(
Object.values(result.insertedIds).map(async (_id) => model.findOne({ _id }, params.mongodb))
)
params.bulk
? []
: model.find({ _id: { $in: Object.values(result.insertedIds) } }, params.mongodb).toArray()
)
: model
.insertOne(setId(data), writeOptions)
.insertOne(setId(data), params.mongodb)
.then(async (result) => model.findOne({ _id: result.insertedId }, params.mongodb))

return promise.then(select(params, this.id)).catch(errorHandler)
Expand Down Expand Up @@ -325,6 +324,12 @@ export class MongoDbAdapter<

return current
}, {} as any)

if (params.bulk) {
await model.updateMany(query, modifier, updateOptions)
return []
}

const originalIds = await this._findOrGet(id, {
...params,
query: {
Expand Down Expand Up @@ -389,11 +394,13 @@ export class MongoDbAdapter<
}
}

return this._findOrGet(id, findParams)
.then(async (items) => {
await model.deleteMany(query, deleteOptions)
return items
})
.catch(errorHandler)
return params.bulk
? model.deleteMany(query, deleteOptions).then(() => [])
: this._findOrGet(id, findParams)
.then(async (items) => {
await model.deleteMany(query, deleteOptions)
return items
})
.catch(errorHandler)
}
}