APIs

Show:
"use strict";
/**
 * @module opcua.server
 */


var Dequeue = require("dequeue");

var subscription_service = require("node-opcua-service-subscription");
var DataChangeNotification = subscription_service.DataChangeNotification;
var EventNotificationList = subscription_service.EventNotificationList;
var NotificationMessage = subscription_service.NotificationMessage;
var StatusChangeNotification = subscription_service.StatusChangeNotification;
var MonitoringMode = subscription_service.MonitoringMode;

var StatusCodes = require("node-opcua-status-code").StatusCodes;
var Enum = require("node-opcua-enum");
var assert = require("node-opcua-assert");
var _ = require("underscore");

var AttributeIds = require("node-opcua-data-model").AttributeIds;

var SequenceNumberGenerator = require("node-opcua-secure-channel").SequenceNumberGenerator;

var EventEmitter = require("events").EventEmitter;
var util = require("util");

var SessionContext = require("node-opcua-address-space").SessionContext;

var EventFilter = require("node-opcua-service-filter").EventFilter;
var DataChangeFilter = require("node-opcua-service-subscription").DataChangeFilter;
var AggregateFilter = require("node-opcua-service-subscription").AggregateFilter;

var UAVariable = require("node-opcua-address-space").UAVariable;
var validateFilter  = require("./validate_filter").validateFilter;
var is_valid_dataEncoding = require("node-opcua-data-model").is_valid_dataEncoding;

var debugLog = require("node-opcua-debug").make_debugLog(__filename);
var doDebug = require("node-opcua-debug").checkDebugFlag(__filename);

var SubscriptionState = new Enum([
    "CLOSED",   // The Subscription has not yet been created or has terminated.
    "CREATING", // The Subscription is being created
    "NORMAL",   // The Subscription is cyclically checking for Notifications from its MonitoredItems.
                // The keep-alive counter is not used in this state.
    "LATE",     // The publishing timer has expired and there are Notifications available or a keep-alive Message is
                // ready to be sent, but there are no Publish requests queued. When in this state, the next Publish
                // request is processed when it is received. The keep-alive counter is not used in this state.
    "KEEPALIVE",// The Subscription is cyclically checking for Notification
                // alive counter to count down to 0 from its maximum.
    "TERMINATED"
]);
exports.SubscriptionState = SubscriptionState;


var SubscriptionDiagnostics = require("node-opcua-common").SubscriptionDiagnostics;

var minimumPublishingInterval = 100;  // fastest possible
var defaultPublishingInterval = 100;
var maximumPublishingInterval = 1000 * 60 * 60 * 24 * 30; // 1 month

function _adjust_publishing_interval(publishingInterval) {
    publishingInterval = publishingInterval || defaultPublishingInterval;
    publishingInterval = Math.max(publishingInterval, minimumPublishingInterval);
    publishingInterval = Math.min(publishingInterval, maximumPublishingInterval);
    return publishingInterval;
}

var minimumMaxKeepAliveCount = 2;
var maximumMaxKeepAliveCount = 12000;
function _adjust_maxKeepAliveCount(maxKeepAliveCount/*,publishingInterval*/) {
    maxKeepAliveCount = maxKeepAliveCount || minimumMaxKeepAliveCount;
    maxKeepAliveCount = Math.max(maxKeepAliveCount, minimumMaxKeepAliveCount);
    maxKeepAliveCount = Math.min(maxKeepAliveCount, maximumMaxKeepAliveCount);
    return maxKeepAliveCount;
}
function _adjust_lifeTimeCount(lifeTimeCount, maxKeepAliveCount,publishingInterval) {
    lifeTimeCount = lifeTimeCount || 1;

    // let's make sure that lifeTimeCount is at least three time maxKeepAliveCount
    // Note : the specs say ( part 3  - CreateSubscriptionParameter )
    //        "The lifetime count shall be a minimum of three times the keep keep-alive count."
    lifeTimeCount = Math.max(lifeTimeCount, maxKeepAliveCount * 3 );

    var minTicks = Math.ceil(10 *1000 / (publishingInterval)); // we want 10 seconds moin

    lifeTimeCount = Math.max(minTicks,lifeTimeCount);
    return lifeTimeCount;
}

function _adjust_publishinEnable(publishingEnabled) {
    return (publishingEnabled === null || publishingEnabled === undefined) ? true : !!publishingEnabled;
}

function _adjust_maxNotificationsPerPublish(maxNotificationsPerPublish) {
    maxNotificationsPerPublish += 0;
    assert(_.isNumber(maxNotificationsPerPublish));
    return (maxNotificationsPerPublish >= 0) ? maxNotificationsPerPublish : 0;
}
// verify that the injected publishEngine provides the expected services
// regarding the Subscription requirements...
function _assert_valid_publish_engine(publishEngine) {
    assert(_.isObject(publishEngine));
    assert(_.isNumber(publishEngine.pendingPublishRequestCount));
    assert(_.isFunction(publishEngine.send_notification_message));
    assert(_.isFunction(publishEngine.send_keep_alive_response));
    assert(_.isFunction(publishEngine.on_close_subscription));
}


