APIs

Show:
"use strict";
/**
 * @module opcua.client
 */


var util = require("util");
var assert = require("node-opcua-assert");
var _ = require("underscore");

var EventEmitter = require("events").EventEmitter;

var utils = require("node-opcua-utils");

var subscription_service = require("node-opcua-service-subscription");

var ClientSession = require("../src/client_session").ClientSession;
var ClientMonitoredItem = require("../src/client_monitored_item").ClientMonitoredItem;

var StatusCodes = require("node-opcua-status-code").StatusCodes;

var resolveNodeId = require("node-opcua-nodeid").resolveNodeId;

var AttributeIds = require("node-opcua-data-model").AttributeIds;
var TimestampsToReturn = require("node-opcua-service-read").TimestampsToReturn;

var debugLog = require("node-opcua-debug").make_debugLog(__filename);
var doDebug = require("node-opcua-debug").checkDebugFlag(__filename);

/**
 * a object to manage a subscription on the client side.
 *
 * @class ClientSubscription
 * @extends EventEmitter
 *
 * @param session
 * @param options {Object}
 * @param options.requestedPublishingInterval {Number}
 * @param options.requestedLifetimeCount {Number}
 * @param options.requestedMaxKeepAliveCount {Number}
 * @param options.maxNotificationsPerPublish {Number}
 * @param options.publishingEnabled {Boolean}
 * @param options.priority {Number}
 * @constructor
 *
 * events:
 *    "started",     callback(subscriptionId)  : the subscription has been initiated
 *    "terminated"                             : the subscription has been deleted
 *    "error",                                 : the subscription has received an error
 *    "keepalive",                             : the subscription has received a keep alive message from the server
 *    "received_notifications",                : the subscription has received one or more notification
 */
function ClientSubscription(session, options) {

    assert(session instanceof ClientSession);

    var self = this;
    self.publish_engine = session.getPublishEngine();


    //// options should have
    //var allowedProperties = [
    //    'requestedPublishingInterval',
    //    'requestedLifetimeCount',
    //    'requestedMaxKeepAliveCount',
    //    'maxNotificationsPerPublish',
    //    'publishingEnabled',
    //    'priority'
    //];

    options = options || {};
    options.requestedPublishingInterval = options.requestedPublishingInterval || 100;
    options.requestedLifetimeCount = options.requestedLifetimeCount || 60;
    options.requestedMaxKeepAliveCount = options.requestedMaxKeepAliveCount || 10;
    options.maxNotificationsPerPublish = utils.isNullOrUndefined(options.maxNotificationsPerPublish) ? 0 : options.maxNotificationsPerPublish;
    options.publishingEnabled = options.publishingEnabled ? true : false;
    options.priority = options.priority || 1;


    self.publishingInterval = options.requestedPublishingInterval;
    self.lifetimeCount = options.requestedLifetimeCount;
    self.maxKeepAliveCount = options.requestedMaxKeepAliveCount;
    self.maxNotificationsPerPublish = options.maxNotificationsPerPublish;
    self.publishingEnabled = options.publishingEnabled;
    self.priority = options.priority;

    self.subscriptionId = "pending";

    self._next_client_handle = 0;
    self.monitoredItems = {};

    /**
     * set to True when the server has notified us that this sbuscription has timed out
     * ( maxLifeCounter x published interval without being able to process a PublishRequest
     * @property hasTimedOut
     * @type {boolean}
     */
    self.hasTimedOut = false;

    setImmediate(function () {

        self.__create_subscription(function (err) {

            if (!err) {


                setImmediate(function () {
                    /**
                     * notify the observers that the subscription has now started
                     * @event started
                     */
                    self.emit("started", self.subscriptionId);
                });
            }
        });
    });
}
util.inherits(ClientSubscription, EventEmitter);


