feat: [WIP] apify extra #116

Open · wants to merge 9 commits into base: master
1 change: 1 addition & 0 deletions jest.config.js
@@ -13,6 +13,7 @@ module.exports = {
],
moduleNameMapper: {
'^apify$': '<rootDir>/packages/apify/src',
'^apify-extra$': '<rootDir>/packages/apify-extra/src',
'^@apify/scraper-tools$': '<rootDir>/packages/scraper-tools/src',
},
modulePathIgnorePatterns: [
484 changes: 132 additions & 352 deletions package-lock.json

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions packages/apify-extra/.npmignore
@@ -0,0 +1,6 @@
node_modules
src
test
coverage
apify_storage
tsconfig.*
2 changes: 2 additions & 0 deletions packages/apify-extra/README.md
@@ -0,0 +1,2 @@
# Apify Extra

65 changes: 65 additions & 0 deletions packages/apify-extra/package.json
@@ -0,0 +1,65 @@
{
"name": "apify-extra",
"version": "0.0.1",
"description": "Advanced and experimental functionality for Apify Actors and Crawlee. Smaller test coverage. Use with caution!",
"engines": {
"node": ">=16.0.0"
},
"main": "./dist/index.js",
"module": "./dist/index.mjs",
"types": "./dist/index.d.ts",
"exports": {
".": {
"import": "./dist/index.mjs",
"require": "./dist/index.js",
"types": "./dist/index.d.ts"
},
"./package.json": "./package.json"
},
"keywords": [
"apify",
"headless",
"chrome",
"puppeteer",
"crawler",
"scraper"
],
"author": {
"name": "Apify",
"email": "[email protected]",
"url": "https://apify.com"
},
"contributors": [
"Jan Curn <[email protected]>",
"Marek Trunkat <[email protected]>",
"Ondra Urban <[email protected]>"
],
"license": "Apache-2.0",
"repository": {
"type": "git",
"url": "git+https://github.com/apify/apify-sdk-js"
},
"bugs": {
"url": "https://github.com/apify/apify-sdk-js/issues"
},
"homepage": "https://sdk.apify.com",
"scripts": {
"build": "npm run clean && npm run compile && npm run copy && npm run fixApifyExport",
"clean": "rimraf ./dist",
"compile": "tsc -p tsconfig.build.json && gen-esm-wrapper ./dist/index.js ./dist/index.mjs",
"copy": "ts-node -T ../../scripts/copy.ts --readme=local",
"fixApifyExport": "ts-node -T ../../scripts/temp_fix_apify_exports.ts"
},
"publishConfig": {
"access": "public"
},
"dependencies": {
"apify": "^3.0.0",
"@types/bluebird": "^3.5.37",
"bluebird": "^3.7.2"
},
"peerDependencies": {
"apify": ">= 3.0.0",
"crawlee": ">= 3.0.0"
}
}
2 changes: 2 additions & 0 deletions packages/apify-extra/src/const.ts
@@ -0,0 +1,2 @@
export const APIFY_EXTRA_KV_RECORD_PREFIX = 'APIFY-EXTRA-';
export const APIFY_EXTRA_LOG_PREFIX = '[apify-extra]: ';
184 changes: 184 additions & 0 deletions packages/apify-extra/src/dataset.ts
@@ -0,0 +1,184 @@
/* eslint-disable max-classes-per-file */
import { Actor, Dataset as OriginalDataset } from 'apify';
import { log } from 'crawlee';

import { APIFY_EXTRA_KV_RECORD_PREFIX, APIFY_EXTRA_LOG_PREFIX } from './const';

export type DatasetItem = Exclude<Parameters<OriginalDataset['pushData']>[0], any[]>;

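/**
 * Runs the provided job factories with at most `maxConcurrency` jobs in flight at once,
 * resolving once the whole queue has been drained.
 *
 * A minimal illustrative sketch (`items` and `processItem` are assumptions, not part of this package):
 * ```typescript
 * const jobs = items.map((item) => () => processItem(item));
 * await waitForCompletion(jobs, 10); // at most 10 jobs run concurrently
 * ```
 */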
export async function waitForCompletion<T>(promises: (() => Promise<T>)[], maxConcurrency: number): Promise<void> {
async function worker() {
let job;
/* eslint-disable-next-line no-cond-assign */
while (job = promises.shift()) await job();
}

await Promise.all([...new Array(maxConcurrency)].map(() => worker()));
}

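/**
 * Tracks which chunks (keyed by their starting offset) have already been processed,
 * so that an interrupted run can skip them after a resume or migration.
 *
 * An illustrative sketch ('MY-TRACKER' is a hypothetical key-value store key):
 * ```typescript
 * const tracker = new ChunkTracker(await Actor.getValue<Record<string, boolean>>('MY-TRACKER'));
 * if (!tracker.has('0')) {
 *     tracker.add('0'); // mark the chunk starting at offset 0 as done
 * }
 * await Actor.setValue('MY-TRACKER', tracker.get());
 * ```
 */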
export class ChunkTracker {
private readonly chunks: Record<string, boolean> = {};

public add(chunkId: string): void {
this.chunks[chunkId] = true;
}

public has(chunkId: string): boolean {
return this.chunks[chunkId] === true;
}

    public get(): Record<string, boolean> {
        // Return the chunks in the same shape the constructor accepts, so a persisted
        // tracker can be restored via `Actor.getValue()` after a migration.
        return { ...this.chunks };
    }

constructor(data?: Record<string, boolean> | null) {
if (data) {
Object.assign(this.chunks, data);
}
}
}

export class Dataset extends OriginalDataset {
/**
* Stores an object or an array of objects to the dataset.
* The function returns a promise that resolves when the operation finishes.
* It has no result, but throws on invalid args or other errors.
*
* **IMPORTANT**: Make sure to use the `await` keyword when calling `pushDataParallel()`,
* otherwise the crawler process might finish before the data is stored!
*
     * The size of the data is limited by the receiving API and therefore `pushDataParallel()` will only
     * allow objects whose JSON representation is smaller than 9MB. When an array is passed, none of the
     * included objects may be larger than 9MB, but the array itself may be of any size.
*
     * This method parallelizes the `pushData` calls to the Apify API, which can handle up to 30 parallel requests.
     * It also keeps track of the progress and can resume the push if the Actor is migrated.
     * Unlike the `pushData` method, this method does not guarantee the order of items.
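     *
     * **Example usage** (an illustrative sketch; `items` stands for any array of plain objects):
     * ```typescript
     * const dataset = await Dataset.open('my-results');
     * await dataset.pushDataParallel(items, { batchSize: 500, parallelPushes: 10 });
     * ```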
*
* @param data Object or array of objects containing data to be stored in the dataset.
* The objects must be serializable to JSON and the JSON representation of each object must be smaller than 9MB.
* @param [options] All `pushDataParallel()` parameters.
* @param [options.batchSize] Number of items to be pushed in one push call.
     * Should not be higher than 1000, so that each push call can finish before a potential migration.
* @param [options.parallelPushes] Number of push calls to be done in parallel. Apify API should handle up to 30 parallel requests.
* @param [options.idempotencyKey] By providing different idempotency keys (any string), you can call this function multiple times in the same run.
*/
public async pushDataParallel(data: DatasetItem | DatasetItem[], options: {
batchSize?: number;
parallelPushes?: number;
idempotencyKey?: string;
} = {}) {
if (!Array.isArray(data)) {
return this.pushData(data);
}

const {
batchSize = 1000,
parallelPushes = 10,
idempotencyKey = '',
} = options;

if (parallelPushes > 30) {
log.warning(`${APIFY_EXTRA_LOG_PREFIX} Setting the parallelPushes option larger than 30 can lead to problems with the Apify Platform API.`);
}

const sanitizedIdempotencyKey = idempotencyKey.replace(/[^a-zA-Z0-9]/g, '-').slice(0, 30);
        const chunkTrackerName = `${APIFY_EXTRA_KV_RECORD_PREFIX}PUSH-${this.id}-${sanitizedIdempotencyKey}`;
const chunkTracker = new ChunkTracker(await Actor.getValue<Record<string, boolean>>(chunkTrackerName));

        let isMigrating = false;
        const migrationCallback = async (migrating: boolean) => {
            isMigrating = migrating ?? true;
            // Persist the set of already pushed chunks so an interrupted run can resume.
            await Actor.setValue(chunkTrackerName, chunkTracker.get());
        };

Actor.on('migrating', migrationCallback);
Actor.on('aborting', migrationCallback);
Actor.on('persistState', () => migrationCallback(false));

        return waitForCompletion(
            [...new Array(Math.ceil(data.length / batchSize))]
                // Map to indices before filtering, so each chunk keeps its original offset
                // even when some chunks were already pushed before a migration.
                .map((_, i) => i)
                .filter((i) => !chunkTracker.has(`${batchSize * i}`))
                .map((i) => async () => {
                    if (isMigrating) {
                        log.info(`${APIFY_EXTRA_LOG_PREFIX}[pushDataParallel]: Stopping pushDataParallel because of migration`);
                        // Hang indefinitely; the platform is about to restart the process.
                        await new Promise(() => {});
                    }

                    chunkTracker.add(`${batchSize * i}`);
                    const currentSlice = data.slice(batchSize * i, batchSize * (i + 1));
                    if (currentSlice.length > 0) {
                        await this.pushData(currentSlice);
                    }
                }),
            parallelPushes);
    }

/**
* Iterates over dataset items, passing every item to the provided `func` function.
* Each invocation of `func` is called with two arguments: `(item, index)`. Index specifies the zero-based index of the item in the dataset.
*
* If the `func` function returns a Promise, it is awaited.
* If it throws an error, the iteration is aborted and the `forEachParallel` function throws the error.
*
* **Example usage**
* ```typescript
* const dataset = await Dataset.open('my-results');
* await dataset.forEachParallel(async (item, index) => {
* console.log(`Item at ${index}: ${JSON.stringify(item)}`);
* });
* ```
*
* *Important note*: Unlike the `forEach` method, this method processes items in parallel and does not guarantee the order of items.
     * It also does not wait for one invocation of `func` to finish before calling it for the next item.
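     *
     * For instance, to process a large dataset in batches of 10,000 items with ten batches in flight,
     * resuming across migrations (an illustrative sketch; `handleItem` is a hypothetical helper):
     * ```typescript
     * await dataset.forEachParallel(async (item, index) => {
     *     await handleItem(item, index);
     * }, { parallelLoads: 10, batchSize: 10000, persistState: true });
     * ```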
*
* @param func A function that is called for every item in the dataset.
     * @param [options] All `forEachParallel()` parameters.
* @param [options.parallelLoads] Maximum number of item batches to be processed in parallel.
* @param [options.batchSize] Maximum number of items to be processed in one batch.
     * @param [options.persistState] If `true`, the processing state will be persisted between Actor migrations and runs.
     * @param [options.idempotencyKey] By providing different idempotency keys (any string), you can call this function multiple times in the same run.
* @returns {Promise<void>}
*/
public async forEachParallel(
func: Parameters<OriginalDataset['forEach']>[0],
options: Parameters<OriginalDataset['forEach']>[1] & { parallelLoads?: number; batchSize?: number; persistState: boolean; idempotencyKey?: string },
) {
const { parallelLoads = 20, batchSize = 50000, persistState, idempotencyKey = '' } = options;
const { offset: globalOffset = 0, limit: globalLimit = Infinity } = options;

const sanitizedIdempotencyKey = idempotencyKey.replace(/[^a-zA-Z0-9]/g, '-').slice(0, 30);
const chunkTrackerName = `${APIFY_EXTRA_KV_RECORD_PREFIX}FOREACH-${this.id}-${sanitizedIdempotencyKey}`;
const chunkTracker = new ChunkTracker(persistState ? await Actor.getValue<Record<string, boolean>>(chunkTrackerName) : undefined);

let isMigrating = false;

const migrationCallback = async (migrating: boolean) => {
isMigrating = migrating ?? true;
await Actor.setValue(chunkTrackerName, persistState ? chunkTracker.get() : null);
};

Actor.on('migrating', migrationCallback);
Actor.on('aborting', migrationCallback);
Actor.on('persistState', () => migrationCallback(false));

const { itemCount } = await this.getInfo() ?? { itemCount: 0 };

        return waitForCompletion(
            [...new Array(Math.ceil((itemCount < globalLimit ? itemCount : globalLimit) / batchSize))] // every element represents one chunk
                // Map to indices before filtering, so each chunk keeps its original offset
                // even when some chunks were already processed before a migration.
                .map((_, i) => i)
                .filter((i) => !chunkTracker.has(`${globalOffset + i * batchSize}`))
                .map((i) => async () => {
                    if (isMigrating) {
                        log.info(`${APIFY_EXTRA_LOG_PREFIX}[forEachParallel]: Stopping forEachParallel because of migration`);
                        // hang indefinitely until migration is done
                        await new Promise(() => {});
                    }

                    const { items } = await this.getData({ limit: batchSize, offset: globalOffset + batchSize * i });

                    chunkTracker.add(`${globalOffset + batchSize * i}`);
                    await Promise.all(items.map((item, b) => func(item, globalOffset + batchSize * i + b)));
                }),
            parallelLoads);
    }
}
1 change: 1 addition & 0 deletions packages/apify-extra/src/index.ts
@@ -0,0 +1 @@
export * from './dataset';
7 changes: 7 additions & 0 deletions packages/apify-extra/tsconfig.build.json
@@ -0,0 +1,7 @@
{
"extends": "../../tsconfig.build.json",
"compilerOptions": {
"outDir": "./dist"
},
"include": ["src/**/*"]
}
4 changes: 4 additions & 0 deletions packages/apify-extra/tsconfig.json
@@ -0,0 +1,4 @@
{
"extends": "../../tsconfig.json",
"include": ["src/**/*"]
}
50 changes: 50 additions & 0 deletions test/apify/extra-dataset.test.ts
@@ -0,0 +1,50 @@
import { ENV_VARS } from '@apify/consts';
import { Actor, Configuration, PlatformEventManager, log } from 'apify';
import { Server } from 'ws';
import { Dataset } from 'apify-extra';

describe('forEachParallel', () => {
    let wss: Server;
const config = Configuration.getGlobalConfig();
const events = new PlatformEventManager(config);
config.useEventManager(events);

beforeEach(() => {
wss = new Server({ port: 9099 });
process.env[ENV_VARS.ACTOR_EVENTS_WS_URL] = 'ws://localhost:9099/someRunId';
process.env[ENV_VARS.TOKEN] = 'dummy';
});
afterEach((done) => {
delete process.env[ENV_VARS.ACTOR_EVENTS_WS_URL];
delete process.env[ENV_VARS.TOKEN];
wss.close(done);
});

test('slicing should work', async () => {
const getDataSpy = jest.spyOn(Dataset.prototype, 'getData').mockImplementation(async ({ limit }) => (
{ items: [...new Array(limit).keys()] } as any
));
jest.spyOn(Dataset.prototype, 'getInfo').mockImplementation(async () => ({
itemCount: 229, // a prime number
} as any));
jest.spyOn(Actor.prototype, 'getValue').mockImplementation(async () => ({}));

const dataset = new Dataset({
client: Configuration.getStorageClient(),
id: 'dataset-forEachParallel-test',
});

await dataset.forEachParallel((x) => { log.debug(`processing ${x}`); }, { persistState: true, batchSize: 7 });

expect(getDataSpy).toBeCalledTimes(Math.ceil(229 / 7));

[...Array(Math.ceil(229 / 7))].forEach((_, i) => {
expect(getDataSpy).toBeCalledWith(
expect.objectContaining({
limit: 7,
offset: i * 7,
}));
});
getDataSpy.mockRestore();
}, 60e3);
});
29 changes: 29 additions & 0 deletions test/e2e/extra-dataset/test.mjs
@@ -0,0 +1,29 @@
import { getTestDir, initialize, expect } from '../tools.mjs';
import { Dataset } from 'apify-extra';
import { Actor, Configuration, log } from 'apify';

const testDir = getTestDir(import.meta.url);

// Temporarily stub process.exit so nothing terminates the test before the assertions run;
// the original is restored at the end of this file.
const exit = process.exit;
process.exit = () => {};

await initialize(testDir);
// Create the underlying storage first, then wrap it in the extended apify-extra Dataset class.
await Actor.openDataset('dataset-forEachParallel-test');
const dataset = new Dataset({
id: 'dataset-forEachParallel-test',
client: Configuration.getStorageClient(),
});

const ITEM_COUNT = 229;
const target = [];

await dataset.pushDataParallel([...new Array(ITEM_COUNT)].map((_, i) => ({ index: i })), { batchSize: 10, parallelPushes: 4 });
await dataset.forEachParallel((x) => target.push(x), { persistState: true, batchSize: 10 });

expect((await dataset.getData()).total === ITEM_COUNT, 'all items pushed');
expect([...new Array(ITEM_COUNT)].every((_, i) => target.some(x => x.index === i)), 'all items processed');

expect(target.length === ITEM_COUNT, `forEach called ${target.length} times.`);

process.exit = exit;
process.exit(0);
1 change: 1 addition & 0 deletions tsconfig.json
@@ -4,6 +4,7 @@
"baseUrl": ".",
"paths": {
"apify": ["packages/apify/src"],
"apify-extra": ["packages/apify-extra/src"],
"@apify/scraper-tools": ["packages/scraper-tools/src"]
}
}