function createSubscriptionDiagnostics(self) {

    self.subscriptionDiagnostics = new SubscriptionDiagnostics({});

    // "sessionId"
    self.subscriptionDiagnostics.__defineGetter__("sessionId",                  function() { return self.getSessionId(); });
    self.subscriptionDiagnostics.__defineGetter__("subscriptionId",             function() { return self.id; });
    self.subscriptionDiagnostics.__defineGetter__("priority",                   function() { return self.priority; });
    self.subscriptionDiagnostics.__defineGetter__("publishingInterval",         function() { return self.publishingInterval; });
    self.subscriptionDiagnostics.__defineGetter__("maxLifetimeCount",           function() { return self.lifeTimeCount; });
    self.subscriptionDiagnostics.__defineGetter__("maxKeepAliveCount",          function() { return self.maxKeepAliveCount; });
    self.subscriptionDiagnostics.__defineGetter__("maxNotificationsPerPublish", function() { return self.maxNotificationsPerPublish; });
    self.subscriptionDiagnostics.__defineGetter__("publishingEnabled",          function() { return self.publishingEnabled; });
    self.subscriptionDiagnostics.__defineGetter__("monitoredItemCount",         function() { return self.monitoredItemCount; });
    self.subscriptionDiagnostics.__defineGetter__("nextSequenceNumber",         function() { return self._get_future_sequence_number(); });
    self.subscriptionDiagnostics.__defineGetter__("disabledMonitoredItemCount", function() { return self.disabledMonitoredItemCount; });

    /* those member of self.subscriptionDiagnostics are handled directly

     modifyCount
     enableCount,
     disableCount,
     republishRequestCount,
     notificationsCount,
     publishRequestCount,
     dataChangeNotificationsCount,
     eventNotificationsCount,
    */

    /*
     those members are not updated yet in the code :
     "republishMessageRequestCount",
     "republishMessageCount",
     "transferRequestCount",
     "transferredToAltClientCount",
     "transferredToSameClientCount",
     "latePublishRequestCount",
     "currentKeepAliveCount",
     "currentLifetimeCount",
     "unacknowledgedMessageCount",
     "discardedMessageCount",
     "monitoringQueueOverflowCount",
     "eventQueueOverFlowCount"
     */
    // add object in Variable SubscriptionDiagnosticArray (i=2290) ( Array of SubscriptionDiagnostics)
    // add properties in Variable to reflect
}
/**
 * The Subscription class used in the OPCUA server side.
 * @class Subscription
 * @param {Object} options
 * @param options.id {Integer} - a unique identifier
 * @param options.publishingInterval {Integer} - [optional](default:1000) the publishing interval.
 * @param options.maxKeepAliveCount  {Integer} - [optional](default:10) the max KeepAlive Count.
 * @param options.lifeTimeCount      {Integer} - [optional](default:10) the max Life Time Count
 * @param options.publishingEnabled  {Boolean} - [optional](default:true)
 * @param options.sessionId          {NodeId}  - [optional]
 * @param options.maxNotificationsPerPublish {Integer} - [optional](default:0)
 * @param options.priority {Byte}
 * @constructor
 */
function Subscription(options) {

    options = options || {};

    EventEmitter.apply(this, arguments);
    var self = this;

    self._sessionId = options.sessionId;

    self.publishEngine = options.publishEngine;
    _assert_valid_publish_engine(self.publishEngine);

    self.id = options.id || "<invalid_id>";

    self.priority = options.priority || 0;


    /**
     * the Subscription publishing interval
     * @property  publishingInterval
     * @type {number}
     * @default 1000
     */
    self.publishingInterval = _adjust_publishing_interval(options.publishingInterval);

    /**
     * The keep alive count defines how many times the publish interval need to
     * expires without having notifications available before the server send an
     * empty message.
     * OPCUA Spec says: a value of 0 is invalid.
     * @property  maxKeepAliveCount
     * @type {number}
     * @default 10
     *
     */
    self.maxKeepAliveCount = _adjust_maxKeepAliveCount(options.maxKeepAliveCount,self.publishingInterval);

    self.resetKeepAliveCounter();

    /**
     * The life time count defines how many times the publish interval expires without
     * having a connection to the client to deliver data.
     * If the life time count reaches maxKeepAliveCount, the subscription will
     * automatically terminate.
     * OPCUA Spec: The life-time count shall be a minimum of three times the keep keep-alive count.
     *
     * Note: this has to be interpreted as without having a PublishRequest available
     * @property  lifeTimeCount
     * @type {Number}
     * @default 1
     */
    self.lifeTimeCount = _adjust_lifeTimeCount(options.lifeTimeCount, self.maxKeepAliveCount,self.publishingInterval);


    /**
     * The maximum number of notifications that the Client wishes to receive in a
     * single Publish response. A value of zero indicates that there is no limit.
     * The number of notifications per Publish is the sum of monitoredItems in the
     * DataChangeNotification and events in the EventNotificationList.
     *
     * @property maxNotificationsPerPublish
     * @type {Number}
     * #default 0
     */
    self.maxNotificationsPerPublish = _adjust_maxNotificationsPerPublish(options.maxNotificationsPerPublish);

    self._life_time_counter = 0;
    self.resetLifeTimeCounter();

    // notification message that are ready to be sent to the client
    self._pending_notifications = new Dequeue();

    self._sent_notifications = [];

    self._sequence_number_generator = new SequenceNumberGenerator();

    // initial state of the subscription
    self.state = SubscriptionState.CREATING;

    self.publishIntervalCount = 0;

    self.monitoredItems = {}; // monitored item map

    /**
     *  number of monitored Item
     *  @property monitoredItemIdCounter
     *  @type {Number}
     */
    self.monitoredItemIdCounter = 0;

    self.publishingEnabled = _adjust_publishinEnable(options.publishingEnabled);

    createSubscriptionDiagnostics(self);

    // A boolean value that is set to TRUE to mean that either a NotificationMessage or a keep-alive Message has been
    // sent on the Subscription. It is a flag that is used to ensure that either a NotificationMessage or a keep-alive
    // Message is sent out the first time the publishing timer expires.
    self.messageSent = false;

    self._unacknowledgedMessageCount = 0;
    
    self.timerId = null;
    self._start_timer();

}

util.inherits(Subscription, EventEmitter);

var ObjectRegistry = require("node-opcua-object-registry").ObjectRegistry;
Subscription.registry = new ObjectRegistry();

Subscription.prototype.getSessionId = function () {
    var self = this;
    return self._sessionId;
};

Subscription.prototype.toString = function () {

    var self = this;
    var str = "";
    str += "  publishingEnabled  " + self.publishingEnabled + "\n";
    str += "  maxKeepAliveCount  " + self.maxKeepAliveCount + "\n";
    str += "  publishingInterval " + self.publishingInterval + "\n";
    str += "  lifeTimeCount      " + self.lifeTimeCount + "\n";
    str += "  maxKeepAliveCount  " + self.maxKeepAliveCount + "\n";
    return str;
};

/**
 * @method modify
 * @param param {Object}
 * @param param.requestedPublishingInterval  {Duration}  requestedPublishingInterval =0 means fastest possible
 * @param param.requestedLifetimeCount       {Counter}   requestedLifetimeCount      ===0 means no change
 * @param param.requestedMaxKeepAliveCount   {Counter}   requestedMaxKeepAliveCount  ===0 means no change
 * @param param.maxNotificationsPerPublish   {Counter}
 * @param param.priority                     {Byte}
 *
 */