ClientSubscription.prototype.__create_subscription = function (callback) {

    assert(_.isFunction(callback));

    var self = this;

    var session = self.publish_engine.session;

    debugLog("ClientSubscription created ".yellow.bold);

    var request = new subscription_service.CreateSubscriptionRequest({
        requestedPublishingInterval: self.publishingInterval,
        requestedLifetimeCount: self.lifetimeCount,
        requestedMaxKeepAliveCount: self.maxKeepAliveCount,
        maxNotificationsPerPublish: self.maxNotificationsPerPublish,
        publishingEnabled: self.publishingEnabled,
        priority: self.priority
    });

    session.createSubscription(request, function (err, response) {

        if (err) {
            /* istanbul ignore next */
            self.emit("internal_error", err);
            if (callback) {
                return callback(err);
            }

        } else {
            self.subscriptionId = response.subscriptionId;
            self.publishingInterval = response.revisedPublishingInterval;
            self.lifetimeCount = response.revisedLifetimeCount;
            self.maxKeepAliveCount = response.revisedMaxKeepAliveCount;

            self.timeoutHint = (self.maxKeepAliveCount + 10 ) * self.publishingInterval;

            if (doDebug) {
                debugLog("registering callback".yellow.bold);
                debugLog("publishingInterval               ".yellow.bold, self.publishingInterval);
                debugLog("lifetimeCount                    ".yellow.bold, self.lifetimeCount);
                debugLog("maxKeepAliveCount                ".yellow.bold, self.maxKeepAliveCount);
                debugLog("publish request timeout hint =   ".yellow.bold, self.timeoutHint);
            }

            self.publish_engine.registerSubscription(self);

            if (callback) {
                callback(err);
            }
        }
    });
};


ClientSubscription.prototype.__on_publish_response_DataChangeNotification = function (notification) {

    assert(notification._schema.name === "DataChangeNotification");

    var self = this;

    var monitoredItems = notification.monitoredItems;

    monitoredItems.forEach(function (monitoredItem) {

        var monitorItemObj = self.monitoredItems[monitoredItem.clientHandle];
        if (monitorItemObj) {
            if (monitorItemObj.itemToMonitor.attributeId === AttributeIds.EventNotifier) {
                console.log("Warning".yellow, " Server send a DataChangeNotification for an EventNotifier. EventNotificationList was expected".cyan);
                console.log("         the Server may not be fully OPCUA compliant".cyan, ". This notification will be ignored.".yellow);
            } else {
                monitorItemObj._notify_value_change(monitoredItem.value);
            }
        }

    });

};

ClientSubscription.prototype.__on_publish_response_StatusChangeNotification = function (notification) {

    var self = this;

    assert(notification._schema.name === "StatusChangeNotification");

    debugLog("Client has received a Status Change Notification ", notification.statusCode.toString());

    self.publish_engine.cleanup_acknowledgment_for_subscription(notification.subscriptionId);

    if (notification.statusCode === StatusCodes.GoodSubscriptionTransferred) {
        // OPCUA UA Spec 1.0.3 : part 3 - page 82 - 5.13.7 TransferSubscriptions:
        // If the Server transfers the Subscription to the new Session, the Server shall issue a StatusChangeNotification
        // notificationMessage with the status code Good_SubscriptionTransferred to the old Session.
        console.log("ClientSubscription#__on_publish_response_StatusChangeNotification : GoodSubscriptionTransferred");
        self.hasTimedOut = true;
        self.terminate();
    }
    if (notification.statusCode === StatusCodes.BadTimeout) {
        // the server tells use that the subscription has timed out ..
        // this mean that this subscription has been closed on the server side and cannot process any
        // new PublishRequest.
        //
        // from Spec OPCUA Version 1.03 Part 4 - 5.13.1.1 Description : Page 69:
        //
        // h. Subscriptions have a lifetime counter that counts the number of consecutive publishing cycles in
        //    which there have been no Publish requests available to send a Publish response for the
        //    Subscription. Any Service call that uses the SubscriptionId or the processing of a Publish
        //    response resets the lifetime counter of this Subscription. When this counter reaches the value
        //    calculated for the lifetime of a Subscription based on the MaxKeepAliveCount parameter in the
        //    CreateSubscription Service (5.13.2), the Subscription is closed. Closing the Subscription causes
        //    its MonitoredItems to be deleted. In addition the Server shall issue a StatusChangeNotification
        //    notificationMessage with the status code Bad_Timeout.
        //
        self.hasTimedOut = true;
        self.terminate();
    }
    /**
     * notify the observers that the server has send a status changed notification (such as BadTimeout )
     * @event status_changed
     */
    self.emit("status_changed", notification.statusCode, notification.diagnosticInfo);

};

