"use strict";
/**
* @module opcua.transport
*/
// system requires
const assert = require("node-opcua-assert").assert;
const _ = require("underscore");
const util = require("util");
// opcua requires
const StatusCode = require("node-opcua-status-code").StatusCode;
const StatusCodes = require("node-opcua-status-code").StatusCodes;
const BinaryStream = require("node-opcua-binary-stream").BinaryStream;
const verify_message_chunk = require("node-opcua-chunkmanager").verify_message_chunk;
const TCPErrorMessage = require("../_generated_/_auto_generated_TCPErrorMessage").TCPErrorMessage;
const HelloMessage = require("../_generated_/_auto_generated_HelloMessage").HelloMessage;
const AcknowledgeMessage = require("../_generated_/_auto_generated_AcknowledgeMessage").AcknowledgeMessage;
const packTcpMessage = require("./tools").packTcpMessage;
const decodeMessage = require("./tools").decodeMessage;
const debug = require("node-opcua-debug");
const hexDump = debug.hexDump;
const debugLog = debug.make_debugLog(__filename);
const doDebug = debug.checkDebugFlag(__filename);
const TCP_transport = require("./tcp_transport").TCP_transport;
/**
* @class ServerTCP_transport
* @extends TCP_transport
* @constructor
*
*/
const ServerTCP_transport = function () {
TCP_transport.call(this);
};
util.inherits(ServerTCP_transport, TCP_transport);
ServerTCP_transport.prototype.end = "DEPRECATED";
ServerTCP_transport.prototype._abortWithError = function (statusCode, extraErrorDescription, callback) {
assert(statusCode instanceof StatusCode);
assert(_.isFunction(callback), "expecting a callback");
const self = this;
/* istanbul ignore else */
if (!self.__aborted) {
self.__aborted = 1;
// send the error message and close the connection
assert(StatusCodes.hasOwnProperty(statusCode.name));
debugLog(" Server aborting because ".red + statusCode.name.cyan);
debugLog(" extraErrorDescription ".red + extraErrorDescription.cyan);
const errorResponse = new TCPErrorMessage({statusCode: statusCode, reason: statusCode.description});
const messageChunk = packTcpMessage("ERR", errorResponse);
self.write(messageChunk);
self.disconnect(function () {
self.__aborted = 2;
callback(new Error(extraErrorDescription + " StatusCode = " + statusCode.name));
});
} else {
callback(new Error(statusCode.name));
}
};
function clamp_value(value, min_val, max_val) {
assert(min_val < max_val);
if (value === 0) {
return max_val;
}
if (value < min_val) {
return min_val;
}
/* istanbul ignore next*/
if (value >= max_val) {
return max_val;
}
return value;
}
const minimumBufferSize = 8192;
ServerTCP_transport.prototype._send_ACK_response = function (helloMessage) {
const self = this;
assert(helloMessage.receiveBufferSize >= minimumBufferSize);
assert(helloMessage.sendBufferSize >= minimumBufferSize);
self.receiveBufferSize = clamp_value(helloMessage.receiveBufferSize, 8192, 512 * 1024);
self.sendBufferSize = clamp_value(helloMessage.sendBufferSize, 8192, 512 * 1024);
self.maxMessageSize = clamp_value(helloMessage.maxMessageSize, 100000, 16 * 1024 * 1024);
self.maxChunkCount = clamp_value(helloMessage.maxChunkCount, 0, 65535);
const acknowledgeMessage = new AcknowledgeMessage({
protocolVersion: self.protocolVersion,
receiveBufferSize: self.receiveBufferSize,
sendBufferSize: self.sendBufferSize,
maxMessageSize: self.maxMessageSize,
maxChunkCount: self.maxChunkCount
});
//xx acknowledgeMessage.receiveBufferSize = 8192;
//xx acknowledgeMessage.sendBufferSize = 8192;
//xx console.log("xxx receiveBufferSize = ",acknowledgeMessage.receiveBufferSize , helloMessage.receiveBufferSize) ;
//xx console.log("xxx sendBufferSize = ",acknowledgeMessage.sendBufferSize , helloMessage.sendBufferSize);
//xx console.log("xxx maxMessageSize = ",acknowledgeMessage.maxMessageSize , helloMessage.maxMessageSize);
//xx console.log("xxx maxChunkCount = ",acknowledgeMessage.maxChunkCount , helloMessage.maxChunkCount);
const messageChunk = packTcpMessage("ACK", acknowledgeMessage);
/* istanbul ignore next*/
if (doDebug) {
verify_message_chunk(messageChunk);
debugLog("server send: " + "ACK".yellow);
debugLog("server send: " + hexDump(messageChunk));
debugLog("acknowledgeMessage=", acknowledgeMessage);
}
// send the ACK reply
self.write(messageChunk);
};
ServerTCP_transport.prototype._install_HEL_message_receiver = function (callback) {
const self = this;
self._install_one_time_message_receiver(function (err, data) {
if (err) {
//err is either a timeout or connection aborted ...
self._abortWithError(StatusCodes.BadConnectionRejected, err.message, callback);
} else {
// handle the HEL message
self._on_HEL_message(data, callback);
}
});
};
ServerTCP_transport.prototype._on_HEL_message = function (data, callback) {
const self = this;
assert(data instanceof Buffer);
assert(!self._helloreceived);
const stream = new BinaryStream(data);
const msgType = data.slice(0, 3).toString("ascii");
/* istanbul ignore next*/
if (doDebug) {
debugLog("SERVER received " + msgType.yellow);
debugLog("SERVER received " + hexDump(data));
}
if (msgType === "HEL") {
assert(data.length >= 24);
const helloMessage = decodeMessage(stream, HelloMessage);
assert(_.isFinite(self.protocolVersion));
// OPCUA Spec 1.03 part 6 - page 41
// The Server shall always accept versions greater than what it supports.
if (helloMessage.protocolVersion !== self.protocolVersion) {
debugLog("warning ! client sent helloMessage.protocolVersion = 0x"+helloMessage.protocolVersion.toString(16)," whereas server protocolVersion is 0x"+self.protocolVersion.toString(16));
}
if (helloMessage.protocolVersion === 0xDEADBEEF || helloMessage.protocolVersion < self.protocolVersion) {
// Note: 0xDEADBEEF is our special version number to simulate BadProtocolVersionUnsupported in tests
// invalid protocol version requested by client
return self._abortWithError(StatusCodes.BadProtocolVersionUnsupported, "Protocol Version Error" + self.protocolVersion, callback);
}
// OPCUA Spec 1.04 part 6 - page 45
// UASC is designed to operate with different TransportProtocols that may have limited buffer
// sizes. For this reason, OPC UA Secure Conversation will break OPC UA Messages into several
// pieces (called ‘MessageChunks’) that are smaller than the buffer size allowed by the
// TransportProtocol. UASC requires a TransportProtocol buffer size that is at least 8 192 bytes
if (helloMessage.receiveBufferSize < minimumBufferSize || helloMessage.sendBufferSize < minimumBufferSize) {
return self._abortWithError(StatusCodes.BadConnectionRejected,
"Buffer size too small (should be at least " + minimumBufferSize , callback);
}
// the helloMessage shall only be received once.
self._helloreceived = true;
self._send_ACK_response(helloMessage);
callback(null); // no Error
} else {
// invalid packet , expecting HEL
debugLog("BadCommunicationError ".red, "Expecting 'HEL' message to initiate communication");
self._abortWithError(StatusCodes.BadCommunicationError, "Expecting 'HEL' message to initiate communication", callback);
}
};
/**
* Initialize the server transport.
*
*
* The ServerTCP_transport initialisation process starts by waiting for the client to send a "HEL" message.
*
* The ServerTCP_transport replies with a "ACK" message and then start waiting for further messages of any size.
*
* The callback function received an error:
* - if no message from the client is received within the ```self.timeout``` period,
* - or, if the connection has dropped within the same interval.
* - if the protocol version specified within the HEL message is invalid or is greater than ```self.protocolVersion```
*
* @method init
* @param socket {Socket}
* @param callback {Function}
* @param callback.err {Error|null} err = null if init succeeded
*
*/
ServerTCP_transport.prototype.init = function (socket, callback) {
assert(!this.socket, "init already called!");
assert(_.isFunction(callback), "expecting a valid callback ");
const self = this;
self._install_socket(socket);
self._install_HEL_message_receiver(callback);
};
exports.ServerTCP_transport = ServerTCP_transport;