Skip to content

Commit

Permalink
Merge pull request #1262 from ainblockchain/release/v1.2.0
Browse files Browse the repository at this point in the history
Release/v1.2.0
  • Loading branch information
platfowner committed Apr 17, 2024
2 parents 3b0ba96 + 34ce7a5 commit 26bd79a
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 43 deletions.
3 changes: 3 additions & 0 deletions client/protocol_versions.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,5 +137,8 @@
},
"1.1.4": {
"min": "1.0.0"
},
"1.2.0": {
"min": "1.0.0"
}
}
3 changes: 3 additions & 0 deletions common/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,9 @@ const BlockchainEventMessageTypes = {
DEREGISTER_FILTER: 'DEREGISTER_FILTER',
EMIT_EVENT: 'EMIT_EVENT',
EMIT_ERROR: 'EMIT_ERROR',
// NOTE(platfowner): Message types for custom ping-pong (see https://github.com/ainblockchain/ain-js/issues/171).
PING: 'PING',
PONG: 'PONG',
};

const ValueChangedEventSources = {
Expand Down
91 changes: 55 additions & 36 deletions event-handler/event-channel-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,39 @@ class EventChannelManager {
// TODO(cshcomcom): Handle MAX connections.

logger.info(`[${LOG_HEADER}] New connection (${channelId})`);

webSocket.on('message', (message) => {
this.handleMessage(channel, message);
try {
const parsedMessage = JSON.parse(message);
const messageType = parsedMessage.type;
if (!messageType) {
throw new EventHandlerError(EventHandlerErrorCode.MISSING_MESSAGE_TYPE_IN_MSG,
`No message type in (${JSON.stringify(message)})`);
}
const messageData = parsedMessage.data;
if (!messageData) {
throw new EventHandlerError(EventHandlerErrorCode.MISSING_MESSAGE_DATA_IN_MSG,
`No message data in (${JSON.stringify(message)})`);
}
// NOTE(platfowner): A custom ping-pong (see https://github.com/ainblockchain/ain-js/issues/171).
if (messageType === BlockchainEventMessageTypes.PONG) {
this.handlePong(webSocket);
} else {
this.handleMessage(channel, messageType, messageData);
}
} catch (err) {
logger.error(`[${LOG_HEADER}] Error while process message ` +
`(message: ${JSON.stringify(message, null, 2)}, ` +
`error message: ${err.message})`);
this.handleEventError(channel, err);
}
});

webSocket.on('close', (_) => {
this.closeChannel(channel);
});

// Heartbeat
webSocket.on('pong', (_) => {
webSocket.isAlive = true;
})

webSocket.isAlive = true;
} catch (err) {
webSocket.terminate();
Expand Down Expand Up @@ -222,36 +244,28 @@ class EventChannelManager {
}
}

handleMessage(channel, message) { // TODO(cshcomcom): Manage EVENT_PROTOCOL_VERSION.
const LOG_HEADER = 'handleMessage';
try {
const parsedMessage = JSON.parse(message);
const messageType = parsedMessage.type;
if (!messageType) {
throw new EventHandlerError(EventHandlerErrorCode.MISSING_MESSAGE_TYPE_IN_MSG,
`Can't find type from message (${JSON.stringify(message)})`);
}
const messageData = parsedMessage.data;
if (!messageData) {
throw new EventHandlerError(EventHandlerErrorCode.MISSING_MESSAGE_DATA_IN_MSG,
`Can't find data from message (${JSON.stringify(message)})`);
}
switch (messageType) {
case BlockchainEventMessageTypes.REGISTER_FILTER:
this.handleRegisterFilterMessage(channel, messageData);
break;
case BlockchainEventMessageTypes.DEREGISTER_FILTER:
this.handleDeregisterFilterMessage(channel, messageData);
break;
default:
throw new EventHandlerError(EventHandlerErrorCode.INVALID_MESSAGE_TYPE,
`Invalid message type (${messageType})`);
}
} catch (err) {
logger.error(`[${LOG_HEADER}] Error while process message ` +
`(message: ${JSON.stringify(message, null, 2)}, ` +
`error message: ${err.message})`);
this.handleEventError(channel, err);
/**
* Handles a pong message.
*/
handlePong(webSocket) {
webSocket.isAlive = true;
}

/**
* Handles a (non-pong) message from the channel.
*/
// TODO(cshcomcom): Manage EVENT_PROTOCOL_VERSION.
handleMessage(channel, messageType, messageData) {
switch (messageType) {
case BlockchainEventMessageTypes.REGISTER_FILTER:
this.handleRegisterFilterMessage(channel, messageData);
break;
case BlockchainEventMessageTypes.DEREGISTER_FILTER:
this.handleDeregisterFilterMessage(channel, messageData);
break;
default:
throw new EventHandlerError(EventHandlerErrorCode.INVALID_MESSAGE_TYPE,
`Invalid message type (${messageType})`);
}
}

Expand Down Expand Up @@ -325,11 +339,16 @@ class EventChannelManager {
return ws.terminate();
}
ws.isAlive = false;
ws.ping();
this.sendPing(ws);
});
}, NodeConfigs.EVENT_HANDLER_HEARTBEAT_INTERVAL_MS || 15000);
}