ClientSubscription.prototype.__on_publish_response_EventNotificationList = function (notification) {
    assert(notification._schema.name === "EventNotificationList");

    var self = this;

    notification.events.forEach(function (event) {
        var monitorItemObj = self.monitoredItems[event.clientHandle];
        assert(monitorItemObj, "Expecting a monitored item");


        monitorItemObj._notify_value_change(event.eventFields);
    });
};

ClientSubscription.prototype.onNotificationMessage = function (notificationMessage) {

    var self = this;
    assert(notificationMessage.hasOwnProperty("sequenceNumber"));

    self.lastSequenceNumber = notificationMessage.sequenceNumber;

    self.emit("raw_notification", notificationMessage);

    var notificationData = notificationMessage.notificationData;

    if (notificationData.length === 0) {
        // this is a keep alive message
        debugLog("Client : received a keep alive notification from client".yellow);
        /**
         * notify the observers that a keep alive Publish Response has been received from the server.
         * @event keepalive
         */
        self.emit("keepalive");

    } else {

        /**
         * notify the observers that some notifications has been received from the server in  a PublishResponse
         * each modified monitored Item
         * @event  received_notifications
         */
        self.emit("received_notifications", notificationMessage);
        // let publish a global event

        // now process all notifications
        notificationData.forEach(function (notification) {
            // DataChangeNotification / StatusChangeNotification / EventNotification
            switch (notification._schema.name) {
                case "DataChangeNotification":
                    // now inform each individual monitored item
                    self.__on_publish_response_DataChangeNotification(notification);
                    break;
                case "StatusChangeNotification":
                    self.__on_publish_response_StatusChangeNotification(notification);
                    break;
                case "EventNotificationList":
                    self.__on_publish_response_EventNotificationList(notification);
                    break;
                default:
                    console.log(" Invalid notification :", notification.toString());
            }
        });
    }

};


/**
 * the associated session
 * @property session
 * @type {ClientSession}
 */
ClientSubscription.prototype.__defineGetter__("session", function () {
    return this.publish_engine.session;
});


ClientSubscription.prototype._terminate_step2 = function (callback) {
    var self = this;


    setImmediate(function () {
        /**
         * notify the observers tha the client subscription has terminated
         * @event  terminated
         */
        self.subscriptionId = "terminated";
        self.emit("terminated");
        callback();
    });

};

/**
 * @method terminate
 */
ClientSubscription.prototype.terminate = function (callback) {

    var self = this;

    callback = callback || function () {
      };

    if (self.subscriptionId === "terminated") {
        // already terminated... just ignore
        callback(new Error("Already Terminated"));
        return;
    }

    if (_.isFinite(self.subscriptionId)) {

        self.publish_engine.unregisterSubscription(self.subscriptionId);

        if (!self.session) {
            return self._terminate_step2(callback);
        }


        self.session.deleteSubscriptions({
            subscriptionIds: [self.subscriptionId]
        }, function (err) {

            if (err) {
                /**
                 * notify the observers that an error has occurred
                 * @event internal_error
                 * @param {Error} err the error
                 */
                self.emit("internal_error", err);
            }
            self._terminate_step2(callback);
        });

    } else {
        assert(self.subscriptionId === "pending");
        self._terminate_step2(callback);
    }
};

/**
 * @method nextClientHandle
 */
ClientSubscription.prototype.nextClientHandle = function () {
    this._next_client_handle += 1;
    return this._next_client_handle;
};


ClientSubscription.prototype._add_monitored_item = function (clientHandle, monitoredItem) {
    var self = this;
    assert(self.isActive(), "subscription must be active and not terminated");
    assert(monitoredItem.monitoringParameters.clientHandle === clientHandle);
    self.monitoredItems[clientHandle] = monitoredItem;

    /**
     * notify the observers that a new monitored item has been added to the subscription.
     * @event item_added
     * @param {MonitoredItem} the monitored item.
     */
    self.emit("item_added", monitoredItem);
};


