"use strict";
/**
* @module opcua.server
*
*/
var assert = require("node-opcua-assert");
var _ = require("underscore");
var EventEmitter = require("events").EventEmitter;
var util = require("util");
var subscription_service = require("node-opcua-service-subscription");
var read_service = require("node-opcua-service-read");
var DataValue = require("node-opcua-data-value").DataValue;
var Variant = require("node-opcua-variant").Variant;
var StatusCodes = require("node-opcua-status-code").StatusCodes;
var AttributeIds = require("node-opcua-data-model").AttributeIds;
var BaseNode = require("node-opcua-address-space").BaseNode;
var sameDataValue = require("node-opcua-data-value").sameDataValue;
var sameVariant = require("node-opcua-variant/src/variant_tools").sameVariant;
var MonitoringMode = subscription_service.MonitoringMode;
var MonitoringParameters = subscription_service.MonitoringParameters;
var MonitoredItemModifyResult = subscription_service.MonitoredItemModifyResult;
var TimestampsToReturn = read_service.TimestampsToReturn;
var EventFilter = require("node-opcua-service-filter").EventFilter;
var apply_timestamps = require("node-opcua-data-value").apply_timestamps;
var defaultItemToMonitor = {indexRange: null, attributeId: read_service.AttributeIds.Value};
var SessionContext = require("node-opcua-address-space").SessionContext;
var minimumSamplingInterval = 50; // 50 ms as a minimum sampling interval
var defaultSamplingInterval = 1500; // 1500 ms as a default sampling interval
var maximumSamplingInterval = 1000 * 60 * 60; // 1 hour !
var debugLog = require("node-opcua-debug").make_debugLog(__filename);
var doDebug = require("node-opcua-debug").checkDebugFlag(__filename);
function _adjust_sampling_interval(samplingInterval) {
if (samplingInterval === 0) {
return samplingInterval;
}
assert(samplingInterval >= 0, " this case should have been prevented outside");
samplingInterval = samplingInterval || defaultSamplingInterval;
maximumSamplingInterval = maximumSamplingInterval || defaultSamplingInterval;
samplingInterval = Math.max(samplingInterval, minimumSamplingInterval);
samplingInterval = Math.min(samplingInterval, maximumSamplingInterval);
return samplingInterval;
}
var maxQueueSize = 5000;
function _adjust_queue_size(queueSize) {
queueSize = Math.min(queueSize, maxQueueSize);
queueSize = Math.max(1, queueSize);
return queueSize;
}
/**
* a server side monitored item
*
* - Once created, the MonitoredItem will raised an "samplingEvent" event every "samplingInterval" millisecond
* until {{#crossLink "MonitoredItem/terminate:method"}}{{/crossLink}} is called.
*
* - It is up to the event receiver to call {{#crossLink "MonitoredItem/recordValue:method"}}{{/crossLink}}.
*
* @class MonitoredItem
* @param options the options
* @param options.clientHandle {Number} - the client handle
* @param options.samplingInterval {Number} - the sampling Interval
* @param options.filter {ExtensionObject}
* @param options.discardOldest {boolean} - if discardOldest === true, older items are removed from the queue
* when
* @param options.queueSize {Number} - the size of the queue.
* @param options.monitoredItemId {Number} the monitoredItem Id assigned by the server to this monitoredItem.
* @param options.itemToMonitor
* @param options.monitoringMode
* @param options.timestampsToReturn
* @constructor
*/
function MonitoredItem(options) {
assert(options.hasOwnProperty("monitoredItemId"));
assert(!options.monitoringMode, "use setMonitoring mode explicitly to activate the monitored item");
EventEmitter.apply(this, arguments);
options.itemToMonitor = options.itemToMonitor || defaultItemToMonitor;
var self = this;
self._samplingId = null;
self._set_parameters(options);
self.monitoredItemId = options.monitoredItemId; //( known as serverHandle)
self.queue = [];
self.overflow = false;
self.oldDataValue = new DataValue({statusCode: StatusCodes.BadDataUnavailable}); // unset initially
// user has to call setMonitoringMode
self.monitoringMode = MonitoringMode.Invalid;
self.timestampsToReturn = options.timestampsToReturn || TimestampsToReturn.Neither;
self.itemToMonitor = options.itemToMonitor;
self.filter = options.filter || null;
/**
* @property node the associated node object in the address space
* @type {BaseNode|null}
*/
self._node = null;
self._semantic_version = 0;
if (doDebug) {
debugLog("Monitoring ", options.itemToMonitor.toString());
}
}
util.inherits(MonitoredItem, EventEmitter);
var ObjectRegistry = require("node-opcua-object-registry").ObjectRegistry;
MonitoredItem.registry = new ObjectRegistry();
MonitoredItem.minimumSamplingInterval = minimumSamplingInterval;
MonitoredItem.defaultSamplingInterval = defaultSamplingInterval;
MonitoredItem.maximumSamplingInterval = maximumSamplingInterval;
function _validate_parameters(options) {
//xx assert(options instanceof MonitoringParameters);
assert(options.hasOwnProperty("clientHandle"));
assert(options.hasOwnProperty("samplingInterval"));
assert(_.isFinite(options.clientHandle));
assert(_.isFinite(options.samplingInterval));
assert(_.isBoolean(options.discardOldest));
assert(_.isFinite(options.queueSize));
assert(options.queueSize >= 0);
}
MonitoredItem.prototype.__defineGetter__("node", function () {
return this._node;
});
MonitoredItem.prototype.__defineSetter__("node", function () {
throw new Error("Unexpected way to set node");
});
MonitoredItem.prototype.setNode = function (node) {
var self = this;
assert(!self.node || self.node === node, "node already set");
self._node = node;
self._semantic_version = node.semantic_version;
};
MonitoredItem.prototype._stop_sampling = function () {
var self = this;
MonitoredItem.registry.unregister(self);
if (self._on_opcua_event_received_callback) {
assert(_.isFunction(self._on_opcua_event_received_callback));
self.node.removeListener("event", self._on_opcua_event_received_callback);
self._on_opcua_event_received_callback = null;
}
if (self._attribute_changed_callback) {
assert(_.isFunction(self._attribute_changed_callback));
var event_name = BaseNode.makeAttributeEventName(self.itemToMonitor.attributeId);
self.node.removeListener(event_name, self._attribute_changed_callback);
self._attribute_changed_callback = null;
}
if (self._value_changed_callback) {
// samplingInterval was 0 for a exception-based data Item
// we setup a event listener that we need to unwind here
assert(_.isFunction(self._value_changed_callback));
assert(!self._samplingId);
self.node.removeListener("value_changed", self._value_changed_callback);
self._value_changed_callback = null;
}
if (self._semantic_changed_callback) {
assert(_.isFunction(self._semantic_changed_callback));
assert(!self._samplingId);
self.node.removeListener("semantic_changed", self._semantic_changed_callback);
self._semantic_changed_callback = null;
}
if (self._samplingId) {
self._clear_timer();
}
assert(!self._samplingId);
assert(!self._value_changed_callback);
assert(!self._semantic_changed_callback);
assert(!self._attribute_changed_callback);
assert(!self._on_opcua_event_received_callback);
};
MonitoredItem.prototype._on_value_changed = function (dataValue) {
var self = this;
assert(dataValue instanceof DataValue);
self.recordValue(dataValue, false);
};
MonitoredItem.prototype._on_semantic_changed = function () {
var self = this;
var dataValue = self.node.readValue();
self._on_value_changed(dataValue);
};
var extractEventFields = require("node-opcua-service-filter").extractEventFields;
MonitoredItem.prototype._on_opcua_event = function (eventData) {
var self = this;
assert(!self.filter || self.filter instanceof EventFilter);
var selectClauses = self.filter ? self.filter.selectClauses : [];
var eventFields = extractEventFields(selectClauses, eventData);
// istanbul ignore next
if (doDebug) {
console.log(" RECEIVED INTERNAL EVENT THAT WE ARE MONITORING");
console.log(self.filter ? self.filter.toString() : "no filter");
eventFields.forEach(function (e) {
console.log(e.toString());
});
}
self._enqueue_event(eventFields);
};
MonitoredItem.prototype._getSession = function () {
var self = this;
if (!self.$subscription) {
return null;
}
if (!self.$subscription.$session) {
return null;
}
return self.$subscription.$session;
};
MonitoredItem.prototype._start_sampling = function (recordInitialValue) {
var self = this;
// make sure oldDataValue is scrapped so first data recording can happen
self.oldDataValue = new DataValue({statusCode: StatusCodes.BadDataUnavailable}); // unset initially
self._stop_sampling();
MonitoredItem.registry.register(self);
var context = new SessionContext({session: self._getSession()});
if (self.itemToMonitor.attributeId === AttributeIds.EventNotifier) {
// istanbul ignore next
if (doDebug) {
debugLog("xxxxxx monitoring EventNotifier on", self.node.nodeId.toString(), self.node.browseName.toString());
}
// we are monitoring OPCUA Event
self._on_opcua_event_received_callback = self._on_opcua_event.bind(self);
self.node.on("event", self._on_opcua_event_received_callback);
return;
}
if (self.itemToMonitor.attributeId !== AttributeIds.Value) {
// sampling interval only applies to Value Attributes.
self.samplingInterval = 0; // turned to exception-based regardless of requested sampling interval
// non value attribute only react on value change
self._attribute_changed_callback = self._on_value_changed.bind(this);
var event_name = BaseNode.makeAttributeEventName(self.itemToMonitor.attributeId);
self.node.on(event_name, self._attribute_changed_callback);
if (recordInitialValue) {
// read initial value
var dataValue = self.node.readAttribute(context, self.itemToMonitor.attributeId);
self.recordValue(dataValue, true);
}
return;
}
if (self.samplingInterval === 0) {
// we have a exception-based dataItem : event based model, so we do not need a timer
// rather , we setup the "value_changed_event";
self._value_changed_callback = self._on_value_changed.bind(this);
self._semantic_changed_callback = self._on_semantic_changed.bind(this);
self.node.on("value_changed", self._value_changed_callback);
self.node.on("semantic_changed", self._semantic_changed_callback);
// initiate first read
if (recordInitialValue) {
//xx setImmediate(function() {
self.node.readValueAsync(context, function (err, dataValue) {
self.recordValue(dataValue, true);
});
//xx });
}
} else {
self._set_timer();
if (recordInitialValue) {
setImmediate(function () {
//xx console.log("Record Initial Value ",self.node.nodeId.toString());
// initiate first read (this requires self._samplingId to be set)
self._on_sampling_timer();
});
}
}
};
MonitoredItem.prototype.setMonitoringMode = function (monitoringMode) {
var self = this;
assert(monitoringMode !== MonitoringMode.Invalid);
if (monitoringMode === self.monitoringMode) {
// nothing to do
return;
}
var old_monitoringMode = self.monitoringMode;
self.monitoringMode = monitoringMode;
if (self.monitoringMode === MonitoringMode.Disabled) {
self._stop_sampling();
// OPCUA 1.03 part 4 : $5.12.4
// setting the mode to DISABLED causes all queued Notifications to be deleted
self.queue = [];
self.overflow = false;
} else {
assert(self.monitoringMode === MonitoringMode.Sampling || self.monitoringMode === MonitoringMode.Reporting);
// OPCUA 1.03 part 4 : $5.12.1.3
// When a MonitoredItem is enabled (i.e. when the MonitoringMode is changed from DISABLED to
// SAMPLING or REPORTING) or it is created in the enabled state, the Server shall report the first
// sample as soon as possible and the time of this sample becomes the starting point for the next
// sampling interval.
var recordInitialValue = (old_monitoringMode === MonitoringMode.Invalid || old_monitoringMode === MonitoringMode.Disabled);
self._start_sampling(recordInitialValue);
}
};
MonitoredItem.prototype._set_parameters = function (options) {
var self = this;
_validate_parameters(options);
self.clientHandle = options.clientHandle;
// The Server may support data that is collected based on a sampling model or generated based on an
// exception-based model. The fastest supported sampling interval may be equal to 0, which indicates
// that the data item is exception-based rather than being sampled at some period. An exception-based
// model means that the underlying system does not require sampling and reports data changes.
self.samplingInterval = _adjust_sampling_interval(options.samplingInterval);
self.discardOldest = options.discardOldest;
self.queueSize = _adjust_queue_size(options.queueSize);
};
/**
* Terminate the MonitoredItem.
* @method terminate
*
* This will stop the internal sampling timer.
*/
MonitoredItem.prototype.terminate = function () {
var self = this;
self._stop_sampling();
};
/**
* @method _on_sampling_timer
* @private
* request
*
*/
MonitoredItem.prototype._on_sampling_timer = function () {
var self = this;
// istanbul ignore next
if (doDebug) {
debugLog("MonitoredItem#_on_sampling_timer", self.node ? self.node.nodeId.toString() : "null", " isSampling?=", self._is_sampling);
}
if (self._samplingId) {
assert(self.monitoringMode === MonitoringMode.Sampling || self.monitoringMode === MonitoringMode.Reporting);
if (self._is_sampling) {
// previous sampling call is not yet completed..
// there is nothing we can do about it except waiting until next tick.
// note : see also issue #156 on github
return;
}
//xx console.log("xxxx ON SAMPLING");
assert(!self._is_sampling, "sampling func shall not be re-entrant !! fix it");
assert(_.isFunction(self.samplingFunc));
self._is_sampling = true;
self.samplingFunc.call(self, self.oldDataValue, function (err, newDataValue) {
if (err) {
console.log(" SAMPLING ERROR =>", err);
} else {
self._on_value_changed(newDataValue);
}
self._is_sampling = false;
});
} else {
/* istanbul ignore next */
debugLog("_on_sampling_timer call but MonitoredItem has been terminated !!! ");
}
};
var extractRange = require("node-opcua-data-value").extractRange;
var DataChangeFilter = subscription_service.DataChangeFilter;
var DataChangeTrigger = subscription_service.DataChangeTrigger;
var DeadbandType = subscription_service.DeadbandType;
function statusCodeHasChanged(newDataValue, oldDataValue) {
assert(newDataValue instanceof DataValue);
assert(oldDataValue instanceof DataValue);
return newDataValue.statusCode !== oldDataValue.statusCode;
}
var check_deadband = require("node-opcua-service-subscription").check_deadband;
function valueHasChanged(self, newDataValue, oldDataValue, deadbandType, deadbandValue) {
assert(newDataValue instanceof DataValue);
assert(oldDataValue instanceof DataValue);
switch (deadbandType) {
case DeadbandType.None:
assert(newDataValue.value instanceof Variant);
assert(newDataValue.value instanceof Variant);
// No Deadband calculation should be applied.
return check_deadband(oldDataValue.value, newDataValue.value, DeadbandType.None);
case DeadbandType.Absolute:
// AbsoluteDeadband
return check_deadband(oldDataValue.value, newDataValue.value, DeadbandType.Absolute, deadbandValue);
default:
// Percent_2 PercentDeadband (This type is specified in Part 8).
assert(deadbandType === DeadbandType.Percent);
// The range of the deadbandValue is from 0.0 to 100.0 Percent.
assert(deadbandValue >= 0 && deadbandValue <= 100);
// DeadbandType = PercentDeadband
// For this type of deadband the deadbandValue is defined as the percentage of the EURange. That is,
// it applies only to AnalogItems with an EURange Property that defines the typical value range for the
// item. This range shall be multiplied with the deadbandValue and then compared to the actual value change
// to determine the need for a data change notification. The following pseudo code shows how the deadband
// is calculated:
// DataChange if (absolute value of (last cached value - current value) >
// (deadbandValue/100.0) * ((high-low) of EURange)))
//
// Specifying a deadbandValue outside of this range will be rejected and reported with the
// StatusCode Bad_DeadbandFilterInvalid (see Table 27).
// If the Value of the MonitoredItem is an array, then the deadband calculation logic shall be applied to
// each element of the array. If an element that requires a DataChange is found, then no further
// deadband checking is necessary and the entire array shall be returned.
assert(self.node !== null, "expecting a valid address_space object here to get access the the EURange");
if (self.node.euRange) {
// double,double
var rangeVariant = self.node.euRange.readValue().value;
var range = rangeVariant.value.high - rangeVariant.value.high;
assert(_.isFinite(range));
return check_deadband(oldDataValue.value, newDataValue.value, DeadbandType.Percent, deadbandValue, range);
}
return true;
}
}
function timestampHasChanged(t1, t2) {
if ((t1 || !t2) || (t2 || !t1)) {
return true;
}
if (!t1 || !t2) {
return false;
}
return t1.getTime() !== t2.getTime();
}
function apply_datachange_filter(self, newDataValue, oldDataValue) {
assert(self.filter);
assert(self.filter instanceof DataChangeFilter);
assert(newDataValue instanceof DataValue);
assert(oldDataValue instanceof DataValue);
var trigger = self.filter.trigger;
switch (trigger.value) {
case DataChangeTrigger.Status.value: // Status
// Report a notification ONLY if the StatusCode associated with
// the value changes. See Table 166 for StatusCodes defined in
// this standard. Part 8 specifies additional StatusCodes that are
// valid in particular for device data.
return statusCodeHasChanged(newDataValue, oldDataValue);
case DataChangeTrigger.StatusValue.value: // StatusValue
// Report a notification if either the StatusCode or the value
// change. The Deadband filter can be used in addition for
// filtering value changes.
// This is the default setting if no filter is set.
return statusCodeHasChanged(newDataValue, oldDataValue) ||
valueHasChanged(self, newDataValue, oldDataValue, self.filter.deadbandType, self.filter.deadbandValue);
default: // StatusValueTimestamp
// Report a notification if either StatusCode, value or the
// SourceTimestamp change.
//
// If a Deadband filter is specified,this trigger has the same behaviour as STATUS_VALUE_1.
//
// If the DataChangeFilter is not applied to the monitored item, STATUS_VALUE_1
// is the default reporting behaviour
assert(trigger === DataChangeTrigger.StatusValueTimestamp);
return timestampHasChanged(newDataValue.sourceTimestamp, oldDataValue.sourceTimestamp) ||
statusCodeHasChanged(newDataValue, oldDataValue) ||
valueHasChanged(self, newDataValue, oldDataValue, self.filter.deadbandType, self.filter.deadbandValue);
}
return false;
}
function apply_filter(self, newDataValue) {
if (!self.oldDataValue) {
return true; // keep
}
if (self.filter instanceof DataChangeFilter) {
return apply_datachange_filter(self, newDataValue, self.oldDataValue);
}
return true; // keep
// else {
// // if filter not set, by default report changes to Status or Value only
// return !sameDataValue(newDataValue, self.oldDataValue, TimestampsToReturn.Neither);
// }
// return true; // keep
}
/**
* @property isSampling
* @type boolean
*/
MonitoredItem.prototype.__defineGetter__("isSampling", function () {
var self = this;
return !!self._samplingId || _.isFunction(self._value_changed_callback) ||
_.isFunction(self._attribute_changed_callback);
});
/**
* @method recordValue
* @param dataValue {DataValue} the whole dataValue
* @param skipChangeTest {Boolean}
*
* Note: recordValue can only be called within timer event
*/
MonitoredItem.prototype.recordValue = function (dataValue, skipChangeTest) {
var self = this;
assert(dataValue instanceof DataValue);
assert(dataValue !== self.oldDataValue, "recordValue expects different dataValue to be provided");
// extract the range that we are interested with
dataValue = extractRange(dataValue, self.itemToMonitor.indexRange);
// istanbul ignore next
if (doDebug) {
debugLog("MonitoredItem#recordValue", self.node.nodeId.toString(), self.node.browseName.toString(), " has Changed = ", !sameDataValue(dataValue, self.oldDataValue));
}
// if semantic has changed, value need to be enqueued regardless of other assumptions
var hasSemanticChanged = self.node && (self.node.semantic_version !== self._semantic_version);
if (hasSemanticChanged) {
return self._enqueue_value(dataValue);
}
if (!skipChangeTest) {
var hasChanged = !sameDataValue(dataValue, self.oldDataValue);
if (!hasChanged) {
return;
}
}
if (!apply_filter(self, dataValue)) {
return;
}
if (self.itemToMonitor.indexRange && !self.itemToMonitor.indexRange.isEmpty()) {
// when an indexRange is provided , make sure that no record happens unless
// extracted variant in the selected range has really changed.
if (sameVariant(dataValue.value, self.oldDataValue.value)) {
return;
}
}
// store last value
self._enqueue_value(dataValue);
};
MonitoredItem.prototype._setOverflowBit = function (notification) {
if (notification.hasOwnProperty("value")) {
assert(notification.value.statusCode === StatusCodes.Good);
notification.value.statusCode = StatusCodes.makeStatusCode(notification.value.statusCode, "Overflow | InfoTypeDataValue");
assert(_.isEqual(notification.value.statusCode, StatusCodes.GoodWithOverflowBit));
assert(notification.value.statusCode.hasOverflowBit);
}
};
function setSemanticChangeBit(notification) {
if (notification && notification.hasOwnProperty("value")) {
notification.value.statusCode = StatusCodes.makeStatusCode(notification.value.statusCode, "SemanticChanged");
}
}
MonitoredItem.prototype._enqueue_notification = function (notification) {
var self = this;
if (self.queueSize === 1) {
// ensure queuesize
if (!self.queue || self.queue.length !== 1) {
self.queue = [null];
}
self.queue[0] = notification;
assert(self.queue.length === 1);
} else {
if (self.discardOldest) {
// push new value to queue
self.queue.push(notification);
if (self.queue.length > self.queueSize) {
self.overflow = true;
self.queue.shift(); // remove front element
// set overflow bit
self._setOverflowBit(self.queue[0]);
}
} else {
if (self.queue.length < self.queueSize) {
self.queue.push(notification);
} else {
self.overflow = true;
self._setOverflowBit(notification);
self.queue[self.queue.length - 1] = notification;
}
}
}
assert(self.queue.length >= 1);
};
MonitoredItem.prototype._makeDataChangeNotification = function (dataValue) {
var self = this;
var attributeId = self.itemToMonitor.attributeId;
dataValue = apply_timestamps(dataValue, self.timestampsToReturn, attributeId);
return new subscription_service.MonitoredItemNotification({clientHandle: self.clientHandle, value: dataValue});
};
function isGoodish(statusCode) {
return statusCode.value < 0x10000000;
}
/**
*
* @param dataValue {DataValue}
* @private
*/
MonitoredItem.prototype._enqueue_value = function (dataValue) {
var self = this;
// preconditions:
assert(dataValue instanceof DataValue);
// lets verify that, if status code is good then we have a valid Variant in the dataValue
assert(!isGoodish(dataValue.statusCode) || dataValue.value instanceof Variant);
//xx assert(isGoodish(dataValue.statusCode) || util.isNullOrUndefined(dataValue.value) );
// let's check that data Value is really a different object
// we may end up with corrupted queue if dataValue are recycled and stored as is in notifications
assert(dataValue !== self.oldDataValue, "dataValue cannot be the same object twice!");
// let's check that data Value is really a different object
// we may end up with corrupted queue if dataValue are recycled and stored as is in notifications
assert(!self.oldDataValue || !dataValue.value || (dataValue.value !== self.oldDataValue.value), "dataValue cannot be the same object twice!");
if (!(!self.oldDataValue || !self.oldDataValue.value
|| !dataValue.value || !(dataValue.value.value instanceof Object)
|| (dataValue.value.value !== self.oldDataValue.value.value)) && !(dataValue.value.value instanceof Date)) {
throw new Error("dataValue.value.value cannot be the same object twice! " + self.node.browseName.toString() + " " + dataValue.toString() + " " + self.oldDataValue.toString().cyan);
}
// istanbul ignore next
if (doDebug) {
debugLog("MonitoredItem#_enqueue_value", self.node.nodeId.toString());
}
self.oldDataValue = dataValue;
var notification = self._makeDataChangeNotification(dataValue);
self._enqueue_notification(notification);
};
MonitoredItem.prototype._makeEventFieldList = function (eventFields) {
var self = this;
assert(_.isArray(eventFields));
return new subscription_service.EventFieldList({clientHandle: self.clientHandle, eventFields: eventFields});
};
MonitoredItem.prototype._enqueue_event = function (eventFields) {
var self = this;
debugLog(" MonitoredItem#_enqueue_event");
var notification = self._makeEventFieldList(eventFields);
self._enqueue_notification(notification);
};
MonitoredItem.prototype._empty_queue = function () {
var self = this;
// empty queue
self.queue = [];
self.overflow = false;
};
MonitoredItem.prototype.__defineGetter__("hasMonitoredItemNotifications", function () {
var self = this;
return self.queue.length > 0;
});
/**
* @method extractMonitoredItemNotifications
* @return {Array.<*>}
*/
MonitoredItem.prototype.extractMonitoredItemNotifications = function () {
var self = this;
if (self.monitoringMode !== MonitoringMode.Reporting) {
return [];
}
var notifications = self.queue;
self._empty_queue();
// apply semantic changed bit if necessary
if (notifications.length > 0 && self.node && self._semantic_version < self.node.semantic_version) {
var dataValue = notifications[notifications.length - 1];
setSemanticChangeBit(dataValue);
self._semantic_version = self.node.semantic_version;
}
return notifications;
};
var timers = {};
function appendToTimer(monitoredItem) {
var samplingInterval = monitoredItem.samplingInterval;
var key = samplingInterval.toString();
assert(samplingInterval > 0);
var _t = timers[key];
if (!_t) {
_t = {
monitoredItems: {},
monitoredItemsCount: 0,
_samplingId: false
};
_t._samplingId = setInterval(function () {
_.forEach(_t.monitoredItems, function (m) {
setImmediate(function () {
m._on_sampling_timer();
});
});
}, samplingInterval);
timers[key] = _t;
}
assert(!_t.monitoredItems[monitoredItem.monitoredItemId]);
_t.monitoredItems[monitoredItem.monitoredItemId] = monitoredItem;
_t.monitoredItemsCount++;
return key;
}
function removeFromTimer(monitoredItem) {
var samplingInterval = monitoredItem.samplingInterval;
assert(samplingInterval > 0);
var key = monitoredItem._samplingId;
var _t = timers[key];
if (!_t) {
console.log("cannot find common timer for samplingInterval", key);
return;
}
assert(_t);
assert(_t.monitoredItems[monitoredItem.monitoredItemId]);
delete _t.monitoredItems[monitoredItem.monitoredItemId];
_t.monitoredItemsCount--;
assert(_t.monitoredItemsCount >= 0);
if (_t.monitoredItemsCount === 0) {
clearInterval(_t._samplingId);
delete timers[key];
}
}
var useCommonTimer = true;
MonitoredItem.prototype._clear_timer = function () {
var self = this;
if (self._samplingId) {
if (useCommonTimer) {
removeFromTimer(self);
} else {
clearInterval(self._samplingId);
}
self._samplingId = 0;
}
};
MonitoredItem.prototype._set_timer = function () {
var self = this;
assert(self.samplingInterval >= minimumSamplingInterval);
assert(!self._samplingId);
if (useCommonTimer) {
self._samplingId = appendToTimer(self);
} else {
// settle periodic sampling
self._samplingId = setInterval(function () {
self._on_sampling_timer();
}, self.samplingInterval);
}
};
MonitoredItem.prototype._adjust_queue_to_match_new_queue_size = function () {
var self = this;
// adjust queue size if necessary
if (self.queueSize < self.queue.length) {
if (self.discardOldest) {
self.queue.splice(0, self.queue.length - self.queueSize);
} else {
var lastElement = self.queue[self.queue.length - 1];
// only keep queueSize first element, discard others
self.queue.splice(self.queueSize);
self.queue[self.queue.length - 1] = lastElement;
}
}
if (self.queueSize <= 1) {
self.overflow = false;
// unset OverFlowBit
if (self.queue.length === 1) {
if (self.queue[0].value) {
if (self.queue[0].value.statusCode.hasOverflowBit) {
self.queue[0].value.statusCode.unset("Overflow | InfoTypeDataValue");
}
}
}
}
assert(self.queue.length <= self.queueSize);
};
MonitoredItem.prototype._adjust_sampling = function (old_samplingInterval) {
var self = this;
if (old_samplingInterval !== self.samplingInterval) {
self._start_sampling();
//xx self._clear_timer(true);
//xx self._set_timer();
}
};
var validateFilter = require("./validate_filter").validateFilter;
MonitoredItem.prototype.modify = function (timestampsToReturn, options) {
assert(options instanceof MonitoringParameters);
var self = this;
var old_samplingInterval = self.samplingInterval;
self.timestampsToReturn = timestampsToReturn || self.timestampsToReturn;
if (old_samplingInterval !== 0 && options.samplingInterval === 0) {
options.samplingInterval = minimumSamplingInterval; // fastest possible
}
self._set_parameters(options);
self._adjust_queue_to_match_new_queue_size();
self._adjust_sampling(old_samplingInterval);
if (options.filter) {
var statusCodeFilter = validateFilter(options.filter, self.itemToMonitor, self.node);
if (statusCodeFilter !== StatusCodes.Good) {
return new MonitoredItemModifyResult({
statusCode: statusCodeFilter
});
}
}
// validate filter
// note : The DataChangeFilter does not have an associated result structure.
var filterResult = null; // new subscription_service.DataChangeFilter
return new MonitoredItemModifyResult({
statusCode: StatusCodes.Good,
revisedSamplingInterval: self.samplingInterval,
revisedQueueSize: self.queueSize,
filterResult: filterResult
});
};
exports.MonitoredItem = MonitoredItem;