sendPing(webSocket) {
const pingMessage = this.makeMessage(BlockchainEventMessageTypes.PING, {});
webSocket.send(JSON.stringify(pingMessage));
}

stopHeartbeat() {
clearInterval(this.heartbeatInterval);
}
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "ain-blockchain",
"description": "AI Network Blockchain",
"version": "1.1.4",
"version": "1.2.0",
"private": true,
"license": "MIT",
"author": "[email protected]",
Expand Down Expand Up @@ -99,7 +99,7 @@
"web3-eth-accounts": "^1.6.1",
"winston": "^3.3.3",
"winston-daily-rotate-file": "^4.4.2",
"ws": "^7.4.6"
"ws": "^8.16.0"
},
"devDependencies": {
"chai": "^4.2.0",
Expand Down
14 changes: 10 additions & 4 deletions test/integration/event_handler.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -217,15 +217,19 @@ describe('Event Handler Test', function() {

it('Wait BLOCK_FINALIZED events', function(done) {
this.timeout(3 * epochMs);
wsClient.once('message', (message) => {
function messageHandler(message) {
const parsedMessage = JSON.parse(message);
const messageType = parsedMessage.type;
const eventType = _.get(parsedMessage, 'data.type');
if (messageType === BlockchainEventMessageTypes.EMIT_EVENT &&
eventType === BlockchainEventTypes.BLOCK_FINALIZED) {
done();
// NOTE(platfowner): Avoid test failure with "done() called multiple times".
wsClient.removeListener('message', messageHandler);
}
});
}
// NOTE(platfowner): Use 'on' instead of 'once' due to heartbeats with custom ping-pong.
wsClient.on('message', messageHandler);
});

it('Deregister filter & check number of filters === 0', function(done) {
Expand Down Expand Up @@ -324,7 +328,8 @@ describe('Event Handler Test', function() {
block_number: null,
};
registerFilter(wsClient, filterId, BlockchainEventTypes.BLOCK_FINALIZED, config);
wsClient.once('message', (message) => {
// NOTE(platfowner): Use 'on' instead of 'once' due to heartbeats with custom ping-pong.
wsClient.on('message', (message) => {
const parsedMessage = JSON.parse(message);
const messageType = parsedMessage.type;
const eventType = _.get(parsedMessage, 'data.type');
Expand All @@ -343,7 +348,8 @@ describe('Event Handler Test', function() {
path: targetPath,
};
registerFilter(wsClient, filterId, BlockchainEventTypes.VALUE_CHANGED, config);
wsClient.once('message', (message) => {
// NOTE(platfowner): Use 'on' instead of 'once' due to heartbeats with custom ping-pong.
wsClient.on('message', (message) => {
const parsedMessage = JSON.parse(message);
const messageType = parsedMessage.type;
const eventType = _.get(parsedMessage, 'data.type');
Expand Down
7 changes: 6 additions & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6596,11 +6596,16 @@ [email protected]:
dependencies:
mkdirp "^0.5.1"

ws@^7.4.5, ws@^7.4.6:
ws@^7.4.5:
version "7.5.7"
resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.7.tgz#9e0ac77ee50af70d58326ecff7e85eb3fa375e67"
integrity sha512-KMvVuFzpKBuiIXW3E4u3mySRO2/mCHSyZDJQM5NQ9Q9KHWHWh0NHgfbRMLLrceUK5qAL4ytALJbpRMjixFZh8A==

ws@^8.16.0:
version "8.16.0"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.16.0.tgz#d1cd774f36fbc07165066a60e40323eab6446fd4"
integrity sha512-HS0c//TP7Ina87TfiPUz1rQzMhHrl/SG2guqRcTOIUYD2q8uhUdNHZYJUaQ8aTGPzCh+c6oawMKW35nFl1dxyQ==

xdg-basedir@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/xdg-basedir/-/xdg-basedir-4.0.0.tgz#4bc8d9984403696225ef83a1573cbbcb4e79db13"
Expand Down

0 comments on commit 26bd79a

Please sign in to comment.