ClientSubscription.prototype._wait_for_subscription_to_be_ready = function (done) {

    var self = this;

    var _watch_dog = 0;

    function wait_for_subscription_and_monitor() {

        _watch_dog++;

        if (self.subscriptionId === "pending") {
            // the subscriptionID is not yet known because the server hasn't replied yet
            // let postpone this call, a little bit, to let things happen
            setImmediate(wait_for_subscription_and_monitor);

        } else if (self.subscriptionId === "terminated") {
            // the subscription has been terminated in the meantime
            // this indicates a potential issue in the code using this api.
            if (_.isFunction(done)) {
                done(new Error("subscription has been deleted"));
            }
        } else {
            done();
        }
    }

    setImmediate(wait_for_subscription_and_monitor);

};
/**
 * add a monitor item to the subscription
 *
 * @method monitor
 * @async
 * @param itemToMonitor                        {ReadValueId}
 * @param itemToMonitor.nodeId                 {NodeId}
 * @param itemToMonitor.attributeId            {AttributeId}
 * @param itemToMonitor.indexRange             {null|NumericRange}
 * @param itemToMonitor.dataEncoding
 * @param requestedParameters                  {MonitoringParameters}
 * @param requestedParameters.clientHandle     {IntegerId}
 * @param requestedParameters.samplingInterval {Duration}
 * @param requestedParameters.filter           {ExtensionObject|null} EventFilter/DataChangeFilter
 * @param requestedParameters.queueSize        {Counter}
 * @param requestedParameters.discardOldest    {Boolean}
 * @param timestampsToReturn                   {TimestampsToReturn} //{TimestampsToReturnId}
 * @param  [done]                              {Function} optional done callback
 * @return {ClientMonitoredItem}
 *
 *
 * Monitoring a simple Value Change
 * ---------------------------------
 *
 * @example:
 *
 *   clientSubscription.monitor(
 *     // itemToMonitor:
 *     {
 *       nodeId: "ns=0;i=2258",
 *       attributeId: AttributeIds.Value,
 *       indexRange: null,
 *       dataEncoding: { namespaceIndex: 0, name: null }
 *     },
 *     // requestedParameters:
 *     {
 *        samplingInterval: 3000,
 *        filter: null,
 *        queueSize: 1,
 *        discardOldest: true
 *     },
 *     TimestampsToReturn.Neither
 *   );
 *
 * Monitoring a Value Change With a DataChange  Filter
 * ---------------------------------------------------
 *
 * options.trigger       {DataChangeTrigger} {Status|StatusValue|StatusValueTimestamp}
 * options.deadbandType  {DeadbandType}      {None|Absolute|Percent}
 * options.deadbandValue {Double}

 * @example:
 *
 *   clientSubscription.monitor(
 *     // itemToMonitor:
 *     {
 *       nodeId: "ns=0;i=2258",
 *       attributeId: AttributeIds.Value,
 *     },
 *     // requestedParameters:
 *     {
 *        samplingInterval: 3000,
 *        filter: new DataChangeFilter({
 *             trigger: DataChangeTrigger.StatusValue,
 *             deadbandType: DeadBandType.Absolute,
 *             deadbandValue: 0.1
 *        }),
 *        queueSize: 1,
 *        discardOldest: true
 *     },
 *     TimestampsToReturn.Neither
 *   );
 *
 *
 * Monitoring an Event
 * -------------------
 *
 *  If the monitor attributeId is EventNotifier then the filter must be specified
 *
 * @example:
 *
 *  var filter =  new subscription_service.EventFilter({
 *    selectClauses: [
 *             { browsePath: [ {name: 'ActiveState'  }, {name: 'id'}  ]},
 *             { browsePath: [ {name: 'ConditionName'}                ]}
 *    ],
 *    whereClause: []
 *  });
 *
 *  clientSubscription.monitor(
 *     // itemToMonitor:
 *     {
 *       nodeId: "ns=0;i=2258",
 *       attributeId: AttributeIds.EventNotifier,
 *       indexRange: null,
 *       dataEncoding: { namespaceIndex: 0, name: null }
 *     },
 *     // requestedParameters:
 *     {
 *        samplingInterval: 3000,
 *
 *        filter: filter,
 *
 *        queueSize: 1,
 *        discardOldest: true
 *     },
 *     TimestampsToReturn.Neither
 *   );
 *
 *
 *
 *
 *
 *
 */
