Skip to content

Commit

Permalink
chore: minor improvements of IterableRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
chdh committed Aug 5, 2024
1 parent af3dd4f commit 4af51d8
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 36 deletions.
80 changes: 46 additions & 34 deletions src/iterable-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ export interface IterableRequestItem {
* Metadata of all columns.
*/
columnMetadata: ColumnMetadataDef;

}

type iteratorPromiseResolveFunction = (value: IteratorResult<IterableRequestItem>) => void;
type iteratorPromiseRejectFunction = (error: Error) => void;
interface IteratorPromiseFunctions {resolve: iteratorPromiseResolveFunction, reject: iteratorPromiseRejectFunction}

// Internal class for the state controller logic of the iterator.
class IterableRequestController {
Expand All @@ -54,10 +56,9 @@ class IterableRequestController {
private fifoPauseLevel: number;
private fifoResumeLevel: number;

private promisePending: boolean;
private resolvePromise: iteratorPromiseResolveFunction | undefined;
private rejectPromise: iteratorPromiseRejectFunction | undefined;
private promises: IteratorPromiseFunctions[]; // FIFO of resolve/reject function pairs of pending promises
private terminatorResolve: (() => void) | undefined;
private terminatorPromise: Promise<void> | undefined;

// --- Constructor / Terminator ----------------------------------------------

Expand All @@ -73,7 +74,7 @@ class IterableRequestController {
this.fifoPauseLevel = fifoSize;
this.fifoResumeLevel = Math.floor(fifoSize / 2);

this.promisePending = false;
this.promises = [];

request.addListener('row', this.rowEventHandler);
request.addListener('columnMetadata', this.columnMetadataEventHandler);
Expand All @@ -86,29 +87,32 @@ class IterableRequestController {
return Promise.resolve();

Check warning on line 87 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L87

Added line #L87 was not covered by tests
}
this.request.connection.cancel();
return new Promise<void>((resolve: () => void) => {
this.terminatorResolve = resolve;
});
if (!this.terminatorPromise) {
this.terminatorPromise = new Promise<void>((resolve: () => void) => {
this.terminatorResolve = resolve;
});
}
return this.terminatorPromise;
}

// --- Promise logic ---------------------------------------------------------

private serveError(): boolean {
if (!this.error || !this.promisePending) {
if (!this.error || !this.promises.length) {
return false;
}
this.rejectPromise!(this.error);
this.promisePending = false;
const promise = this.promises.shift()!;
promise.reject(this.error);
return true;
}

private serveRowItem(): boolean {
if (!this.fifo.length || !this.promisePending) {
if (!this.fifo.length || !this.promises.length) {
return false;
}
const item = this.fifo.shift()!;
this.resolvePromise!({ value: item });
this.promisePending = false;
const promise = this.promises.shift()!;
promise.resolve({ value: item });
if (this.fifo.length <= this.fifoResumeLevel && this.requestPaused) {
this.request.resume();
this.requestPaused = false;
Expand All @@ -117,35 +121,39 @@ class IterableRequestController {
}

private serveRequestCompletion(): boolean {
if (!this.requestCompleted || !this.promisePending) {
if (!this.requestCompleted || !this.promises.length) {
return false;
}
this.resolvePromise!({ done: true, value: undefined });
this.promisePending = false;
const promise = this.promises.shift()!;
promise.resolve({ done: true, value: undefined });
return true;
}

private servePromise() {
if (this.serveError()) {
return;
}
private serveNextPromise(): boolean {
if (this.serveRowItem()) {
return;
return true;
}
if (this.serveError()) {
return true;
}
if (this.serveRequestCompletion()) {
return; // eslint-disable-line no-useless-return
return true;
}
return false;
}

private servePromises() {
while (true) {
if (!this.serveNextPromise()) {
break;
}
}
}

// This promise executor is called synchronously from within Iterator.next().
public promiseExecutor = (resolve: iteratorPromiseResolveFunction, reject: iteratorPromiseRejectFunction) => {
if (this.promisePending) {
throw new Error('Previous promise is still active.');
}
this.resolvePromise = resolve;
this.rejectPromise = reject;
this.promisePending = true;
this.servePromise();
this.promises.push({ resolve, reject });
this.servePromises();
};

// --- Event handlers --------------------------------------------------------
Expand All @@ -161,7 +169,7 @@ class IterableRequestController {
if (error && !this.error) {
this.error = error;
}
this.servePromise();
this.servePromises();
}

private columnMetadataEventHandler = (columnMetadata: ColumnMetadata[] | Record<string, ColumnMetadata>) => {
Expand All @@ -175,7 +183,7 @@ class IterableRequestController {
}
if (this.resultSetNo === 0 || !this.columnMetadata) {
this.error = new Error('No columnMetadata event received before row event.');
this.servePromise();
this.servePromises();
return;

Check warning on line 187 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L185-L187

Added lines #L185 - L187 were not covered by tests
}
const item: IterableRequestItem = { row, resultSetNo: this.resultSetNo, columnMetadata: this.columnMetadata };
Expand All @@ -184,7 +192,7 @@ class IterableRequestController {
this.request.pause();
this.requestPaused = true;
}
this.servePromise();
this.servePromises();
};

}
Expand All @@ -207,9 +215,13 @@ class IterableRequestIterator implements AsyncIterator<IterableRequestItem> {
return Promise.resolve({ value, done: true }); // eslint-disable-line @typescript-eslint/return-await
}

public async throw(_exception?: any): Promise<any> {
public async throw(exception?: any): Promise<any> {
await this.controller.terminate();

Check warning on line 219 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L219

Added line #L219 was not covered by tests
return Promise.resolve({ done: true }); // eslint-disable-line @typescript-eslint/return-await
if (exception) {
return Promise.reject(exception); // eslint-disable-line @typescript-eslint/return-await

Check warning on line 221 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L221

Added line #L221 was not covered by tests
} else {
return Promise.resolve({ done: true }); // eslint-disable-line @typescript-eslint/return-await

Check warning on line 223 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L223

Added line #L223 was not covered by tests
}
}

}
Expand Down
38 changes: 36 additions & 2 deletions test/integration/iterable-request-test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { assert } from 'chai';

import Connection from '../../src/connection';
import { RequestError } from '../../src/errors';
import IterableRequest, { type ColumnValue } from '../../src/iterable-request';
import { debugOptionsFromEnv } from '../helpers/debug-options-from-env';

Expand Down Expand Up @@ -96,10 +97,10 @@ describe('Iterable Request Test', function() {

await testForLoop(10000, 500);
await testForLoop(10000, 3);
await testForLoop(10000, 250);
await testForLoop(10000, 250, 100);
await testForLoop(100, 100);

async function testForLoop(n: number, abortCount: number) {
async function testForLoop(n: number, abortCount: number, sleepPos = -1) {
const sql = `
with cte1 as
(select 1 as i union all select i + 1 from cte1 where i < ${n})
Expand All @@ -114,6 +115,9 @@ describe('Iterable Request Test', function() {
const row = item.row as ColumnValue[];
const i = row[0].value;
assert(i === ctr + 1);
if (ctr === sleepPos) {
await sleep(250);
}
ctr++;
if (ctr === abortCount) {
break;
Expand All @@ -124,4 +128,34 @@ describe('Iterable Request Test', function() {

});

it('tests the error handling logic of the iterable request module', async function() {
const sql = `
select 1
select 2
select 3 / 0
`;

const request = new IterableRequest(sql);
connection.execSql(request);

let ctr = 0;
let errCtr = 0;
try {
for await (const item of request) {
assert(item.resultSetNo === ctr + 1);
const row = item.row as ColumnValue[];
const i = row[0].value;
assert(i === ctr + 1);
ctr++;
}
} catch (err) {
assert.instanceOf(err, RequestError);
assert((err as RequestError).message.toLowerCase().includes('divide by zero'));
errCtr++;
}
assert(ctr === 2);
assert(errCtr === 1);
});


});

0 comments on commit 4af51d8

Please sign in to comment.