diff --git a/client/protocol_versions.json b/client/protocol_versions.json index 68cf1c450..191dc587b 100644 --- a/client/protocol_versions.json +++ b/client/protocol_versions.json @@ -137,5 +137,8 @@ }, "1.1.4": { "min": "1.0.0" + }, + "1.2.0": { + "min": "1.0.0" } } \ No newline at end of file diff --git a/common/constants.js b/common/constants.js index 0a538512c..1445346d7 100644 --- a/common/constants.js +++ b/common/constants.js @@ -681,6 +681,9 @@ const BlockchainEventMessageTypes = { DEREGISTER_FILTER: 'DEREGISTER_FILTER', EMIT_EVENT: 'EMIT_EVENT', EMIT_ERROR: 'EMIT_ERROR', + // NOTE(platfowner): Message types for custom ping-pong (see https://github.com/ainblockchain/ain-js/issues/171). + PING: 'PING', + PONG: 'PONG', }; const ValueChangedEventSources = { diff --git a/event-handler/event-channel-manager.js b/event-handler/event-channel-manager.js index 11535fe6d..a57d1245e 100644 --- a/event-handler/event-channel-manager.js +++ b/event-handler/event-channel-manager.js @@ -82,17 +82,39 @@ class EventChannelManager { // TODO(cshcomcom): Handle MAX connections. logger.info(`[${LOG_HEADER}] New connection (${channelId})`); + webSocket.on('message', (message) => { - this.handleMessage(channel, message); + try { + const parsedMessage = JSON.parse(message); + const messageType = parsedMessage.type; + if (!messageType) { + throw new EventHandlerError(EventHandlerErrorCode.MISSING_MESSAGE_TYPE_IN_MSG, + `No message type in (${JSON.stringify(message)})`); + } + const messageData = parsedMessage.data; + if (!messageData) { + throw new EventHandlerError(EventHandlerErrorCode.MISSING_MESSAGE_DATA_IN_MSG, + `No message data in (${JSON.stringify(message)})`); + } + // NOTE(platfowner): A custom ping-pong (see https://github.com/ainblockchain/ain-js/issues/171). + if (messageType === BlockchainEventMessageTypes.PONG) { + this.handlePong(webSocket); + } else { + this.handleMessage(channel, messageType, messageData); + } + } catch (err) { + logger.error(`[${LOG_HEADER}] Error while process message ` + + `(message: ${JSON.stringify(message, null, 2)}, ` + + `error message: ${err.message})`); + this.handleEventError(channel, err); + } }); + webSocket.on('close', (_) => { this.closeChannel(channel); }); - // Heartbeat - webSocket.on('pong', (_) => { - webSocket.isAlive = true; - }) + webSocket.isAlive = true; } catch (err) { webSocket.terminate(); @@ -222,36 +244,28 @@ class EventChannelManager { } } - handleMessage(channel, message) { // TODO(cshcomcom): Manage EVENT_PROTOCOL_VERSION. - const LOG_HEADER = 'handleMessage'; - try { - const parsedMessage = JSON.parse(message); - const messageType = parsedMessage.type; - if (!messageType) { - throw new EventHandlerError(EventHandlerErrorCode.MISSING_MESSAGE_TYPE_IN_MSG, - `Can't find type from message (${JSON.stringify(message)})`); - } - const messageData = parsedMessage.data; - if (!messageData) { - throw new EventHandlerError(EventHandlerErrorCode.MISSING_MESSAGE_DATA_IN_MSG, - `Can't find data from message (${JSON.stringify(message)})`); - } - switch (messageType) { - case BlockchainEventMessageTypes.REGISTER_FILTER: - this.handleRegisterFilterMessage(channel, messageData); - break; - case BlockchainEventMessageTypes.DEREGISTER_FILTER: - this.handleDeregisterFilterMessage(channel, messageData); - break; - default: - throw new EventHandlerError(EventHandlerErrorCode.INVALID_MESSAGE_TYPE, - `Invalid message type (${messageType})`); - } - } catch (err) { - logger.error(`[${LOG_HEADER}] Error while process message ` + - `(message: ${JSON.stringify(message, null, 2)}, ` + - `error message: ${err.message})`); - this.handleEventError(channel, err); + /** + * Handles a pong message. + */ + handlePong(webSocket) { + webSocket.isAlive = true; + } + + /** + * Handles a (non-pong) message from the channel. + */ + // TODO(cshcomcom): Manage EVENT_PROTOCOL_VERSION. + handleMessage(channel, messageType, messageData) { + switch (messageType) { + case BlockchainEventMessageTypes.REGISTER_FILTER: + this.handleRegisterFilterMessage(channel, messageData); + break; + case BlockchainEventMessageTypes.DEREGISTER_FILTER: + this.handleDeregisterFilterMessage(channel, messageData); + break; + default: + throw new EventHandlerError(EventHandlerErrorCode.INVALID_MESSAGE_TYPE, + `Invalid message type (${messageType})`); } } @@ -325,11 +339,16 @@ class EventChannelManager { return ws.terminate(); } ws.isAlive = false; - ws.ping(); + this.sendPing(ws); }); }, NodeConfigs.EVENT_HANDLER_HEARTBEAT_INTERVAL_MS || 15000); } + sendPing(webSocket) { + const pingMessage = this.makeMessage(BlockchainEventMessageTypes.PING, {}); + webSocket.send(JSON.stringify(pingMessage)); + } + stopHeartbeat() { clearInterval(this.heartbeatInterval); } diff --git a/package.json b/package.json index 368d32d12..ade8a91ac 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "ain-blockchain", "description": "AI Network Blockchain", - "version": "1.1.4", + "version": "1.2.0", "private": true, "license": "MIT", "author": "dev@ainetwork.ai", @@ -99,7 +99,7 @@ "web3-eth-accounts": "^1.6.1", "winston": "^3.3.3", "winston-daily-rotate-file": "^4.4.2", - "ws": "^7.4.6" + "ws": "^8.16.0" }, "devDependencies": { "chai": "^4.2.0", diff --git a/test/integration/event_handler.test.js b/test/integration/event_handler.test.js index 8a7569ca1..7a9d8b2d6 100644 --- a/test/integration/event_handler.test.js +++ b/test/integration/event_handler.test.js @@ -217,15 +217,19 @@ describe('Event Handler Test', function() { it('Wait BLOCK_FINALIZED events', function(done) { this.timeout(3 * epochMs); - wsClient.once('message', (message) => { + function messageHandler(message) { const parsedMessage = JSON.parse(message); const messageType = parsedMessage.type; const eventType = _.get(parsedMessage, 'data.type'); if (messageType === BlockchainEventMessageTypes.EMIT_EVENT && eventType === BlockchainEventTypes.BLOCK_FINALIZED) { done(); + // NOTE(platfowner): Avoid test failure with "done() called multiple times". + wsClient.removeListener('message', messageHandler); } - }); + } + // NOTE(platfowner): Use 'on' instead of 'once' due to heartbeats with custom ping-pong. + wsClient.on('message', messageHandler); }); it('Deregister filter & check number of filters === 0', function(done) { @@ -324,7 +328,8 @@ describe('Event Handler Test', function() { block_number: null, }; registerFilter(wsClient, filterId, BlockchainEventTypes.BLOCK_FINALIZED, config); - wsClient.once('message', (message) => { + // NOTE(platfowner): Use 'on' instead of 'once' due to heartbeats with custom ping-pong. + wsClient.on('message', (message) => { const parsedMessage = JSON.parse(message); const messageType = parsedMessage.type; const eventType = _.get(parsedMessage, 'data.type'); @@ -343,7 +348,8 @@ describe('Event Handler Test', function() { path: targetPath, }; registerFilter(wsClient, filterId, BlockchainEventTypes.VALUE_CHANGED, config); - wsClient.once('message', (message) => { + // NOTE(platfowner): Use 'on' instead of 'once' due to heartbeats with custom ping-pong. + wsClient.on('message', (message) => { const parsedMessage = JSON.parse(message); const messageType = parsedMessage.type; const eventType = _.get(parsedMessage, 'data.type'); diff --git a/yarn.lock b/yarn.lock index 952e7003e..52cb47b4a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6596,11 +6596,16 @@ write@1.0.3: dependencies: mkdirp "^0.5.1" -ws@^7.4.5, ws@^7.4.6: +ws@^7.4.5: version "7.5.7" resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.7.tgz#9e0ac77ee50af70d58326ecff7e85eb3fa375e67" integrity sha512-KMvVuFzpKBuiIXW3E4u3mySRO2/mCHSyZDJQM5NQ9Q9KHWHWh0NHgfbRMLLrceUK5qAL4ytALJbpRMjixFZh8A== +ws@^8.16.0: + version "8.16.0" + resolved "https://registry.yarnpkg.com/ws/-/ws-8.16.0.tgz#d1cd774f36fbc07165066a60e40323eab6446fd4" + integrity sha512-HS0c//TP7Ina87TfiPUz1rQzMhHrl/SG2guqRcTOIUYD2q8uhUdNHZYJUaQ8aTGPzCh+c6oawMKW35nFl1dxyQ== + xdg-basedir@^4.0.0: version "4.0.0" resolved "https://registry.yarnpkg.com/xdg-basedir/-/xdg-basedir-4.0.0.tgz#4bc8d9984403696225ef83a1573cbbcb4e79db13"