ClientSubscription.prototype.monitor = function (itemToMonitor, requestedParameters, timestampsToReturn, done) {
    var self = this;
    assert(done === undefined || _.isFunction(done));

    itemToMonitor.nodeId = resolveNodeId(itemToMonitor.nodeId);
    var monitoredItem = new ClientMonitoredItem(this, itemToMonitor, requestedParameters, timestampsToReturn);

    self._wait_for_subscription_to_be_ready(function (err) {
        if (err) {
            return done && done(err);
        }
        monitoredItem._monitor(done);
    });
    return monitoredItem;
};


var ClientMonitoredItemGroup = require("./client_monitored_item_group").ClientMonitoredItemGroup;
/**
 *
 * @param itemsToMonitor
 * @param requestedParameters
 * @param timestampsToReturn
 * @param done
 */
ClientSubscription.prototype.monitorItems = function (itemsToMonitor, requestedParameters, timestampsToReturn, done) {
    var self = this;

    // Try to resolve the nodeId and fail fast if we can't.
    itemsToMonitor.forEach(function (itemToMonitor) {
        itemToMonitor.nodeId = resolveNodeId(itemToMonitor.nodeId);
    });

    var monitoredItemGroup = new ClientMonitoredItemGroup(this, itemsToMonitor, requestedParameters, timestampsToReturn);

    self._wait_for_subscription_to_be_ready(function (err) {
        if (err) {
            return done && done(err);
        }
        monitoredItemGroup._monitor(done);
    });
    return monitoredItemGroup;
};

// ClientSubscription.prototype.monitorOld = function (itemToMonitor, requestedParameters, timestampsToReturn, done) {
//
//     var self = this;
//
//
//     assert(itemToMonitor.nodeId);
//     assert(itemToMonitor.attributeId);
//     assert(done === undefined || _.isFunction(done));
//     assert(!_.isFunction(timestampsToReturn));
//
//     // Try to resolve the nodeId and fail fast if we can't.
//     resolveNodeId(itemToMonitor.nodeId);
//
//     timestampsToReturn = timestampsToReturn || TimestampsToReturn.Neither;
//
//
//     var monitoredItem = new ClientMonitoredItem(this, itemToMonitor, requestedParameters, timestampsToReturn);
//
//     var _watch_dog = 0;
//
//     function wait_for_subscription_and_monitor() {
//
//         _watch_dog++;
//
//         if (self.subscriptionId === "pending") {
//             // the subscriptionID is not yet known because the server hasn't replied yet
//             // let postpone this call, a little bit, to let things happen
//             setImmediate(wait_for_subscription_and_monitor);
//
//         } else if (self.subscriptionId === "terminated") {
//             // the subscription has been terminated in the meantime
//             // this indicates a potential issue in the code using this api.
//             if (_.isFunction(done)) {
//                 done(new Error("subscription has been deleted"));
//             }
//         } else {
//             //xxx console.log("xxxx _watch_dog ",_watch_dog);
//             monitoredItem._monitor(done);
//         }
//     }
//
//     setImmediate(wait_for_subscription_and_monitor);
//     return monitoredItem;
// };


ClientSubscription.prototype.isActive = function () {
    var self = this;
    return typeof self.subscriptionId !== "string";
};

ClientSubscription.prototype._remove = function (monitoredItem) {
    var self = this;
    var clientHandle = monitoredItem.monitoringParameters.clientHandle;
    assert(clientHandle);
    assert(self.monitoredItems.hasOwnProperty(clientHandle));
    monitoredItem.removeAllListeners();
    delete self.monitoredItems[clientHandle];
};

