Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cts): add tests for chunkedBatch wrappers #3268

Merged
merged 10 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -445,18 +445,18 @@ public async Task<ReplaceAllObjectsResponse> ReplaceAllObjectsAsync<T>(string in

var copyResponse = await OperationIndexAsync(indexName,
new OperationIndexParams(OperationType.Copy, tmpIndexName)
{ Scope = [ScopeType.Rules, ScopeType.Settings, ScopeType.Synonyms] }, options, cancellationToken)
{ Scope = [ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms] }, options, cancellationToken)
.ConfigureAwait(false);

var batchResponse = await ChunkedBatchAsync(tmpIndexName, objects, Action.AddObject, batchSize,
var batchResponse = await ChunkedBatchAsync(tmpIndexName, objects, Action.AddObject, true, batchSize,
options, cancellationToken).ConfigureAwait(false);

await WaitForTaskAsync(tmpIndexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);

copyResponse = await OperationIndexAsync(indexName,
new OperationIndexParams(OperationType.Copy, tmpIndexName)
{ Scope = [ScopeType.Rules, ScopeType.Settings, ScopeType.Synonyms] }, options, cancellationToken)
{ Scope = [ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms] }, options, cancellationToken)
.ConfigureAwait(false);
await WaitForTaskAsync(tmpIndexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);
Expand Down Expand Up @@ -487,9 +487,9 @@ await WaitForTaskAsync(tmpIndexName, moveResponse.TaskID, requestOptions: option
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
/// <typeparam name="T"></typeparam>
public List<BatchResponse> ChunkedBatch<T>(string indexName, IEnumerable<T> objects, Action action = Action.AddObject,
int batchSize = 1000, RequestOptions options = null, CancellationToken cancellationToken = default)
bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null, CancellationToken cancellationToken = default)
where T : class =>
AsyncHelper.RunSync(() => ChunkedBatchAsync(indexName, objects, action, batchSize, options, cancellationToken));
AsyncHelper.RunSync(() => ChunkedBatchAsync(indexName, objects, action, waitForTasks, batchSize, options, cancellationToken));