Subscription.prototype.modify = function (param) {
    var self = this;

    // update diagnostic counter
    self.subscriptionDiagnostics.modifyCount +=1;

    var publishingInterval_old = self.publishingInterval;

    param.requestedPublishingInterval = param.requestedPublishingInterval || 0;
    param.requestedMaxKeepAliveCount = param.requestedMaxKeepAliveCount || self.maxKeepAliveCount;
    param.requestedLifetimeCount = param.requestedLifetimeCount || self.lifeTimeCount;

    self.publishingInterval = _adjust_publishing_interval(param.requestedPublishingInterval);
    self.maxKeepAliveCount = _adjust_maxKeepAliveCount(param.requestedMaxKeepAliveCount,self.publishingInterval);
    self.lifeTimeCount = _adjust_lifeTimeCount(param.requestedLifetimeCount, self.maxKeepAliveCount,self.publishingInterval);

    self.maxNotificationsPerPublish = param.maxNotificationsPerPublish;
    self.priority = param.priority;

    self.resetLifeTimeAndKeepAliveCounters();

    if (publishingInterval_old !== self.publishingInterval) {
        // todo
    }
    self._stop_timer();
    self._start_timer();

};

Subscription.prototype._stop_timer = function () {
    var self = this;
    if (self.timerId) {
        debugLog("Subscription#_stop_timer subscriptionId=".bgWhite.blue,self.id);
        clearInterval(self.timerId);
        self.timerId = null;
        Subscription.registry.unregister(self);
    }
};


Subscription.prototype._start_timer = function () {

    var self = this;
    debugLog("Subscription#_start_timer  subscriptionId=".bgWhite.blue,self.id," publishingInterval = ",self.publishingInterval);

    assert(self.timerId === null);
    // from the spec:
    // When a Subscription is created, the first Message is sent at the end of the first publishing cycle to
    // inform the Client that the Subscription is operational. A NotificationMessage is sent if there are
    // Notifications ready to be reported. If there are none, a keep-alive Message is sent instead that
    // contains a sequence number of 1, indicating that the first NotificationMessage has not yet been sent.
    // This is the only time a keep-alive Message is sent without waiting for the maximum keep-alive count
    // to be reached, as specified in (f) above.


    // make sure that a keep-alive Message will be send at the end of the first publishing cycle
    // if there are no Notifications ready.
    self._keep_alive_counter = self.maxKeepAliveCount;

    assert(self.publishingInterval >= minimumPublishingInterval);
    self.timerId = setInterval(self._tick.bind(self), self.publishingInterval);

    Subscription.registry.register(self);

};

// counter
Subscription.prototype._get_next_sequence_number = function () {
    return this._sequence_number_generator.next();
};

// counter
Subscription.prototype._get_future_sequence_number = function () {
    return this._sequence_number_generator.future();
};


Subscription.prototype.setPublishingMode = function (publishingEnabled) {

    this.publishingEnabled = !!publishingEnabled;

    // update diagnostics

    if (this.publishingEnabled) {
        this.subscriptionDiagnostics.enableCount += 1;
    } else {
        this.subscriptionDiagnostics.disableCount += 1;
    }

    this.resetLifeTimeCounter();

    if (!publishingEnabled  && this.state !== SubscriptionState.CLOSED) {
        this.state = SubscriptionState.NORMAL;
    }
    return StatusCodes.Good;
};


/**
 *  _publish_pending_notifications send a "notification" event:
 *
 * @method _publish_pending_notifications *
 * @private
 *
 */
Subscription.prototype._publish_pending_notifications = function () {

    var self = this;
    var publishEngine = self.publishEngine;
    var subscriptionId = self.id;

    // preconditions
    assert(publishEngine.pendingPublishRequestCount > 0);
    assert(self.hasPendingNotifications);

    function _count_notification_message(notifData) {

        if (notifData instanceof DataChangeNotification) {
            self.subscriptionDiagnostics.dataChangeNotificationsCount += 1;
        } else if (notifData instanceof EventNotificationList) {
            self.subscriptionDiagnostics.eventNotificationsCount += 1;
        } else {
            // TODO
        }
    }

    // todo : get rid of this....
    self.emit("notification");

    var notificationMessage = self._popNotificationToSend().notification;

    self.emit("notificationMessage",notificationMessage);


    assert(_.isArray(notificationMessage.notificationData));

    notificationMessage.notificationData.forEach(_count_notification_message);


    assert(notificationMessage.hasOwnProperty("sequenceNumber"));
    assert(notificationMessage.hasOwnProperty("notificationData"));

    var moreNotifications = (self.hasPendingNotifications);

    // update diagnostics
    if(self.subscriptionDiagnostics) {
        self.subscriptionDiagnostics.notificationsCount += 1;
        self.subscriptionDiagnostics.publishRequestCount += 1;
    }

    publishEngine.send_notification_message({
        subscriptionId: subscriptionId,
        sequenceNumber: notificationMessage.sequenceNumber,
        notificationData: notificationMessage.notificationData,
        moreNotifications: moreNotifications
    },false);
    self.messageSent = true;
    self._unacknowledgedMessageCount ++;
    
    self.resetLifeTimeAndKeepAliveCounters();

    if (doDebug) {
        debugLog("Subscription sending a notificationMessage subscriptionId=", subscriptionId,
            "sequenceNumber = ",notificationMessage.sequenceNumber.toString());
        // debugLog(notificationMessage.toString());
    }

    if (self.state !== SubscriptionState.CLOSED) {
        assert(notificationMessage.notificationData.length >0, "We are not expecting a keep-alive message here");
        self.state = SubscriptionState.NORMAL;
        debugLog("subscription " + self.id + " set to NORMAL".bgYellow);
    }

};

Subscription.prototype._process_keepAlive = function () {
    var self = this;

    //xx assert(!self.publishingEnabled || (!self.hasPendingNotifications && !self.hasMonitoredItemNotifications));

    self.increaseKeepAliveCounter();

    if (self.keepAliveCounterHasExpired) {

        if (self._sendKeepAliveResponse()) {

            self.resetLifeTimeAndKeepAliveCounters();

        } else {
            debugLog("     -> subscription.state === LATE , because keepAlive Response cannot be send due to lack of PublishRequest");
            self.state = SubscriptionState.LATE;
        }
    }
};



Subscription.prototype.process_subscription = function () {

    var self = this;

    assert(self.publishEngine.pendingPublishRequestCount >0);

    if (!self.publishingEnabled) {
        // no publish to do, except keep alive
        self._process_keepAlive();
        return;
    }

    if( !self.hasPendingNotifications && self.hasMonitoredItemNotifications) {
        // collect notification from monitored items
        self._harvestMonitoredItems();
    }

    // let process them first
    if (self.hasPendingNotifications) {

        self._publish_pending_notifications();

        if (self.state === SubscriptionState.NORMAL && self.hasPendingNotifications) {

            // istanbul ignore next
            if (doDebug) {
                debugLog("    -> pendingPublishRequestCount > 0 && normal state => re-trigger tick event immediately ");
            }

            // let process an new publish request
            setImmediate(self._tick.bind(self));
        }

    } else {
        self._process_keepAlive();
    }
};