ClientSubscription.prototype._delete_monitored_items = function (monitoredItems, callback) {
    assert(_.isArray(monitoredItems));
    var self = this;
    assert(self.isActive());

    monitoredItems.forEach(function (monitoredItem) {
        self._remove(monitoredItem);
    });

    self.session.deleteMonitoredItems({
        subscriptionId: self.subscriptionId,
        monitoredItemIds: monitoredItems.map(function (monitoredItem) {
            return monitoredItem.monitoredItemId;
        })
    }, function (err) {


        callback(err);
    });
};

ClientSubscription.prototype._delete_monitored_item = function (monitoredItem, callback) {
    var self = this;
    self._delete_monitored_items([monitoredItem], callback);
};

ClientSubscription.prototype.setPublishingMode = function (publishingEnabled, callback) {
    assert(_.isFunction(callback));
    var self = this;
    self.session.setPublishingMode(publishingEnabled, self.subscriptionId, function (err, results) {
        if (err) {
            return callback(err);
        }
        if (results[0] !== StatusCodes.Good) {
            return callback(new Error("Cannot setPublishingMode " + results[0].toString()));
        }
        callback();
    });
};


var async = require("async");
/**
 *  utility function to recreate new subscription
 *  @method recreateSubscriptionAndMonitoredItem
 */
ClientSubscription.prototype.recreateSubscriptionAndMonitoredItem = function (callback) {

    debugLog("ClientSubscription#recreateSubscriptionAndMonitoredItem");
    var subscription = this;

    var monitoredItems_old = subscription.monitoredItems;

    subscription.publish_engine.unregisterSubscription(subscription.subscriptionId);

    async.series([

        subscription.__create_subscription.bind(subscription),

        function (callback) {

            var test = subscription.publish_engine.getSubscription(subscription.subscriptionId);
            assert(test === subscription);

            // re-create monitored items

            var itemsToCreate = [];
            _.forEach(monitoredItems_old, function (monitoredItem /*, clientHandle*/) {
                assert(monitoredItem.monitoringParameters.clientHandle > 0);
                itemsToCreate.push({
                    itemToMonitor: monitoredItem.itemToMonitor,
                    monitoringMode: monitoredItem.monitoringMode,
                    requestedParameters: monitoredItem.monitoringParameters
                });

            });

            var createMonitorItemsRequest = new subscription_service.CreateMonitoredItemsRequest({
                subscriptionId: subscription.subscriptionId,
                timestampsToReturn: TimestampsToReturn.Both, // self.timestampsToReturn,
                itemsToCreate: itemsToCreate
            });

            subscription.session.createMonitoredItems(createMonitorItemsRequest, function (err, response) {

                if (!err) {
                    assert(response instanceof subscription_service.CreateMonitoredItemsResponse);
                    var monitoredItemResults = response.results;

                    monitoredItemResults.forEach(function (monitoredItemResult, index) {

                        var clientHandle = itemsToCreate[index].requestedParameters.clientHandle;
                        var monitoredItem = subscription.monitoredItems[clientHandle];

                        if (monitoredItemResult.statusCode === StatusCodes.Good) {

                            monitoredItem.result = monitoredItemResult;
                            monitoredItem.monitoredItemId = monitoredItemResult.monitoredItemId;
                            monitoredItem.monitoringParameters.samplingInterval = monitoredItemResult.revisedSamplingInterval;
                            monitoredItem.monitoringParameters.queueSize = monitoredItemResult.revisedQueueSize;
                            monitoredItem.filterResult = monitoredItemResult.filterResult;

                            // istanbul ignore next
                            if (doDebug) {
                                debugLog("monitoredItemResult.statusCode = ", monitoredItemResult.toString());
                            }

                        } else {
                            // TODO: what should we do ?
                            debugLog("monitoredItemResult.statusCode = ", monitoredItemResult.statusCode.toString());
                        }
                    });

                }
                callback(err);
            });

        }

    ], callback);
};

exports.ClientSubscription = ClientSubscription;