Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement an async iterable request class #1644

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 266 additions & 0 deletions src/iterable-request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
// This module implements an iterable `Request` class.

import Request, { type RequestOptions } from './request';
import { type ColumnMetadata } from './token/colmetadata-token-parser';

export interface ColumnValue {
metadata: ColumnMetadata;
value: any;
}

type RowData = ColumnValue[] | Record<string, ColumnValue>; // type variant depending on config.options.useColumnNames
type ColumnMetadataDef = ColumnMetadata[] | Record<string, ColumnMetadata>; // type variant depending on config.options.useColumnNames
Copy link
Collaborator

@arthurschreiber arthurschreiber Aug 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config.options.useColumnNames is super bad API design, and I'd prefer if we could not propagate this nonsense. 😬

The reason why it's bad API design is that the config value changes the shape of the data that is being returned from tedious. Flipping this config basically from true to false or the other way around basically breaks all code that was written with the other value set.

Much better would be to have an API that allows the code to explicitly either use objects or use arrays.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. But as you wrote before, this would be a profound change, beyond the scope of this PR.


export interface IterableRequestOptions extends RequestOptions {
iteratorFifoSize: number;
}

/**
* The item object of the request iterator.
*/
export interface IterableRequestItem {

/**
* Row data.
*/
row: RowData;

/**
* Result set number, 1..n.
*/
resultSetNo: number;

/**
* Metadata of all columns.
*/
columnMetadata: ColumnMetadataDef;

}

type iteratorPromiseResolveFunction = (value: IteratorResult<IterableRequestItem>) => void;
type iteratorPromiseRejectFunction = (error: Error) => void;
interface IteratorPromiseFunctions {resolve: iteratorPromiseResolveFunction, reject: iteratorPromiseRejectFunction}

// Internal class for the state controller logic of the iterator.
class IterableRequestController {

private request: Request;
private requestCompleted: boolean;
private requestPaused: boolean;
private error: Error | undefined;
private terminating: boolean;

private resultSetNo: number;
private columnMetadata: ColumnMetadataDef | undefined;
private fifo: IterableRequestItem[];
private fifoPauseLevel: number;
private fifoResumeLevel: number;

private promises: IteratorPromiseFunctions[]; // FIFO of resolve/reject function pairs of pending promises
private terminatorResolve: (() => void) | undefined;
private terminatorPromise: Promise<void> | undefined;

// --- Constructor / Terminator ----------------------------------------------

constructor(request: Request, options?: IterableRequestOptions) {
this.request = request;
this.requestCompleted = false;
this.requestPaused = false;
this.terminating = false;

this.resultSetNo = 0;
this.fifo = [];
const fifoSize = options?.iteratorFifoSize ?? 1024;
this.fifoPauseLevel = fifoSize;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is similar to highWaterMark on regular streams? I.e. once the buffer (here called fifo) reaches that size, no more data will be buffered and the underlying request will be paused?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Do you suggest to rename it to this.fifoHighWaterMark? And also rename the option parameter?

this.fifoResumeLevel = Math.floor(fifoSize / 2);

this.promises = [];

request.addListener('row', this.rowEventHandler);
request.addListener('columnMetadata', this.columnMetadataEventHandler);
}

public terminate(): Promise<void> {
this.terminating = true;
this.request.resume(); // (just to be sure)
if (this.requestCompleted || !this.request.connection) {
return Promise.resolve();

Check warning on line 87 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L87

Added line #L87 was not covered by tests
}
this.request.connection.cancel();
if (!this.terminatorPromise) {
this.terminatorPromise = new Promise<void>((resolve: () => void) => {
this.terminatorResolve = resolve;
});
}
return this.terminatorPromise;
}

// --- Promise logic ---------------------------------------------------------

private serveError(): boolean {
if (!this.error || !this.promises.length) {
return false;
}
const promise = this.promises.shift()!;
promise.reject(this.error);
return true;
}

private serveRowItem(): boolean {
if (!this.fifo.length || !this.promises.length) {
return false;
}
const item = this.fifo.shift()!;
const promise = this.promises.shift()!;
promise.resolve({ value: item });
if (this.fifo.length <= this.fifoResumeLevel && this.requestPaused) {
this.request.resume();
this.requestPaused = false;
}
return true;
}

private serveRequestCompletion(): boolean {
if (!this.requestCompleted || !this.promises.length) {
return false;
}
const promise = this.promises.shift()!;
promise.resolve({ done: true, value: undefined });
return true;
}

private serveNextPromise(): boolean {
if (this.serveRowItem()) {
return true;
}
if (this.serveError()) {
return true;
}
if (this.serveRequestCompletion()) {
return true;
}
return false;
}

private servePromises() {
while (true) {
if (!this.serveNextPromise()) {
break;
}
}
}

// This promise executor is called synchronously from within Iterator.next().
public promiseExecutor = (resolve: iteratorPromiseResolveFunction, reject: iteratorPromiseRejectFunction) => {
this.promises.push({ resolve, reject });
this.servePromises();
};

// --- Event handlers --------------------------------------------------------

public completionCallback(error: Error | null | undefined) {
this.requestCompleted = true;
if (this.terminating) {
if (this.terminatorResolve) {
this.terminatorResolve();
}
return;
}
if (error && !this.error) {
this.error = error;
}
this.servePromises();
}

private columnMetadataEventHandler = (columnMetadata: ColumnMetadata[] | Record<string, ColumnMetadata>) => {
this.resultSetNo++;
this.columnMetadata = columnMetadata;
};

private rowEventHandler = (row: RowData) => {
if (this.requestCompleted || this.error || this.terminating) {
return;

Check warning on line 182 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L182

Added line #L182 was not covered by tests
}
if (this.resultSetNo === 0 || !this.columnMetadata) {
this.error = new Error('No columnMetadata event received before row event.');
this.servePromises();
return;

Check warning on line 187 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L185-L187

Added lines #L185 - L187 were not covered by tests
}
const item: IterableRequestItem = { row, resultSetNo: this.resultSetNo, columnMetadata: this.columnMetadata };
this.fifo.push(item);
if (this.fifo.length >= this.fifoPauseLevel && !this.requestPaused) {
this.request.pause();
this.requestPaused = true;
}
this.servePromises();
};

}

