Skip to content

Commit

Permalink
Merge pull request #173 from ainblockchain/bugfix/platfowner/bugfix
Browse files Browse the repository at this point in the history
Add client-side impls of custom handshake timeout and custom ping-pong for event handler
  • Loading branch information
platfowner authored Apr 16, 2024
2 parents eb32d0a + 59f8ab0 commit 95b4533
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 58 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
"devDependencies": {
"@mxssfd/typedoc-theme": "^1.1.3",
"@types/jest": "^29.5.12",
"@types/ws": "^8.5.10",
"jest": "^29.7.0",
"ts-jest": "^29.1.2",
"typedoc": "^0.25.13",
Expand All @@ -54,6 +53,7 @@
"@types/node": "^12.7.3",
"@types/randombytes": "^2.0.0",
"@types/semver": "^7.3.4",
"@types/ws": "8.5.3",
"axios": "^0.21.4",
"bip39": "^3.0.2",
"browserify-cipher": "^1.0.1",
Expand Down
126 changes: 85 additions & 41 deletions src/event-manager/event-channel-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import {
EventChannelMessageTypes,
EventChannelMessage,
BlockchainEventTypes,
EventChannelConnectionOptions,
DisconnectionCallback,
} from '../types';
import EventFilter from './event-filter';
Expand All @@ -27,6 +26,8 @@ export default class EventChannelClient {
private _endpointUrl?: string;
/** Whether it's connected or not. */
private _isConnected: boolean;
/** The handshake timeout object. */
private _handshakeTimeout?: ReturnType<typeof setTimeout> | null;
/** The heartbeat timeout object. */
private _heartbeatTimeout?: ReturnType<typeof setTimeout> | null;

Expand All @@ -41,6 +42,7 @@ export default class EventChannelClient {
this._ws = undefined;
this._endpointUrl = undefined;
this._isConnected = false;
this._handshakeTimeout = undefined;
this._heartbeatTimeout = undefined;
}

Expand All @@ -50,11 +52,10 @@ export default class EventChannelClient {

/**
* Opens a new event channel.
* @param {EventChannelConnectionOptions} connectionOption The event channel connection options.
* @param {DisconnectionCallback} disconnectionCallback The disconnection callback function.
* @returns {Promise<void>} A promise for the connection success.
*/
connect(connectionOption: EventChannelConnectionOptions, disconnectionCallback?: DisconnectionCallback): Promise<any> {
connect(disconnectionCallback?: DisconnectionCallback): Promise<any> {
return new Promise(async (resolve, reject) => {
if (this.isConnected) {
reject(new Error(`Can't connect multiple channels`));
Expand Down Expand Up @@ -85,31 +86,53 @@ export default class EventChannelClient {
}

this._endpointUrl = url;
// TODO(platfowner): Add a custom handshake timeout.
this._ws = new WebSocket(url, [], { handshakeTimeout: connectionOption.handshakeTimeout || DEFAULT_HANDSHAKE_TIMEOUT_MS });
this._ws = new WebSocket(url);
// NOTE(platfowner): A custom handshake timeout (see https://github.com/ainblockchain/ain-js/issues/171).
this.startHandshakeTimer(DEFAULT_HANDSHAKE_TIMEOUT_MS);

this._ws.onmessage = (event: { data: unknown }) => {
if (typeof event.data !== 'string') {
console.error(`Non-string event data: ${event.data}`);
return;
}
this.handleMessage(event.data);
try {
const parsedMessage = JSON.parse(event.data);
const messageType = parsedMessage.type;
if (!messageType) {
throw Error(`No message type in (${event.data})`);
}
const messageData = parsedMessage.data;
if (!messageData) {
throw Error(`No message data in (${event.data})`);
}
// NOTE(platfowner): A custom ping-pong (see https://github.com/ainblockchain/ain-js/issues/171).
if (messageType === EventChannelMessageTypes.PING) {
this.handlePing();
} else {
this.handleMessage(messageType, messageData);
}
} catch (err) {
console.error(err);
}
};

this._ws.onerror = async (event: unknown) => {
console.error(event);
this.disconnect();
};

this._ws.onopen = () => {
this._isConnected = true;
this.startHeartbeatTimer(connectionOption.heartbeatIntervalMs || DEFAULT_HEARTBEAT_INTERVAL_MS);
// Handshake timeout
if (this._handshakeTimeout) {
clearTimeout(this._handshakeTimeout);
this._handshakeTimeout = null;
}
// Heartbeat timeout
this.startHeartbeatTimer(DEFAULT_HEARTBEAT_INTERVAL_MS);
resolve(this);
};
// TODO(platfowner): Add a custom ping-poing for heartbeat.
// NOTE(jiyoung): implement onping method here.
// this._wsClient.on('ping', () => {
// if (this._heartbeatTimeout) {
// clearTimeout(this._heartbeatTimeout);
// }
// this.startHeartbeatTimer(connectionOption.heartbeatIntervalMs || DEFAULT_HEARTBEAT_INTERVAL_MS);
// });

this._ws.onclose = () => {
this.disconnect();
if (disconnectionCallback) {
Expand All @@ -131,13 +154,24 @@ export default class EventChannelClient {
}
}

/**
* Starts the handshake timer for the event channel.
* @param {number} timeoutMs The timeout value in miliseconds.
*/
startHandshakeTimer(timeoutMs: number) {
this._handshakeTimeout = setTimeout(() => {
console.error(`Handshake timeouted! Closing the websocket.`);
this._ws!.close();
}, timeoutMs);
}

/**
* Starts the heartbeat timer for the event channel.
* @param {number} timeoutMs The timeout value in miliseconds.
*/
startHeartbeatTimer(timeoutMs: number) {
this._heartbeatTimeout = setTimeout(() => {
console.log(`Connection timeout! Terminate the connection. All event subscriptions are stopped.`);
console.error(`Heartbeat timeouted! Closing the websocket.`);
this._ws!.close();
}, timeoutMs);
}
Expand Down Expand Up @@ -187,32 +221,34 @@ export default class EventChannelClient {
}

/**
* Handles a message from the event channel.
* @param {string} message The message.
* Handles a ping message from the event channel.
*/
handleMessage(message: string) {
try {
const parsedMessage = JSON.parse(message);
const messageType = parsedMessage.type;
if (!messageType) {
throw Error(`Can't find type from message (${message})`);
}
const messageData = parsedMessage.data;
if (!messageData) {
throw Error(`Can't find data from message (${message})`);
}
switch (messageType) {
case EventChannelMessageTypes.EMIT_EVENT:
this.handleEmitEventMessage(messageData);
break;
case EventChannelMessageTypes.EMIT_ERROR:
this.handleEmitErrorMessage(messageData);
break;
default:
break;
}
} catch (err) {
console.error(err);
handlePing() {
this.sendPong();
// Heartbeat timeout
if (this._heartbeatTimeout) {
clearTimeout(this._heartbeatTimeout);
this._heartbeatTimeout = null;
}
this.startHeartbeatTimer(DEFAULT_HEARTBEAT_INTERVAL_MS);
}

/**
* Handles a (non-ping) message from the event channel.
*
* @param {EventChannelMessageTypes} messageType The message type.
* @param {any} messageData The message data.
*/
handleMessage(messageType: EventChannelMessageTypes, messageData: any) {
switch (messageType) {
case EventChannelMessageTypes.EMIT_EVENT:
this.handleEmitEventMessage(messageData);
break;
case EventChannelMessageTypes.EMIT_ERROR:
this.handleEmitErrorMessage(messageData);
break;
default:
break;
}
}

Expand Down Expand Up @@ -259,4 +295,12 @@ export default class EventChannelClient {
const deregisterMessage = this.buildMessage(EventChannelMessageTypes.DEREGISTER_FILTER, filterObj);
this.sendMessage(deregisterMessage);
}

/**
* Sends a pong message.
*/
sendPong() {
const pongMessage = this.buildMessage(EventChannelMessageTypes.PONG, {});
this.sendMessage(pongMessage);
}
}
6 changes: 2 additions & 4 deletions src/event-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import Ain from '../ain';
import {
BlockFinalizedEventConfig, BlockFinalizedEvent,
ErrorFirstCallback,
EventChannelConnectionOptions,
BlockchainEventConfig, BlockchainEventCallback,
TxStateChangedEventConfig, TxStateChangedEvent,
ValueChangedEventConfig, ValueChangedEvent, DisconnectionCallback, FilterDeletedEventCallback, BlockchainErrorCallback,
Expand Down Expand Up @@ -31,11 +30,10 @@ export default class EventManager {

/**
* Opens a new event channel.
* @param {EventChannelConnectionOptions} connectionOption The event channel connection options.
* @param {DisconnectionCallback} disconnectionCallback The disconnection callback function.
*/
async connect(connectionOption?: EventChannelConnectionOptions, disconnectionCallback?: DisconnectionCallback) {
await this._eventChannelClient.connect(connectionOption || {}, disconnectionCallback);
async connect(disconnectionCallback?: DisconnectionCallback) {
await this._eventChannelClient.connect(disconnectionCallback);
}

/**
Expand Down
11 changes: 3 additions & 8 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ export enum EventChannelMessageTypes {
DEREGISTER_FILTER = 'DEREGISTER_FILTER',
EMIT_EVENT = 'EMIT_EVENT',
EMIT_ERROR = 'EMIT_ERROR',
// NOTE(platfowner): Message types for custom ping-pong (see https://github.com/ainblockchain/ain-js/issues/171).
PING = 'PING',
PONG = 'PONG',
}

/**
Expand Down Expand Up @@ -392,14 +395,6 @@ export interface TxStateChangedEventConfig {
*/
export type BlockchainEventConfig = BlockFinalizedEventConfig | ValueChangedEventConfig | TxStateChangedEventConfig;

/**
* An interface for event-channel-connection options (blockchain event handler).
*/
export interface EventChannelConnectionOptions {
handshakeTimeout?: number;
heartbeatIntervalMs?: number;
}

/**
* An interface for error handling callbacks (blockchain event handler).
*/
Expand Down
8 changes: 4 additions & 4 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -708,10 +708,10 @@
resolved "https://registry.yarnpkg.com/@types/stack-utils/-/stack-utils-2.0.1.tgz#20f18294f797f2209b5f65c8e3b5c8e8261d127c"
integrity sha512-Hl219/BT5fLAaz6NDkSuhzasy49dwQS/DSdu4MdggFB8zcXv7vflBI3xp7FEmkmdDkBUI2bPUNeMttp2knYdxw==

"@types/ws@^8.5.10":
version "8.5.10"
resolved "https://registry.yarnpkg.com/@types/ws/-/ws-8.5.10.tgz#4acfb517970853fa6574a3a6886791d04a396787"
integrity sha512-vmQSUcfalpIq0R9q7uTo2lXs6eGIpt9wtnLdMv9LVpIjCA/+ufZRozlVoVelIYixx1ugCBKDhn89vnsEGOCx9A==
"@types/[email protected].3":
version "8.5.3"
resolved "https://registry.yarnpkg.com/@types/ws/-/ws-8.5.3.tgz#7d25a1ffbecd3c4f2d35068d0b283c037003274d"
integrity sha512-6YOoWjruKj1uLf3INHH7D3qTXwFfEsg1kf3c0uDdSBJwfa/llkwIjrAGV7j7mVgGNbzTQ3HiHKKDXl6bJPD97w==
dependencies:
"@types/node" "*"

Expand Down

0 comments on commit 95b4533

Please sign in to comment.