"use strict";
/**
* @module opcua.client
*/
var util = require("util");
var _ = require("underscore");
var assert = require("node-opcua-assert");
var crypto = require("crypto");
var async = require("async");
var StatusCodes = require("node-opcua-status-code").StatusCodes;
var session_service = require("node-opcua-service-session");
var AnonymousIdentityToken = session_service.AnonymousIdentityToken;
var CreateSessionRequest = session_service.CreateSessionRequest;
var CreateSessionResponse = session_service.CreateSessionResponse;
var ActivateSessionRequest = session_service.ActivateSessionRequest;
var ActivateSessionResponse = session_service.ActivateSessionResponse;
var CloseSessionRequest = session_service.CloseSessionRequest;
var endpoints_service = require("node-opcua-service-endpoints");
var ApplicationDescription = endpoints_service.ApplicationDescription;
var ApplicationType = endpoints_service.ApplicationType;
var EndpointDescription = endpoints_service.EndpointDescription;
var MessageSecurityMode = require("node-opcua-service-secure-channel").MessageSecurityMode;
var SecurityPolicy = require("node-opcua-secure-channel").SecurityPolicy;
var getCryptoFactory = require("node-opcua-secure-channel").getCryptoFactory;
var fromURI = require("node-opcua-secure-channel").fromURI;
var crypto_utils = require("node-opcua-crypto").crypto_utils;
var UserNameIdentityToken = session_service.UserNameIdentityToken;
var buffer_utils = require("node-opcua-buffer-utils");
var createFastUninitializedBuffer = buffer_utils.createFastUninitializedBuffer;
var UserIdentityTokenType = require("node-opcua-service-endpoints").UserIdentityTokenType;
var ClientSession = require("./client_session").ClientSession;
var utils = require("node-opcua-utils");
var debugLog = require("node-opcua-debug").make_debugLog(__filename);
var doDebug = require("node-opcua-debug").checkDebugFlag(__filename);
var OPCUAClientBase = require("./client_base").OPCUAClientBase;
function validateServerNonce(serverNonce) {
return (serverNonce && serverNonce.length < 32) ? false : true;
}
/**
* @class OPCUAClient
* @extends OPCUAClientBase
* @param options
* @param [options.securityMode=MessageSecurityMode.None] {MessageSecurityMode} the default security mode.
* @param [options.securityPolicy =SecurityPolicy.NONE] {SecurityPolicy} the security mode.
* @param [options.requestedSessionTimeout= 60000] {Number} the requested session time out in CreateSession
* @param [options.applicationName="NodeOPCUA-Client"] {string} the client application name
* @param [options.endpoint_must_exist=true] {Boolean} set to false if the client should accept server endpoint mismatch
* @param [options.keepSessionAlive=false]{Boolean}
* @param [options.certificateFile="certificates/client_selfsigned_cert_1024.pem"] {String} client certificate pem file.
* @param [options.privateKeyFile="certificates/client_key_1024.pem"] {String} client private key pem file.
* @param [options.clientName=""] {String} a client name string that will be used to generate session names.
* @constructor
*/
function OPCUAClient(options) {
options = options || {};
OPCUAClientBase.apply(this, arguments);
// @property endpoint_must_exist {Boolean}
// if set to true , create Session will only accept connection from server which endpoint_url has been reported
// by GetEndpointsRequest.
// By default, the client is strict.
this.endpoint_must_exist = (options.endpoint_must_exist === null) ? true : options.endpoint_must_exist;
this.requestedSessionTimeout = options.requestedSessionTimeout || 60000; // 1 minute
this.applicationName = options.applicationName || "NodeOPCUA-Client";
this.clientName = options.clientName || "Session";
}
util.inherits(OPCUAClient, OPCUAClientBase);
OPCUAClient.prototype._nextSessionName = function () {
if (!this.___sessionName_counter) {
this.___sessionName_counter = 0;
}
this.___sessionName_counter += 1;
return this.clientName + this.___sessionName_counter;
};
var makeApplicationUrn = require("node-opcua-common").makeApplicationUrn;
OPCUAClient.prototype._getApplicationUri = function () {
// get applicationURI from certificate
var exploreCertificate = require("node-opcua-crypto").crypto_explore_certificate.exploreCertificate;
var certificate = this.getCertificate();
var applicationUri;
if (certificate) {
var e = exploreCertificate(certificate);
applicationUri = e.tbsCertificate.extensions.subjectAltName.uniformResourceIdentifier[0];
} else {
var hostname = require("node-opcua-hostname").get_fully_qualified_domain_name();
applicationUri = makeApplicationUrn(hostname, this.applicationName);
}
return applicationUri;
};
OPCUAClient.prototype.__resolveEndPoint = function () {
this.securityPolicy = this.securityPolicy || SecurityPolicy.None;
var endpoint = this.findEndpoint(this._secureChannel.endpoint_url, this.securityMode, this.securityPolicy);
this.endpoint = endpoint;
// this is explained here : see OPCUA Part 4 Version 1.02 $5.4.1 page 12:
// A Client shall verify the HostName specified in the Server Certificate is the same as the HostName
// contained in the endpointUrl provided in the EndpointDescription. If there is a difference then the
// Client shall report the difference and may close the SecureChannel.
if (!this.endpoint) {
if (this.endpoint_must_exist) {
debugLog("OPCUAClient#endpoint_must_exist = true and endpoint with url ", this._secureChannel.endpoint_url, " cannot be found");
return false;
} else {
// fallback :
// our strategy is to take the first server_end_point that match the security settings
// ( is this really OK ?)
// this will permit us to access a OPCUA Server using it's IP address instead of its hostname
endpoint = this.findEndpointForSecurity(this.securityMode, this.securityPolicy);
if (!endpoint) {
return false;
}
this.endpoint = endpoint;
}
}
return true;
};
OPCUAClient.prototype._createSession = function (callback) {
var self = this;
assert(typeof callback === "function");
assert(self._secureChannel);
if (!self.__resolveEndPoint() || !self.endpoint) {
return callback(new Error(" End point must exist " + self._secureChannel.endpoint_url));
}
self.serverUri = self.endpoint.server.applicationUri;
self.endpoint_url = self._secureChannel.endpoint_url;
var session = new ClientSession(self);
this.__createSession_step2(session, callback);
};
OPCUAClient.prototype.__createSession_step2 = function (session, callback) {
var self = this;
assert(typeof callback === "function");
assert(self._secureChannel);
assert(self.serverUri, " must have a valid server URI");
assert(self.endpoint_url, " must have a valid server endpoint_url");
assert(self.endpoint);
var applicationUri = self._getApplicationUri();
var applicationDescription = new ApplicationDescription({
applicationUri: applicationUri,
productUri: "NodeOPCUA-Client",
applicationName: {text: self.applicationName},
applicationType: ApplicationType.CLIENT,
gatewayServerUri: undefined,
discoveryProfileUri: undefined,
discoveryUrls: []
});
// note : do not confuse CreateSessionRequest.clientNonce with OpenSecureChannelRequest.clientNonce
// which are two different nonce, with different size (although they share the same name )
self.clientNonce = crypto.randomBytes(32);
var request = new CreateSessionRequest({
clientDescription: applicationDescription,
serverUri: self.serverUri,
endpointUrl: self.endpoint_url,
sessionName: self._nextSessionName(),
clientNonce: self.clientNonce,
clientCertificate: self.getCertificate(),
requestedSessionTimeout: self.requestedSessionTimeout,
maxResponseMessageSize: 800000
});
// a client Nonce must be provided if security mode is set
assert(self._secureChannel.securityMode === MessageSecurityMode.NONE || request.clientNonce !== null);
self.performMessageTransaction(request, function (err, response) {
if (!err) {
//xx console.log("xxxxx response",response.toString());
//xx console.log("xxxxx response",response.responseHeader.serviceResult);
if (response.responseHeader.serviceResult === StatusCodes.BadTooManySessions) {
err = new Error("Too Many Sessions : " + response.responseHeader.serviceResult.toString());
} else if (response.responseHeader.serviceResult === StatusCodes.Good) {
assert(response instanceof CreateSessionResponse);
// istanbul ignore next
if (!validateServerNonce(request.serverNonce)) {
return callback(new Error("invalid server Nonce"));
}
// todo: verify SignedSoftwareCertificates and response.serverSignature
session = session || new ClientSession(self);
session.name = request.sessionName;
session.sessionId = response.sessionId;
session.authenticationToken = response.authenticationToken;
session.timeout = response.revisedSessionTimeout;
session.serverNonce = response.serverNonce;
session.serverCertificate = response.serverCertificate;
session.serverSignature = response.serverSignature;
debugLog("revised session timeout = ".yellow, session.timeout);
self._server_endpoints = response.serverEndpoints;
session.serverEndpoints = response.serverEndpoints;
} else {
err = new Error("Error " + response.responseHeader.serviceResult.name + " " + response.responseHeader.serviceResult.description);
}
}
if (err) {
callback(err);
} else {
callback(null, session);
}
});
};
var computeSignature = require("node-opcua-secure-channel").computeSignature;
OPCUAClient.prototype.computeClientSignature = function (channel, serverCertificate, serverNonce) {
var self = this;
return computeSignature(serverCertificate, serverNonce, self.getPrivateKey(), channel.messageBuilder.securityPolicy);
};
function isAnonymous(userIdentityInfo) {
return !userIdentityInfo || (!userIdentityInfo.userName && !userIdentityInfo.password);
}
function isUserNamePassword(userIdentityInfo) {
var res = (userIdentityInfo.userName !== undefined) && (userIdentityInfo.password !== undefined);
return res;
}
function findUserTokenPolicy(endpoint_description, userTokenType) {
assert(endpoint_description instanceof EndpointDescription);
var r = _.filter(endpoint_description.userIdentityTokens, function (userIdentity) {
// assert(userIdentity instanceof UserTokenPolicy)
assert(userIdentity.tokenType);
return userIdentity.tokenType === userTokenType;
});
return r.length === 0 ? null : r[0];
}
function createAnonymousIdentityToken(session) {
var endpoint_desc = session.endpoint;
assert(endpoint_desc instanceof EndpointDescription);
var userTokenPolicy = findUserTokenPolicy(endpoint_desc, UserIdentityTokenType.ANONYMOUS);
if (!userTokenPolicy) {
throw new Error("Cannot find ANONYMOUS user token policy in end point description");
}
return new AnonymousIdentityToken({policyId: userTokenPolicy.policyId});
}
function createUserNameIdentityToken(session, userName, password) {
// assert(endpoint instanceof EndpointDescription);
assert(userName === null || typeof userName === "string");
assert(password === null || typeof password === "string");
var endpoint_desc = session.endpoint;
assert(endpoint_desc instanceof EndpointDescription);
var userTokenPolicy = findUserTokenPolicy(endpoint_desc, UserIdentityTokenType.USERNAME);
// istanbul ignore next
if (!userTokenPolicy) {
throw new Error("Cannot find USERNAME user token policy in end point description");
}
var securityPolicy = fromURI(userTokenPolicy.securityPolicyUri);
// if the security policy is not specified we use the session security policy
if (securityPolicy === SecurityPolicy.Invalid) {
securityPolicy = session._client._secureChannel.securityPolicy;
assert(securityPolicy);
}
var serverCertificate = session.serverCertificate;
// if server does not provide certificate use unencrypted password
if (serverCertificate === null) {
var userIdentityToken = new UserNameIdentityToken({
userName: userName,
password: Buffer.from(password, "utf-8"),
encryptionAlgorithm: null,
policyId: userTokenPolicy.policyId
});
return userIdentityToken;
}
assert(serverCertificate instanceof Buffer);
serverCertificate = crypto_utils.toPem(serverCertificate, "CERTIFICATE");
var publicKey = crypto_utils.extractPublicKeyFromCertificateSync(serverCertificate);
var serverNonce = session.serverNonce;
// if serverNonce not specified by server
if (serverNonce === null) {
serverNonce = Buffer.alloc(0);
}
assert(serverNonce instanceof Buffer);
// see Release 1.02 155 OPC Unified Architecture, Part 4
var cryptoFactory = getCryptoFactory(securityPolicy);
// istanbul ignore next
if (!cryptoFactory) {
throw new Error(" Unsupported security Policy");
}
var userIdentityToken = new UserNameIdentityToken({
userName: userName,
password: Buffer.from(password, "utf-8"),
encryptionAlgorithm: cryptoFactory.asymmetricEncryptionAlgorithm,
policyId: userTokenPolicy.policyId
});
// now encrypt password as requested
var lenBuf = createFastUninitializedBuffer(4);
lenBuf.writeUInt32LE(userIdentityToken.password.length + serverNonce.length, 0);
var block = Buffer.concat([lenBuf, userIdentityToken.password, serverNonce]);
userIdentityToken.password = cryptoFactory.asymmetricEncrypt(block, publicKey);
return userIdentityToken;
}
OPCUAClient.prototype.createUserIdentityToken = function (session, userIdentityToken, callback) {
assert(_.isFunction(callback));
var self = this;
if (isAnonymous(self.userIdentityInfo)) {
try {
userIdentityToken = createAnonymousIdentityToken(session);
return callback(null, userIdentityToken);
}
catch (err) {
return callback(err);
}
} else if (isUserNamePassword(self.userIdentityInfo)) {
var userName = self.userIdentityInfo.userName;
var password = self.userIdentityInfo.password;
try {
userIdentityToken = createUserNameIdentityToken(session, userName, password);
return callback(null, userIdentityToken);
}
catch (err) {
//xx console.log(err.stack);
return callback(err);
}
} else {
console.log(" userIdentityToken = ", userIdentityToken);
return callback(new Error("CLIENT: Invalid userIdentityToken"));
}
};
// see OPCUA Part 4 - $7.35
OPCUAClient.prototype._activateSession = function (session, callback) {
assert(typeof callback === "function");
var self = this;
// istanbul ignore next
if (!self._secureChannel) {
return callback(new Error(" No secure channel"));
}
var serverCertificate = session.serverCertificate;
// If the securityPolicyUri is NONE and none of the UserTokenPolicies requires encryption,
// the Client shall ignore the ApplicationInstanceCertificate (serverCertificate)
assert(serverCertificate === null || serverCertificate instanceof Buffer);
var serverNonce = session.serverNonce;
assert(!serverNonce || serverNonce instanceof Buffer);
// make sure session is attached to this client
var _old_client = session._client;
session._client = self;
self.createUserIdentityToken(session, self.userIdentityInfo, function (err, userIdentityToken) {
if (err) {
session._client = _old_client;
return callback(err);
}
// TODO. fill the ActivateSessionRequest
// see 5.6.3.2 Parameters OPC Unified Architecture, Part 4 30 Release 1.02
var request = new ActivateSessionRequest({
// This is a signature generated with the private key associated with the
// clientCertificate. The SignatureAlgorithm shall be the AsymmetricSignatureAlgorithm
// specified in the SecurityPolicy for the Endpoint. The SignatureData type is defined in 7.30.
clientSignature: self.computeClientSignature(self._secureChannel, serverCertificate, serverNonce),
// These are the SoftwareCertificates which have been issued to the Client application. The productUri contained
// in the SoftwareCertificates shall match the productUri in the ApplicationDescription passed by the Client in
// the CreateSession requests. Certificates without matching productUri should be ignored. Servers may reject
// connections from Clients if they are not satisfied with the SoftwareCertificates provided by the Client.
// This parameter only needs to be specified in the first ActivateSession request after CreateSession.
// It shall always be omitted if the maxRequestMessageSize returned from the Server in the CreateSession
// response is less than one megabyte. The SignedSoftwareCertificate type is defined in 7.31.
clientSoftwareCertificates: [],
// List of locale ids in priority order for localized strings. The first LocaleId in the list has the highest
// priority. If the Server returns a localized string to the Client, the Server shall return the translation
// with the highest priority that it can. If it does not have a translation for any of the locales identified
// in this list, then it shall return the string value that it has and include the locale id with the string.
// See Part 3 for more detail on locale ids. If the Client fails to specify at least one locale id, the Server
// shall use any that it has.
// This parameter only needs to be specified during the first call to ActivateSession during a single
// application Session. If it is not specified the Server shall keep using the current localeIds for the Session.
localeIds: [],
// The credentials of the user associated with the Client application. The Server uses these credentials to
// determine whether the Client should be allowed to activate a Session and what resources the Client has access
// to during this Session. The UserIdentityToken is an extensible parameter type defined in 7.35.
// The EndpointDescription specifies what UserIdentityTokens the Server shall accept.
userIdentityToken: userIdentityToken,
// If the Client specified a user identity token that supports digital signatures,
// then it shall create a signature and pass it as this parameter. Otherwise the parameter is omitted.
// The SignatureAlgorithm depends on the identity token type.
userTokenSignature: {
algorithm: null,
signature: null
}
});
session.performMessageTransaction(request, function (err, response) {
if (!err && response.responseHeader.serviceResult === StatusCodes.Good) {
assert(response instanceof ActivateSessionResponse);
session.serverNonce = response.serverNonce;
if (!validateServerNonce(session.serverNonce)) {
return callback(new Error("Invalid server Nonce"));
}
return callback(null, session);
} else {
err = err || new Error(response.responseHeader.serviceResult.toString());
session._client = _old_client;
return callback(err, null);
}
});
});
};
/**
* transfer session to this client
* @method reactivateSession
* @param session
* @param callback
* @return {*}
*/
OPCUAClient.prototype.reactivateSession = function (session, callback) {
var self = this;
assert(typeof callback === "function");
assert(this._secureChannel, " client must be connected first");
// istanbul ignore next
if (!this.__resolveEndPoint() || !this.endpoint) {
return callback(new Error(" End point must exist " + this._secureChannel.endpoint_url));
}
assert(session._client.endpointUrl === self.endpointUrl, "cannot reactivateSession on a different endpoint");
var old_client = session._client;
debugLog("OPCUAClient#reactivateSession");
this._activateSession(session, function (err) {
if (!err) {
if (old_client !== self) {
// remove session from old client:
old_client._removeSession(session);
assert(!_.contains(old_client._sessions, session));
self._addSession(session);
assert(_.contains(self._sessions, session));
}
} else {
// istanbul ignore next
if (doDebug) {
console.log("reactivateSession has failed !".red.bgWhite, err);
}
}
callback(err);
});
};
/**
* create and activate a new session
* @async
* @method createSession
*
* @param [userIdentityInfo {Object} ] optional
* @param [userIdentityInfo.userName {String} ]
* @param [userIdentityInfo.password {String} ]
*
* @param callback {Function}
* @param callback.err {Error|null} - the Error if the async method has failed
* @param callback.session {ClientSession} - the created session object.
*
*
* @example :
*
*
* // create a anonymous session
* client.createSession(function(err) {
* if (err) {} else {}
* });
*
* // create a session with a userName and password
* client.createSession({userName: "JoeDoe", password:"secret"}, function(err) {
* if (err) {} else {}
* });
*
*/
OPCUAClient.prototype.createSession = function (userIdentityInfo, callback) {
var self = this;
if (_.isFunction(userIdentityInfo)) {
callback = userIdentityInfo;
userIdentityInfo = {};
}
self.userIdentityInfo = userIdentityInfo;
assert(_.isFunction(callback));
self._createSession(function (err, session) {
if (err) {
callback(err);
} else {
self._addSession(session);
self._activateSession(session, function (err) {
callback(err, session);
});
}
});
};
OPCUAClient.prototype.changeSessionIdentity = function (session, userIdentityInfo, callback) {
var self = this;
assert(_.isFunction(callback));
var old_userIdentity = self.userIdentityInfo;
self.userIdentityInfo = userIdentityInfo;
self._activateSession(session, function (err) {
callback(err);
});
};
OPCUAClient.prototype._closeSession = function (session, deleteSubscriptions, callback) {
var self = this;
assert(_.isFunction(callback));
assert(_.isBoolean(deleteSubscriptions));
// istanbul ignore next
if (!self._secureChannel) {
return callback(new Error("no channel"));
}
assert(self._secureChannel);
var request = new CloseSessionRequest({
deleteSubscriptions: deleteSubscriptions
});
if (!self._secureChannel.isValid()) {
return callback();
}
session.performMessageTransaction(request, function (err, response) {
if (err) {
//xx console.log("xxx received : ", err, response);
//xx self._secureChannel.close(function () {
//xx callback(err, null);
//xx });
callback(err, null);
} else {
callback(err, response);
}
});
};
/**
*
* @method closeSession
* @async
* @param session {ClientSession} - the created client session
* @param deleteSubscriptions {Boolean} - whether to delete subscriptions or not
* @param callback {Function} - the callback
* @param callback.err {Error|null} - the Error if the async method has failed
*/
OPCUAClient.prototype.closeSession = function (session, deleteSubscriptions, callback) {
var self = this;
assert(_.isBoolean(deleteSubscriptions));
assert(_.isFunction(callback));
assert(session);
assert(session._client === self, "session must be attached to self");
session._closed = true;
//todo : send close session on secure channel
self._closeSession(session, deleteSubscriptions, function (err) {
session.emitCloseEvent();
self._removeSession(session);
assert(!_.contains(self._sessions, session));
assert(session._closed, "session must indicate it is closed");
callback(err);
});
};
OPCUAClient.prototype._ask_for_subscription_republish = function (session, callback) {
debugLog("_ask_for_subscription_republish ".bgCyan.yellow.bold);
//xx assert(session.getPublishEngine().nbPendingPublishRequests === 0, "at this time, publish request queue shall still be empty");
session.getPublishEngine().republish(function (err) {
debugLog("_ask_for_subscription_republish done".bgCyan.yellow.bold);
// xx assert(session.getPublishEngine().nbPendingPublishRequests === 0);
session.resumePublishEngine();
callback(err);
});
};
OPCUAClient.prototype._on_connection_reestablished = function (callback) {
var self = this;
assert(_.isFunction(callback));
// call base class implementation first
OPCUAClientBase.prototype._on_connection_reestablished.call(self, function (err) {
//
// a new secure channel has be created, we need to reactivate the session,
// and reestablish the subscription and restart the publish engine.
//
//
// see OPC UA part 4 ( version 1.03 ) figure 34 page 106
// 6.5 Reestablishing subscription....
//
//
//
// +---------------------+
// | CreateSecureChannel |
// | CreateSession |
// | ActivateSession |
// +---------------------+
// |
// |
// v
// +---------------------+
// | CreateSubscription |<-------------------------------------------------------------+
// +---------------------+ |
// | (1)
// |
// v
// +---------------------+
// (2)------------->| StartPublishEngine |
// +---------------------+
// |
// V
// +---------------------+
// +------->| Monitor Connection |
// | +---------------------+
// | |
// | v
// | Good / \
// +-----------------/ SR? \______Broken_____+
// \ / |
// \ / |
// |
// v
// +---------------------+
// | |
// | CreateSecureChannel |<-----+
// | | |
// +---------------------+ |
// | |
// v |
// / \ |
// / SR? \______Bad________+
// \ /
// \ /
// |
// |Good
// v
// +---------------------+
// | |
// | ActivateSession |
// | |
// +---------------------+
// |
// v +-------------------+ +----------------------+
// / \ | CreateSession | | |
// / SR? \______Bad_______>| ActivateSession |-----> | TransferSubscription |
// \ / | | | | (1)
// \ / +-------------------+ +----------------------+ ^
// | Good | |
// v (for each subscription) | |
// +--------------------+ / \ |
// | | OK / OK? \______Bad________+
// | RePublish |<----------------------------------------- \ /
// +-->| | \ /
// | +--------------------+
// | |
// | v
// | GOOD / \
// +------ / SR? \______Bad SubscriptionInvalidId______>(1)
// (2) \ /
// ^ \ /
// | |
// | |
// | BadMessageNotAvailable |
// +------------------------------------------------------+
//
debugLog(" Starting Session reactivation".red.bgWhite);
// repair session
var sessions = self._sessions;
async.map(sessions, function (session, next) {
debugLog("OPCUAClient#_on_connection_reestablished TRYING TO REACTIVATE SESSION");
self._activateSession(session, function (err) {
//
// Note: current limitation :
// - The reconnection doesn't work if connection break is cause by a server that crashes and restarts yet.
//
debugLog("ActivateSession : ", err ? err.message : "");
if (err) {
if (session.hasBeenClosed()) {
debugLog("Aborting reactivation of old session because user requested session to be closed".bgWhite.red);
return callback(new Error("reconnection cancelled due to session termination"));
}
// if failed => recreate a new Channel and transfer the subscription
var new_session = null;
async.series([
function (callback) {
debugLog("Activating old session has failed ! => Creating a new session ....".bgWhite.red);
session.getPublishEngine().suspend(true);
// create new session, based on old session,
// so we can reuse subscriptions data
self.__createSession_step2(session, function (err, _new_session) {
debugLog(" Creating a new session (based on old session data).... Done".bgWhite.cyan);
if (!err) {
new_session = _new_session;
assert(session === _new_session, "session should be recycled");
}
callback(err);
});
},
function (callback) {
debugLog(" activating a new session ....".bgWhite.red);
self._activateSession(new_session, function (err) {
debugLog(" activating a new session .... Done".bgWhite.cyan);
///xx self._addSession(new_session);
callback(err);
});
},
function (callback) {
// get the old subscriptions id from the old session
var subscriptionsIds = session.getPublishEngine().getSubscriptionIds();
debugLog(" session subscriptionCount = ", new_session.getPublishEngine().subscriptionCount);
if (subscriptionsIds.length === 0) {
debugLog(" No subscriptions => skipping transfer subscriptions");
return callback(); // no need to transfer subscriptions
}
debugLog(" asking server to transfer subscriptions = [", subscriptionsIds.join(", "), "]");
// Transfer subscriptions
var options = {
subscriptionIds: subscriptionsIds,
sendInitialValues: false
};
assert(new_session.getPublishEngine().nbPendingPublishRequests === 0, "we should not be publishing here");
new_session.transferSubscriptions(options, function (err, results) {
if (err) {
return callback(err);
}
debugLog("Transfer subscriptions done", results.toString());
debugLog(" new session subscriptionCount = ", new_session.getPublishEngine().subscriptionCount);
callback();
});
},
function (callback) {
assert(new_session.getPublishEngine().nbPendingPublishRequests === 0, "we should not be publishing here");
// call Republish
return self._ask_for_subscription_republish(new_session, callback);
},
function start_publishing_as_normal(callback) {
new_session.getPublishEngine().suspend(false);
callback();
}
], next);
} else {
// call Republish
return self._ask_for_subscription_republish(session, next);
}
});
}, function (err, results) {
return callback(err);
});
});
};
OPCUAClient.prototype.toString = function () {
OPCUAClientBase.prototype.toString.call(this);
console.log(" requestedSessionTimeout....... ", this.requestedSessionTimeout);
console.log(" endpoint_url................... ", this.endpoint_url);
console.log(" serverUri...................... ", this.serverUri);
};
exports.OPCUAClient = OPCUAClient;
exports.ClientSession = ClientSession;
/**
* @method withSession
* @param inner_func {function}
* @param inner_func.session {ClientSession}
* @param inner_func.callback {function}
* @param callback {function}
*/
OPCUAClient.prototype.withSession = function (endpointUrl, inner_func, callback) {
assert(_.isFunction(inner_func));
assert(_.isFunction(callback));
var client = this;
var the_session;
var the_error;
var need_disconnect = false;
async.series([
// step 1 : connect to
function (callback) {
client.connect(endpointUrl, function (err) {
need_disconnect = true;
if (err) {
console.log(" cannot connect to endpoint :", endpointUrl);
}
callback(err);
});
},
// step 2 : createSession
function (callback) {
client.createSession(function (err, session) {
if (!err) {
the_session = session;
}
callback(err);
});
},
function (callback) {
try {
inner_func(the_session, function (err) {
the_error = err;
callback();
});
}
catch (err) {
console.log("OPCUAClient#withClientSession", err.message);
the_error = err;
callback();
}
},
// close session
function (callback) {
the_session.close(/*deleteSubscriptions=*/true, function (err) {
if (err) {
console.log("OPCUAClient#withClientSession: session closed failed ?");
}
callback();
});
},
function (callback) {
client.disconnect(function (err) {
need_disconnect = false;
if (err) {
console.log("OPCUAClient#withClientSession: client disconnect failed ?");
}
callback();
});
}
], function (err1) {
if (need_disconnect) {
console.log("Disconnecting client after failure");
client.disconnect(function (err2) {
return callback(the_error || err1 || err2);
});
} else {
return callback(the_error || err1);
}
});
};