function w(s, w) {
    return ("000" + s).substr(-w);
}
function t(d) {
    return w(d.getHours(), 2) + ":" + w(d.getMinutes(), 2) + ":" + w(d.getSeconds(), 2) + ":" + w(d.getMilliseconds(), 3);
}
/**
 * @method _tick
 * @private
 */
Subscription.prototype._tick = function () {

    var self = this;

    self.discardOldSentNotifications();

    // istanbul ignore next
    if (doDebug) {
        debugLog((t(new Date()) + "  " + self._life_time_counter + "/"  + self.lifeTimeCount + "   Subscription#_tick").cyan,"  processing subscriptionId=", self.id, "hasMonitoredItemNotifications = ",self.hasMonitoredItemNotifications," publishingIntervalCount =",self.publishIntervalCount);
    }
    if (self.publishEngine._on_tick) {
        self.publishEngine._on_tick();
    }

    self.publishIntervalCount += 1;

    self.increaseLifeTimeCounter();

    if (self.lifeTimeHasExpired) {

        /* istanbul ignore next */
        if (doDebug) {
            debugLog("Subscription " + self.id + " has expired !!!!! => Terminating".red.bold);
        }
        /**
         * notify the subscription owner that the subscription has expired by exceeding its life time.
         * @event expired
         *
         */
        self.emit("expired");

        // notify new terminated status only when subscription has timeout.
        debugLog("adding StatusChangeNotification notification message for BadTimeout subscription = ", self.id);
        self._addNotificationMessage([new StatusChangeNotification({statusCode: StatusCodes.BadTimeout})]);

        // kill timer and delete monitored items
        self.terminate();

        return;

    }

    var publishEngine = self.publishEngine;

    // istanbul ignore next
    if(doDebug) { debugLog("Subscription#_tick  self._pending_notifications= ",self._pending_notifications.length);}

    if (publishEngine.pendingPublishRequestCount === 0 && ( self.hasPendingNotifications || self.hasMonitoredItemNotifications) ) {

        // istanbul ignore next
        if (doDebug) {
            debugLog("subscription set to LATE  hasPendingNotifications = ",self.hasPendingNotifications," hasMonitoredItemNotifications =",self.hasMonitoredItemNotifications);
        }
        self.state = SubscriptionState.LATE;
        return;
    }

    if (publishEngine.pendingPublishRequestCount > 0 ) {

        if (self.hasPendingNotifications) {
            // simply pop pending notification and send it
            self.process_subscription();

        } else if (self.hasMonitoredItemNotifications) {
            self.process_subscription();

        } else {
            self._process_keepAlive();
        }
    } else {
        self._process_keepAlive();
    }
};


/**
 * @method _sendKeepAliveResponse
 * @private
 */
Subscription.prototype._sendKeepAliveResponse = function () {

    var self = this;
    var future_sequence_number = self._get_future_sequence_number();

    debugLog("     -> Subscription#_sendKeepAliveResponse subscriptionId",self.id);

    if (self.publishEngine.send_keep_alive_response(self.id, future_sequence_number)) {

        self.messageSent = true;

        /**
         * notify the subscription owner that a keepalive message has to be sent.
         * @event keepalive
         *
         */
        self.emit("keepalive", future_sequence_number);
        self.state = SubscriptionState.KEEPALIVE;

        return true;
    }
    return false;
};


/**
 * @method resetKeepAliveCounter
 * @private
 * Reset the Lifetime Counter Variable to the value specified for the lifetime of a Subscription in
 * the CreateSubscription Service( 5.13.2).
 */
Subscription.prototype.resetKeepAliveCounter = function () {
    var self = this;
    self._keep_alive_counter = 0;

    // istanbul ignore next
    if (doDebug) {
        debugLog("     -> subscriptionId",self.id," Resetting keepAliveCounter = ",self._keep_alive_counter,self.maxKeepAliveCount);
    }
};

/**
 * @method increaseKeepAliveCounter
 * @private
 */
Subscription.prototype.increaseKeepAliveCounter = function () {
    var self = this;
    self._keep_alive_counter += 1;

    // istanbul ignore next
    if (doDebug) {
        debugLog("     -> subscriptionId",self.id," Increasing keepAliveCounter = ", self._keep_alive_counter, self.maxKeepAliveCount);
    }
};

/**
 * @property keepAliveCounterHasExpired
 * @private
 * @type {Boolean} true if the keep alive counter has reach its limit.
 */
Subscription.prototype.__defineGetter__("keepAliveCounterHasExpired", function () {
    var self = this;
    return self._keep_alive_counter >= self.maxKeepAliveCount;
});


/**
 * Reset the Lifetime Counter Variable to the value specified for the lifetime of a Subscription in
 * the CreateSubscription Service( 5.13.2).
 * @method resetLifeTimeCounter
 * @private
 */
Subscription.prototype.resetLifeTimeCounter = function () {
    var self = this;
    self._life_time_counter = 0;
};
/**
 * @method increaseLifeTimeCounter
 * @private
 */
Subscription.prototype.increaseLifeTimeCounter = function () {
    var self = this;
    self._life_time_counter += 1;
};

/**
 *  True if the subscription life time has expired.
 *
 * @property lifeTimeHasExpired
 * @type {boolean} - true if the subscription life time has expired.
 */
Subscription.prototype.__defineGetter__("lifeTimeHasExpired", function () {
    var self = this;
    assert(self.lifeTimeCount > 0);
    return self._life_time_counter >= self.lifeTimeCount;
});

/**
 * number of milliseconds before this subscription times out (lifeTimeHasExpired === true);
 * @property timeToExpiration
 * @type {Number}
 */
Subscription.prototype.__defineGetter__("timeToExpiration", function () {
    var self = this;
    return (self.lifeTimeCount - self._life_time_counter ) * self.publishingInterval;
});

Subscription.prototype.__defineGetter__("timeToKeepAlive", function () {
    var self = this;
    return (self.maxKeepAliveCount - self._keep_alive_counter ) * self.publishingInterval;
});

/**
 *
 *  the server invokes the resetLifeTimeAndKeepAliveCounters method of the subscription
 *  when the server  has send a Publish Response, so that the subscription
 *  can reset its life time counter.
 *
 * @method resetLifeTimeAndKeepAliveCounters
 *
 */