// Internal class for the iterator object which is passed to the API client.
class IterableRequestIterator implements AsyncIterator<IterableRequestItem> {

private controller: IterableRequestController;

constructor(controller: IterableRequestController) {
this.controller = controller;
}

public next(): Promise<IteratorResult<IterableRequestItem>> {
return new Promise<IteratorResult<IterableRequestItem>>(this.controller.promiseExecutor);
}

public async return(value?: any): Promise<any> {
await this.controller.terminate();
return { value, done: true };
}

public async throw(exception?: any): Promise<any> {
await this.controller.terminate();

Check warning on line 219 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L219

Added line #L219 was not covered by tests
if (exception) {
throw exception;

Check warning on line 221 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L221

Added line #L221 was not covered by tests
} else {
return { done: true };

Check warning on line 223 in src/iterable-request.ts

View check run for this annotation

Codecov / codecov/patch

src/iterable-request.ts#L223

Added line #L223 was not covered by tests
}
}

}

/**
* An iterable `Request` class.
*
* This iterable version is a super class of the normal `Request` class.
*
* Usage:
* ```js
* const request = new IterableRequest("select 42, 'hello world'");
* connection.execSql(request);
* for await (const item of request) {
* console.log(item.row);
* }
* ```
*/
class IterableRequest extends Request implements AsyncIterable<IterableRequestItem> {

private iterator: IterableRequestIterator;

constructor(sqlTextOrProcedure: string | undefined, options?: IterableRequestOptions) {
super(sqlTextOrProcedure, completionCallback, options);
const controller = new IterableRequestController(this, options);
this.iterator = new IterableRequestIterator(controller);

function completionCallback(error: Error | null | undefined) {
if (controller) {
controller.completionCallback(error);
}
}
}

[Symbol.asyncIterator](): AsyncIterator<IterableRequestItem> {
return this.iterator;
}
Comment on lines +259 to +261
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you thought about implementing this through using events.on? I haven't tested this, but it would allow you to get rid of the custom iterator implementation.

  constructor(sqlTextOrProcedure: string | undefined, options?: IterableRequestOptions) {
    super(sqlTextOrProcedure, completionCallback, options);
    const controller = new AbortController();
    this.doneSignal = controller.signal;

    function completionCallback(error: Error | null | undefined) {
       controller.abort(error);
    }
  }

  [Symbol.asyncIterator](): AsyncIterator<IterableRequestItem> {
    return events.on(this, 'row', { signal: this.doneSignal });
  }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I didn't know about events.on(). It does more or less the same as my controller and iterator classes.
I will refactor my code into using events.on().

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since Tedious still has to support Node 18, I had a look at that old implementation of event.on().
The close event was not yet implemented in that version. I can't find a proper way to end the iteration so that the remaining items in the FIFO are still processed. abortListener() calls errorHandler() which calls toError.reject(err) which rejects the waiting Promise,
Calling iterator.return() resolves the waiting Promise with done = true.

Should I use my implementation for old Node versions and event.on() for new Node versions?


}

export default IterableRequest;
module.exports = IterableRequest;
2 changes: 1 addition & 1 deletion src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export interface ParameterOptions {
scale?: number;
}

interface RequestOptions {
export interface RequestOptions {
statementColumnEncryptionSetting?: SQLServerStatementColumnEncryptionSetting;
}

Expand Down
2 changes: 2 additions & 0 deletions src/tedious.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import BulkLoad from './bulk-load';
import Connection, { type ConnectionAuthentication, type ConnectionConfiguration, type ConnectionOptions } from './connection';
import Request from './request';
import IterableRequest from './iterable-request';
import { name } from './library';

import { ConnectionError, RequestError } from './errors';
Expand All @@ -21,6 +22,7 @@ export {
BulkLoad,
Connection,
Request,
IterableRequest,
library,
ConnectionError,
RequestError,
Expand Down
Loading
Loading