/// <summary>
/// Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `batch` requests.
Expand All @@ -502,7 +502,7 @@ public List<BatchResponse> ChunkedBatch<T>(string indexName, IEnumerable<T> obje
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
/// <typeparam name="T"></typeparam>
public async Task<List<BatchResponse>> ChunkedBatchAsync<T>(string indexName, IEnumerable<T> objects,
Action action = Action.AddObject, int batchSize = 1000, RequestOptions options = null,
Action action = Action.AddObject, bool waitForTasks = false, int batchSize = 1000, RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class
{
var batchCount = (int)Math.Ceiling((double)objects.Count() / batchSize);
Expand All @@ -518,10 +518,13 @@ public async Task<List<BatchResponse>> ChunkedBatchAsync<T>(string indexName, IE
responses.Add(batchResponse);
}

foreach (var batch in responses)
if (waitForTasks)
{
await WaitForTaskAsync(indexName, batch.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);
foreach (var batch in responses)
{
await WaitForTaskAsync(indexName, batch.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);
}
}

return responses;
Expand All @@ -544,7 +547,7 @@ public async Task<List<BatchResponse>> SaveObjectsAsync<T>(string indexName, IEn
RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class
{
return await ChunkedBatchAsync(indexName, objects, Action.AddObject, 1000, options, cancellationToken).ConfigureAwait(false);
return await ChunkedBatchAsync(indexName, objects, Action.AddObject, false, 1000, options, cancellationToken).ConfigureAwait(false);
}

/// <summary>
Expand All @@ -554,11 +557,11 @@ public async Task<List<BatchResponse>> SaveObjectsAsync<T>(string indexName, IEn
/// <param name="objectIDs">The list of `objectIDs` to remove from the given Algolia `indexName`.</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
public async Task<List<BatchResponse>> DeleteObjects(string indexName, IEnumerable<String> objectIDs,
public async Task<List<BatchResponse>> DeleteObjectsAsync(string indexName, IEnumerable<String> objectIDs,
RequestOptions options = null,
CancellationToken cancellationToken = default)
{
return await ChunkedBatchAsync(indexName, objectIDs.Select(id => new { objectID = id }), Action.DeleteObject, 1000, options, cancellationToken).ConfigureAwait(false);
return await ChunkedBatchAsync(indexName, objectIDs.Select(id => new { objectID = id }), Action.DeleteObject, false, 1000, options, cancellationToken).ConfigureAwait(false);
}

/// <summary>
Expand All @@ -569,11 +572,11 @@ public async Task<List<BatchResponse>> DeleteObjects(string indexName, IEnumerab
/// <param name="createIfNotExists">To be provided if non-existing objects are passed, otherwise, the call will fail.</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
public async Task<List<BatchResponse>> PartialUpdateObjects<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists,
public async Task<List<BatchResponse>> PartialUpdateObjectsAsync<T>(string indexName, IEnumerable<T> objects, bool createIfNotExists,
RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class
{
return await ChunkedBatchAsync(indexName, objects, createIfNotExists ? Action.PartialUpdateObject : Action.PartialUpdateObjectNoCreate, 1000, options, cancellationToken).ConfigureAwait(false);
return await ChunkedBatchAsync(indexName, objects, createIfNotExists ? Action.PartialUpdateObject : Action.PartialUpdateObjectNoCreate, false, 1000, options, cancellationToken).ConfigureAwait(false);
}

private static async Task<List<TU>> CreateIterable<TU>(Func<TU, Task<TU>> executeQuery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,19 +502,20 @@ public extension SearchClient {
/// `chunkedBatch` helper is used under the hood, which creates a `batch` requests with at most 1000 objects in it.
/// - parameter indexName: The name of the index where to update the objects
/// - parameter objects: The objects to update
/// - parameter createIfNotExist: To be provided if non-existing objects are passed, otherwise, the call will fail..
/// - parameter createIfNotExists: To be provided if non-existing objects are passed, otherwise, the call will
/// fail..
/// - parameter requestOptions: The request options
/// - returns: [BatchResponse]
func partialUpdateObjects(
indexName: String,
objects: [some Encodable],
createIfNotExist: Bool = false,
createIfNotExists: Bool = false,
requestOptions: RequestOptions? = nil
) async throws -> [BatchResponse] {
try await self.chunkedBatch(
indexName: indexName,
objects: objects,
action: createIfNotExist ? .partialUpdateObject : .partialUpdateObjectNoCreate,
action: createIfNotExists ? .partialUpdateObject : .partialUpdateObjectNoCreate,
waitForTasks: false,
batchSize: 1000,
requestOptions: requestOptions
Expand Down Expand Up @@ -544,7 +545,7 @@ public extension SearchClient {
operationIndexParams: OperationIndexParams(
operation: .copy,
destination: tmpIndexName,
scope: [.rules, .settings, .synonyms]
scope: [.settings, .rules, .synonyms]
),
requestOptions: requestOptions
)
Expand All @@ -563,7 +564,7 @@ public extension SearchClient {
operationIndexParams: OperationIndexParams(
operation: .copy,
destination: tmpIndexName,
scope: [.rules, .settings, .synonyms]
scope: [.settings, .rules, .synonyms]
),
requestOptions: requestOptions
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.algolia.codegen.exceptions.CTSException;
import com.algolia.codegen.utils.*;
import io.swagger.util.Json;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -160,7 +161,11 @@ public void run(Map<String, CodegenModel> models, Map<String, CodegenOperation>
if (step.expected.match instanceof Map match) {
paramsType.enhanceParameters(match, matchMap);
stepOut.put("match", matchMap);
stepOut.put("matchIsObject", true);
stepOut.put("matchIsJSON", true);
} else if (step.expected.match instanceof List match) {
matchMap.put("parameters", Json.mapper().writeValueAsString(step.expected.match));
stepOut.put("match", matchMap);
stepOut.put("matchIsJSON", true);
} else {
stepOut.put("match", step.expected.match);
}
Expand Down
9 changes: 6 additions & 3 deletions scripts/cts/runCts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { createSpinner } from '../spinners.js';
import type { Language } from '../types.js';

import { startTestServer } from './testServer';
import { assertChunkWrapperValid } from './testServer/chunkWrapper.js';
import { assertValidReplaceAllObjects } from './testServer/replaceAllObjects.js';
import { assertValidTimeouts } from './testServer/timeout.js';

async function runCtsOne(language: string): Promise<void> {
Expand Down Expand Up @@ -86,9 +88,10 @@ export async function runCts(languages: Language[], clients: string[]): Promise<
if (useTestServer) {
await close();

assertValidTimeouts(languages.length);
const skip = (lang: Language): number => (languages.includes(lang) ? 1 : 0);

// uncomment this once all languages are supported
// assertValidReplaceAllObjects(languages.length);
assertValidTimeouts(languages.length);
assertChunkWrapperValid(languages.length - skip('dart') - skip('scala'));
assertValidReplaceAllObjects(languages.length - skip('dart') - skip('scala'));
}
}
113 changes: 113 additions & 0 deletions scripts/cts/testServer/chunkWrapper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import type { Server } from 'http';

import { expect } from 'chai';
import express from 'express';
import type { Express } from 'express';

import { setupServer } from '.';

const chunkWrapperState: Record<string, any> = {};

export function assertChunkWrapperValid(expectedCount: number): void {
if (Object.values(chunkWrapperState).length !== expectedCount) {
throw new Error('unexpected number of call to chunkWrapper');
}
for (const state of Object.values(chunkWrapperState)) {
expect(state).to.deep.equal({ saveObjects: 1, partialUpdateObjects: 2, deleteObjects: 1 });
}
}

function addRoutes(app: Express): void {
app.use(express.urlencoded({ extended: true }));
app.use(
express.json({
type: ['application/json', 'text/plain'], // the js client sends the body as text/plain
}),
);

app.post('/1/indexes/:indexName/batch', (req, res) => {
const match = req.params.indexName.match(/^cts_e2e_(\w+)_(.*)$/);
const helper = match?.[1] as string;
const lang = match?.[2] as string;

if (!chunkWrapperState[lang]) {
chunkWrapperState[lang] = {};
}
chunkWrapperState[lang][helper] = (chunkWrapperState[lang][helper] ?? 0) + 1;
switch (helper) {
case 'saveObjects':
expect(req.body).to.deep.equal({
requests: [
{ action: 'addObject', body: { objectID: '1', name: 'Adam' } },
{ action: 'addObject', body: { objectID: '2', name: 'Benoit' } },
],
});

res.json({
taskID: 333,
objectIDs: req.body.requests.map((r) => r.body.objectID),
});

break;
case 'partialUpdateObjects':
if (req.body.requests[0].body.objectID === '1') {
expect(req.body).to.deep.equal({
requests: [
{ action: 'partialUpdateObject', body: { objectID: '1', name: 'Adam' } },
{ action: 'partialUpdateObject', body: { objectID: '2', name: 'Benoit' } },
],
});

res.json({
taskID: 444,
objectIDs: req.body.requests.map((r) => r.body.objectID),
});
} else {
expect(req.body).to.deep.equal({
requests: [
{ action: 'partialUpdateObjectNoCreate', body: { objectID: '3', name: 'Cyril' } },
{ action: 'partialUpdateObjectNoCreate', body: { objectID: '4', name: 'David' } },
],
});

res.json({
taskID: 555,
objectIDs: req.body.requests.map((r) => r.body.objectID),
});
}
break;
case 'deleteObjects':
expect(req.body).to.deep.equal({
requests: [
{ action: 'deleteObject', body: { objectID: '1' } },
{ action: 'deleteObject', body: { objectID: '2' } },
],
});

res.json({
taskID: 666,
objectIDs: req.body.requests.map((r) => r.body.objectID),
});
break;
default:
throw new Error('unknown helper');
}
});

// fallback route
app.use((req, res) => {
// eslint-disable-next-line no-console
console.log('fallback route', req.method, req.url);
res.status(404).json({ message: 'not found' });
});

app.use((err, req, res, _) => {
// eslint-disable-next-line no-console
console.error(err.message);
res.status(500).send({ message: err.message });
});
}

export function chunkWrapperServer(): Promise<Server> {
return setupServer('chunkWrapper', 6680, addRoutes);
}
4 changes: 3 additions & 1 deletion scripts/cts/testServer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { Express } from 'express';

import { createSpinner } from '../../spinners';

import { chunkWrapperServer } from './chunkWrapper';
import { gzipServer } from './gzip';
import { replaceAllObjectsServer } from './replaceAllObjects';
import { timeoutServer } from './timeout';
Expand All @@ -13,9 +14,10 @@ import { timeoutServerBis } from './timeoutBis';
export async function startTestServer(): Promise<() => Promise<void>> {
const servers = await Promise.all([
timeoutServer(),
timeoutServerBis(),
gzipServer(),
timeoutServerBis(),
replaceAllObjectsServer(),
chunkWrapperServer(),
]);

return async () => {
Expand Down
Loading
Loading