Subscription.prototype.resetLifeTimeAndKeepAliveCounters = function () {
    var self = this;
    self.resetLifeTimeCounter();
    self.resetKeepAliveCounter();
};

/**
 * Terminates the subscription.
 * @method terminate
 *
 * Calling this method will also remove any monitored items.
 *
 */
Subscription.prototype.terminate = function () {
    var self = this;

    if (self.state === SubscriptionState.CLOSED) {
        // todo verify if asserting is required here
        return;
    }
    assert(self.state !== SubscriptionState.CLOSED, "terminate already called ?");

    // stop timer
    self._stop_timer();

    debugLog("terminating Subscription  ", self.id, " with ", self.monitoredItemCount, " monitored items");

    // dispose all monitoredItem
    var keys = Object.keys(self.monitoredItems);

    keys.forEach(function (key) {
        var status = self.removeMonitoredItem(key);
        assert(status === StatusCodes.Good);
    });

    assert(self.monitoredItemCount === 0);


    self.state = SubscriptionState.CLOSED;

    self.publishEngine.on_close_subscription(self);

    /**
     * notify the subscription owner that the subscription has been terminated.
     * @event "terminated"
     */
    self.emit("terminated");
};

function assert_validNotificationData(n) {
    assert(
        n instanceof DataChangeNotification ||
        n instanceof EventNotificationList  ||
        n instanceof StatusChangeNotification
    );
}

/**
 * @method _addNotificationMessage
 * @param notificationData {Array<DataChangeNotification|EventNotificationList|StatusChangeNotification>}
 */
Subscription.prototype._addNotificationMessage = function (notificationData) {

    assert(_.isArray(notificationData));
    assert(notificationData.length === 1 || notificationData.length === 2); // as per spec part 3.

    // istanbul ignore next
    if (false && doDebug) {
        debugLog("Subscription#_addNotificationMessage".yellow,notificationData.toString());
    }
    var self = this;
    assert(_.isObject(notificationData[0]));

    assert_validNotificationData(notificationData[0]);
    if (notificationData.length === 2) {
        assert_validNotificationData(notificationData[1]);
    }

    var notification_message = new NotificationMessage({
        sequenceNumber: self._get_next_sequence_number(),
        publishTime: new Date(),
        notificationData: notificationData
    });

    self._pending_notifications.push({
        notification: notification_message,
        start_tick: self.publishIntervalCount,
        publishTime: new Date(),
        sequenceNumber: notification_message.sequenceNumber
    });

};


Subscription.prototype.getMessageForSequenceNumber = function (sequenceNumber) {

    var self = this;

    function filter_func(e) {
        return e.sequenceNumber === sequenceNumber;
    }

    var notification_message = _.find(self._sent_notifications, filter_func);

    if (!notification_message) {
        return null;
    }
    return notification_message;

};

/**
 * Extract the next Notification that is ready to be sent to the client.
 * @method _popNotificationToSend
 * @return {NotificationMessage}  the Notification to send._pending_notifications
 */
Subscription.prototype._popNotificationToSend = function () {
    var self = this;
    assert(self._pending_notifications.length >0);
    var notification_message = self._pending_notifications.shift();
    self._sent_notifications.push(notification_message);
    return notification_message;
};

/**
 * returns true if the notification has expired
 * @method notificationHasExpired
 * @param notification
 * @return {boolean}
 */
Subscription.prototype.notificationHasExpired = function (notification) {
    var self = this;
    assert(notification.hasOwnProperty("start_tick"));
    assert(_.isFinite(notification.start_tick + self.maxKeepAliveCount));
    return (notification.start_tick + self.maxKeepAliveCount) < self.publishIntervalCount;
};

var maxNotificationMessagesInQueue = 100;
/**
 * discardOldSentNotification find all sent notification message that have expired keep-alive
 * and destroy them.
 * @method discardOldSentNotifications
 * @private
 *
 * Subscriptions maintain a retransmission queue of sent  NotificationMessages.
 * NotificationMessages are retained in this queue until they are acknowledged or until they have
 * been in the queue for a minimum of one keep-alive interval.
 *
 */
Subscription.prototype.discardOldSentNotifications = function () {

    var self = this;

    // Sessions maintain a retransmission queue of sent NotificationMessages. NotificationMessages
    // are retained in this queue until they are acknowledged. The Session shall maintain a
    // retransmission queue size of at least two times the number of Publish requests per Session the
    // Server supports.  Clients are required to acknowledge NotificationMessages as they are received. In the
    // case of a retransmission queue overflow, the oldest sent NotificationMessage gets deleted. If a
    // Subscription is transferred to another Session, the queued NotificationMessages for this
    // Subscription are moved from the old to the new Session.
    if (maxNotificationMessagesInQueue <= self._sent_notifications.length) {
        debugLog("discardOldSentNotifications = ",self._sent_notifications.length);
        self._sent_notifications.splice(self._sent_notifications.length - maxNotificationMessagesInQueue);
    }
    //
    //var arr = _.filter(self._sent_notifications,function(notification){
    //   return self.notificationHasExpired(notification);
    //});
    //var results = arr.map(function(notification){
    //    return self.acknowledgeNotification(notification.sequenceNumber);
    //});
    //xx return results;
};

function getSequenceNumbers(arr) {
    return arr.map(function (e) {
        return e.notification.sequenceNumber;
    });
}
/**
 *  returns in an array the sequence numbers of the notifications that haven't been
 *  acknowledged yet.
 *
 *  @method getAvailableSequenceNumbers
 *  @return {Integer[]}
 *
 */
Subscription.prototype.getAvailableSequenceNumbers = function () {
    var self = this;
    var availableSequenceNumbers = getSequenceNumbers(self._sent_notifications);
    return availableSequenceNumbers;
};


/**
 * @method acknowledgeNotification
 * @param sequenceNumber {Number}
 * @return {StatusCode}
 */
Subscription.prototype.acknowledgeNotification = function (sequenceNumber) {
    var self = this;

    var foundIndex = -1;
    _.find(self._sent_notifications, function (e, index) {
        if (e.sequenceNumber === sequenceNumber) {
            foundIndex = index;
        }
    });
    if (foundIndex === -1) {
        if (doDebug) {
            debugLog("acknowledging sequence FAILED !!! ".red, sequenceNumber.toString().cyan);
        }
        return StatusCodes.BadSequenceNumberUnknown;
    } else {
        if (doDebug) {
            debugLog("acknowledging sequence ".yellow, sequenceNumber.toString().cyan);
        }
        self._sent_notifications.splice(foundIndex, 1);
        self._unacknowledgedMessageCount --;
        return StatusCodes.Good;
    }
};


/**
 *
 * @property pendingNotificationsCount  - number of pending notifications
 * @type {Number}
 */
Subscription.prototype.__defineGetter__("pendingNotificationsCount", function () {
    return this._pending_notifications.length;
});

/**
 * return True is there are pending notifications for this subscription. (i.e moreNotifications)
 *
 * @property hasPendingNotifications
 * @type {Boolean}
 */
Subscription.prototype.__defineGetter__("hasPendingNotifications", function () {
    var self = this;
    return self.pendingNotificationsCount > 0;
});

/**
 * number of sent notifications
 * @property sentNotificationsCount
 * @type {Number}
 */
Subscription.prototype.__defineGetter__("sentNotificationsCount", function () {
    return this._sent_notifications.length;
});

/**
 * number of monitored items.
 * @property monitoredItemCount
 * @type {Number}
 */
Subscription.prototype.__defineGetter__("monitoredItemCount", function () {
    return Object.keys(this.monitoredItems).length;
});

/**
 * number of disabled monitored items.
 * @property disabledMonitoredItemCount
 * @type {Number}
 */
Subscription.prototype.__defineGetter__("disabledMonitoredItemCount", function () {

    return  _.reduce(_.values(this.monitoredItems),function(cumul,monitoredItem) {
        return cumul + ( (monitoredItem.monitoringMode === MonitoringMode.Disabled ) ? 1 : 0 );
    },0);

});

/**
 * The number of unacknowledged messages saved in the republish queue.
 * @property unacknowledgedMessageCount
 * @type {Number}
 */
Subscription.prototype.__defineGetter__("unacknowledgedMessageCount", function () {
    var self = this;
    return self._unacknowledgedMessageCount;
});



var MonitoredItem = require("./monitored_item").MonitoredItem;


var MonitoredItemCreateRequest = require("node-opcua-service-subscription").MonitoredItemCreateRequest;


/**
 * adjust monitored item sampling interval
 *  - an samplingInterval ===0 means that we use a event-base model ( no sampling)
 *  - otherwise the sampling is adjusted
 *
 * @method adjustSamplingInterval
 * @param samplingInterval
 * @param node
 * @return {number|*}
 * @private
 */
Subscription.prototype.adjustSamplingInterval = function (samplingInterval, node) {

    var self = this;

    if (samplingInterval < 0) {
        // - The value -1 indicates that the default sampling interval defined by the publishing
        //   interval of the Subscription is requested.
        // - Any negative number is interpreted as -1.
        samplingInterval = self.publishingInterval;

    } else if (samplingInterval === 0 ) {

        // OPCUA 1.0.3 Part 4 - 5.12.1.2
        // The value 0 indicates that the Server should use the fastest practical rate.

        // The fastest supported sampling interval may be equal to 0, which indicates
        // that the data item is exception-based rather than being sampled at some period.
        // An exception-based model means that the underlying system does not require sampling and reports data changes.

        var dataValueSamplingInterval = node.readAttribute(SessionContext.defaultContext, AttributeIds.MinimumSamplingInterval);

        // TODO if attributeId === AttributeIds.Value : sampling interval required here
        if (dataValueSamplingInterval.statusCode === StatusCodes.Good) {
            // node provides a Minimum sampling interval ...
            samplingInterval = dataValueSamplingInterval.value.value;
            assert(samplingInterval >=0 && samplingInterval <= MonitoredItem.maximumSamplingInterval);

            // note : at this stage, a samplingInterval===0 means that the data item is really exception-based

        }

    } else if (samplingInterval < MonitoredItem.minimumSamplingInterval) {

        samplingInterval = MonitoredItem.minimumSamplingInterval;

    } else if (samplingInterval > MonitoredItem.maximumSamplingInterval) {

        // If the requested samplingInterval is higher than the
        // maximum sampling interval supported by the Server, the maximum sampling
        // interval is returned.
        samplingInterval = MonitoredItem.maximumSamplingInterval;
    }

    var node_minimumSamplingInterval = (node && node.minimumSamplingInterval) ? node.minimumSamplingInterval : 0;
    samplingInterval = Math.max(samplingInterval, node_minimumSamplingInterval);

    return samplingInterval;
};


var checkSelectClauses = require("node-opcua-address-space").checkSelectClauses;

function analyseEventFilterResult(node, eventFilter) {
    assert(eventFilter instanceof EventFilter);
    var selectClauseResults = checkSelectClauses(node, eventFilter.selectClauses);

    var whereClauseResult = new subscription_service.ContentFilterResult();

    return new subscription_service.EventFilterResult({
        selectClauseResults: selectClauseResults,
        selectClauseDiagnosticInfos: [],
        whereClauseResult: whereClauseResult
    });
}
function analyseDataChangeFilterResult(node, dataChangeFilter) {
    assert(dataChangeFilter instanceof subscription_service.DataChangeFilter);
    // the opcua specification doesn't provide dataChangeFilterResult
    return null;
}
function analyseAggregateFilterResult(node, aggregateFilter) {
    assert(aggregateFilter instanceof subscription_service.AggregateFilter);
    return new subscription_service.AggregateFilterResult({});
}


function _process_filter(node, filter) {

    if (!filter) {
        return null;
    }

    if (filter instanceof EventFilter) {
        return analyseEventFilterResult(node, filter);
    } else if (filter instanceof DataChangeFilter) {
        return analyseDataChangeFilterResult(node, filter);
    } else if (filter instanceof AggregateFilter) {
        return analyseAggregateFilterResult(node, filter);
    }
    // istanbul ignore next
    throw new Error("invalid filter");
}


Subscription.prototype.createMonitoredItem = function (addressSpace, timestampsToReturn, monitoredItemCreateRequest) {

    var self = this;
    assert(addressSpace.constructor.name === "AddressSpace");
    assert(monitoredItemCreateRequest instanceof MonitoredItemCreateRequest);


    function handle_error(statusCode) {
        return new subscription_service.MonitoredItemCreateResult({statusCode: statusCode});
    }

    var itemToMonitor = monitoredItemCreateRequest.itemToMonitor;

    var node = addressSpace.findNode(itemToMonitor.nodeId);
    if (!node) {
        return handle_error(StatusCodes.BadNodeIdUnknown);
    }


    if (itemToMonitor.attributeId === AttributeIds.Value && !(node instanceof UAVariable)) {
        // AttributeIds.Value is only valid for monitoring value of UAVariables.
        return handle_error(StatusCodes.BadAttributeIdInvalid);
    }


    if (itemToMonitor.attributeId === AttributeIds.INVALID) {
        return handle_error(StatusCodes.BadAttributeIdInvalid);
    }

    if (!itemToMonitor.indexRange.isValid()) {
        return handle_error(StatusCodes.BadIndexRangeInvalid);
    }

    // check dataEncoding applies only on Values
    if (itemToMonitor.dataEncoding.name && itemToMonitor.attributeId !== AttributeIds.Value) {
        return handle_error(StatusCodes.BadDataEncodingInvalid);
    }

    // check dataEncoding
    if (!is_valid_dataEncoding(itemToMonitor.dataEncoding)) {
        return handle_error(StatusCodes.BadDataEncodingUnsupported);
    }

    // check that item can be read by current user session

    // filter
    var requestedParameters = monitoredItemCreateRequest.requestedParameters;
    var filter = requestedParameters.filter;
    var statusCodeFilter = validateFilter(filter, itemToMonitor , node);
    if (statusCodeFilter !== StatusCodes.Good) {
        return handle_error(statusCodeFilter);
    }
    //xx var monitoringMode      = monitoredItemCreateRequest.monitoringMode; // Disabled, Sampling, Reporting
    //xx var requestedParameters = monitoredItemCreateRequest.requestedParameters;

    var monitoredItemCreateResult = self._createMonitoredItemStep2(timestampsToReturn, monitoredItemCreateRequest, node);

    assert(monitoredItemCreateResult.statusCode === StatusCodes.Good);

    var monitoredItem = self.getMonitoredItem(monitoredItemCreateResult.monitoredItemId);
    assert(monitoredItem);

    // TODO: fix old way to set node. !!!!
    monitoredItem.setNode(node);

    self.emit("monitoredItem", monitoredItem, itemToMonitor);

    self._createMonitoredItemStep3(monitoredItem,monitoredItemCreateRequest);

    return monitoredItemCreateResult;

};

var g_monitoredItemId =1;
function getNextMonitoredItemId() {
    return g_monitoredItemId++;
}
/**
 *
 * @method _createMonitoredItemStep2
 * @param timestampsToReturn
 * @param {MonitoredItemCreateRequest} monitoredItemCreateRequest - the parameters describing the monitored Item to create
 * @param node {BaseNode}
 * @return {subscription_service.MonitoredItemCreateResult}
 * @private
 */
Subscription.prototype._createMonitoredItemStep2 = function (timestampsToReturn, monitoredItemCreateRequest, node) {

    var self = this;

    // note : most of the parameter inconsistencies shall have been handled by the caller
    // any error here will raise an assert here

    assert(monitoredItemCreateRequest instanceof MonitoredItemCreateRequest);
    var itemToMonitor = monitoredItemCreateRequest.itemToMonitor;

    //xx check if attribute Id invalid (we only support Value or EventNotifier )
    //xx assert(itemToMonitor.attributeId !== AttributeIds.INVALID);

    self.monitoredItemIdCounter += 1;


    var monitoredItemId = getNextMonitoredItemId();

    var requestedParameters = monitoredItemCreateRequest.requestedParameters;

    // adjust requestedParameters.samplingInterval
    requestedParameters.samplingInterval = self.adjustSamplingInterval(requestedParameters.samplingInterval, node);

    // reincorporate monitoredItemId and itemToMonitor into the requestedParameters
    requestedParameters.monitoredItemId = monitoredItemId;
    requestedParameters.itemToMonitor = itemToMonitor;


    var monitoredItem = new MonitoredItem(requestedParameters);
    monitoredItem.timestampsToReturn = timestampsToReturn;
    monitoredItem.$subscription = self;

    assert(monitoredItem.monitoredItemId === monitoredItemId);
    self.monitoredItems[monitoredItemId] = monitoredItem;

    var filterResult = _process_filter(node, requestedParameters.filter);


    var monitoredItemCreateResult = new subscription_service.MonitoredItemCreateResult({
        statusCode: StatusCodes.Good,
        monitoredItemId: monitoredItemId,
        revisedSamplingInterval: monitoredItem.samplingInterval,
        revisedQueueSize: monitoredItem.queueSize,
        filterResult: filterResult
    });
    return monitoredItemCreateResult;
};


Subscription.prototype._createMonitoredItemStep3 = function (monitoredItem,monitoredItemCreateRequest) {

    assert(monitoredItem.monitoringMode === MonitoringMode.Invalid);
    assert(_.isFunction(monitoredItem.samplingFunc));
    var monitoringMode = monitoredItemCreateRequest.monitoringMode; // Disabled, Sampling, Reporting
    monitoredItem.setMonitoringMode(monitoringMode);

};

/**
 * get a monitoredItem by Id.
 * @method getMonitoredItem
 * @param monitoredItemId  {Number} the id of the monitored item to get.
 * @return {MonitoredItem}
 */
Subscription.prototype.getMonitoredItem = function (monitoredItemId) {
    assert(_.isFinite(monitoredItemId));
    var self = this;
    return self.monitoredItems[monitoredItemId];
};

/**
 * getMonitoredItems is used to get information about monitored items of a subscription.Its intended
 * use is defined in Part 4. This method is the implementation of the Standard OPCUA GetMonitoredItems Method.
 * @method getMonitoredItems
 * @param  result.serverHandles {Int32[]} Array of serverHandles for all MonitoredItems of the subscription identified by subscriptionId.
 *         result.clientHandles {Int32[]} Array of clientHandles for all MonitoredItems of the subscription identified by subscriptionId.
 *         result.statusCode    {StatusCode}
 * from spec:
 * This method can be used to get the  list of monitored items in a subscription if CreateMonitoredItems failed due to
 * a network interruption and the client does not know if the creation succeeded in the server.
 *
 */
Subscription.prototype.getMonitoredItems = function (/*out*/ result) {

    result = result || {};
    var subscription = this;
    result.serverHandles = [];
    result.clientHandles = [];
    result.statusCode = StatusCodes.Good;

    Object.keys(subscription.monitoredItems).forEach(function (monitoredItemId) {

        var monitoredItem = subscription.getMonitoredItem(monitoredItemId);

        result.clientHandles.push(monitoredItem.clientHandle);
        // TODO:  serverHandle is defined anywhere in the OPCUA Specification 1.02
        //        I am not sure what shall be reported for serverHandle...
        //        using monitoredItem.monitoredItemId instead...
        //        May be a clarification in the OPCUA Spec is required.
        result.serverHandles.push(monitoredItemId);

    });
    return result;
};

MonitoredItem.prototype.resendInitialValues = function() {
    // tte first Publish response(s) after the TransferSubscriptions call shall contain the current values of all
    // Monitored Items in the Subscription where the Monitoring Mode is set to Reporting.
    // the first Publish response after the TransferSubscriptions call shall contain only the value changes since
    // the last Publish response was sent.
    // This parameter only applies to MonitoredItems used for monitoring Attribute changes.
    var self = this;
    self._stop_sampling();
    self._start_sampling(true);
};

Subscription.prototype.resendInitialValues = function(){
    var subscription = this;
    _.forEach(subscription.monitoredItems,function (monitoredItem/*,monitoredItemId*/) {
        monitoredItem.resendInitialValues();
    });
};


/**
 * remove a monitored Item from the subscription.
 * @method removeMonitoredItem
 * @param monitoredItemId  {Number} the id of the monitored item to get.
 */
Subscription.prototype.removeMonitoredItem = function (monitoredItemId) {

    debugLog("Removing monitoredIem ", monitoredItemId);

    assert(_.isFinite(monitoredItemId));
    var self = this;
    if (!self.monitoredItems.hasOwnProperty(monitoredItemId)) {
        return StatusCodes.BadMonitoredItemIdInvalid;
    }

    var monitoredItem = self.monitoredItems[monitoredItemId];

    monitoredItem.terminate();

    /**
     *
     * notify that a monitored item has been removed from the subscription
     * @event removeMonitoredItem
     * @param monitoredItem {MonitoredItem}
     */
    self.emit("removeMonitoredItem", monitoredItem);

    delete self.monitoredItems[monitoredItemId];

    return StatusCodes.Good;

};



/**
 * @property hasMonitoredItemNotifications true if monitored Item have uncollected Notifications
 * @type {Boolean}
 */
Subscription.prototype.__defineGetter__("hasMonitoredItemNotifications", function () {
    var self = this;
    if (self._hasMonitoredItemNotifications) {
        return true;
    }
    var keys = Object.keys(self.monitoredItems);
    var i,key;
    var n = keys.length;
    for (i=0;i<n;i++) {
        key = keys[i];
        var monitoredItem = self.monitoredItems[key];
        if (monitoredItem.hasMonitoredItemNotifications) {
            self._hasMonitoredItemNotifications = true;
            return true;
        }
    }
    return false;
});

/**
 * extract up to maxNotificationsPerPublish notifications
 * @param monitoredItems
 * @param maxNotificationsPerPublish
 * @return {Array}
 */
function extract_notifications_chunk(monitoredItems, maxNotificationsPerPublish) {

    var n = maxNotificationsPerPublish === 0 ?
        monitoredItems.length :
        Math.min(monitoredItems.length, maxNotificationsPerPublish);

    var chunk_monitoredItems = [];
    while (n) {
        chunk_monitoredItems.push(monitoredItems.shift());
        n--;
    }
    return chunk_monitoredItems;
}

function add_all_in(notifications,all_notifications) {
  for(var i =0;i <notifications.length;i++) {
      var n = notifications[i];
      all_notifications.push(n);
  }
}

function filter_instanceof(Class,e) {
    return (e instanceof Class);
}
// collect DataChangeNotification
Subscription.prototype._collectNotificationData = function () {

    var self = this;
    var notifications = [];

    // reset cache ...
    self._hasMonitoredItemNotifications = false;

    var all_notifications = new Dequeue();

    // visit all monitored items
    var keys = Object.keys(self.monitoredItems);
    var i,key;
    var n = keys.length;
    for (i=0;i<n;i++) {
        key = keys[i];
        var monitoredItem = self.monitoredItems[key];
        notifications = monitoredItem.extractMonitoredItemNotifications();
        add_all_in(notifications,all_notifications);
    }

    var notificationsMessage = [];

    while (all_notifications.length > 0) {

        // split into one or multiple dataChangeNotification with no more than
        //  self.maxNotificationsPerPublish monitoredItems
        var notifications_chunk = extract_notifications_chunk(all_notifications, self.maxNotificationsPerPublish);

        // separate data for DataChangeNotification (MonitoredItemNotification) from data for EventNotificationList(EventFieldList)
        var dataChangedNotificationData = notifications_chunk.filter(filter_instanceof.bind(null,subscription_service.MonitoredItemNotification));
        var eventNotificationListData = notifications_chunk.filter(filter_instanceof.bind(null,subscription_service.EventFieldList));

        assert(notifications_chunk.length === dataChangedNotificationData.length +  eventNotificationListData.length);

        notifications = [];

        // add dataChangeNotification
        if (dataChangedNotificationData.length) {
            var dataChangeNotification = new DataChangeNotification({
                monitoredItems: dataChangedNotificationData,
                diagnosticInfos: []
            });
            notifications.push(dataChangeNotification);
        }

        // add dataChangeNotification
        if (eventNotificationListData.length) {
            var eventNotificationList = new EventNotificationList({
                events: eventNotificationListData
            });

            notifications.push(eventNotificationList);
        }

        assert(notifications.length === 1 || notifications.length === 2);
        notificationsMessage.push(notifications);
    }

    assert(notificationsMessage instanceof Array);
    return notificationsMessage;
};

Subscription.prototype._harvestMonitoredItems = function () {

    var self = this;

    // Only collect data change notification for the time being
    var notificationData = self._collectNotificationData();
    assert(notificationData instanceof Array);

    // istanbul ignore next
    if (doDebug) {
        debugLog("Subscription#_harvestMonitoredItems =>",notificationData.length);
    }
    notificationData.forEach(function (notificationMessage) {
        self._addNotificationMessage(notificationMessage);
    });
    self._hasMonitoredItemNotifications = false;

};
Subscription.prototype.__defineGetter__("subscriptionId",function(){ return this.id; });


Subscription.prototype.notifyTransfer = function() {

    // OPCUA UA Spec 1.0.3 : part 3 - page 82 - 5.13.7 TransferSubscriptions:
    // If the Server transfers the Subscription to the new Session, the Server shall issue a StatusChangeNotification
    // notificationMessage with the status code Good_SubscriptionTransferred to the old Session.
    var self = this;

    console.warn(" Subscription => Notifying Transfer                                  ".bgWhite.red);

    var notificationData = [new StatusChangeNotification({statusCode: StatusCodes.GoodSubscriptionTransferred})];

    self.publishEngine.send_notification_message({
        subscriptionId: self.id,
        sequenceNumber: self._get_next_sequence_number(),
        notificationData: notificationData,
        moreNotifications: false
    },true);

};


exports.Subscription = Subscription;