APIs

Show:
"use strict";
/**
 * @module opcua.client
 */

const util = require("util");
const _ = require("underscore");

const EventEmitter = require("events").EventEmitter;
const subscription_service = require("node-opcua-service-subscription");
const read_service = require("node-opcua-service-read");

const StatusCodes = require("node-opcua-status-code").StatusCodes;
const assert = require("node-opcua-assert").assert;
const TimestampsToReturn = read_service.TimestampsToReturn;

const AttributeIds = require("node-opcua-data-model").AttributeIds;

const resolveNodeId = require("node-opcua-nodeid").resolveNodeId;
const ObjectTypeIds = require("node-opcua-constants").ObjectTypeIds;
const ModifyMonitoredItemsRequest = subscription_service.ModifyMonitoredItemsRequest;
const MonitoredItemModifyRequest = subscription_service.MonitoredItemModifyRequest;


function ClientMonitoredItemBase(subscription, itemToMonitor, monitoringParameters) {
    assert(subscription.constructor.name === "ClientSubscription");
    const self = this;
    self.itemToMonitor = new read_service.ReadValueId(itemToMonitor);
    self.monitoringParameters = new subscription_service.MonitoringParameters(monitoringParameters);
    self.subscription = subscription;
    self.monitoringMode = subscription_service.MonitoringMode.Reporting;
    assert(self.monitoringParameters.clientHandle === 4294967295, "should not have a client handle yet");
}
util.inherits(ClientMonitoredItemBase, EventEmitter);


ClientMonitoredItemBase.prototype._notify_value_change = function (value) {
    const self = this;
    /**
     * Notify the observers that the MonitoredItem value has changed on the server side.
     * @event changed
     * @param value
     */
     try {
       self.emit("changed", value);
     }
     catch(err) {
       console.log("Exception raised inside the event handler called by ClientMonitoredItem.on('change')",err);
       console.log("Please verify the application using this node-opcua client");
     }
};

ClientMonitoredItemBase.prototype._prepare_for_monitoring = function () {

    const self = this;
    assert(self.subscription.subscriptionId !== "pending");
    assert(self.monitoringParameters.clientHandle === 4294967295, "should not have a client handle yet");
    self.monitoringParameters.clientHandle = self.subscription.nextClientHandle();
    assert(self.monitoringParameters.clientHandle > 0 && self.monitoringParameters.clientHandle !== 4294967295);

    // If attributeId is EventNotifier then monitoring parameters need a filter.
    // The filter must then either be DataChangeFilter, EventFilter or AggregateFilter.
    // todo can be done in another way?
    // todo implement AggregateFilter
    // todo support DataChangeFilter
    // todo support whereClause
    if (self.itemToMonitor.attributeId === AttributeIds.EventNotifier) {

        //
        // see OPCUA Spec 1.02 part 4 page 65 : 5.12.1.4 Filter
        // see                 part 4 page 130: 7.16.3 EventFilter
        //                     part 3 page 11 : 4.6 Event Model
        // To monitor for Events, the attributeId element of the ReadValueId structure is the
        // the id of the EventNotifierAttribute

        // OPC Unified Architecture 1.02, Part 4 5.12.1.2 Sampling interval page 64:
        // "A Client shall define a sampling interval of 0 if it subscribes for Events."
        // toDO

        // note : the EventFilter is used when monitoring Events.
        self.monitoringParameters.filter = self.monitoringParameters.filter || new subscription_service.EventFilter({});

        const filter = self.monitoringParameters.filter;
        if (filter._schema.name !== "EventFilter") {
            return {
                error: "Mismatch between attributeId and filter in monitoring parameters : " +
                "Got a " + filter._schema.name + " but a EventFilter object is required when itemToMonitor.attributeId== AttributeIds.EventNotifier"
            };
        }

    } else if (self.itemToMonitor.attributeId === AttributeIds.Value) {
        // the DataChangeFilter and the AggregateFilter are used when monitoring Variable Values

        // The Value Attribute is used when monitoring Variables. Variable values are monitored for a change
        // in value or a change in their status. The filters defined in this standard (see 7.16.2) and in Part 8 are
        // used to determine if the value change is large enough to cause a Notification to be generated for the
        // to do : check 'DataChangeFilter'  && 'AggregateFilter'
    } else {
        if (self.monitoringParameters.filter) {
            return {
                error: "Mismatch between attributeId and filter in monitoring parameters : " +
                "no filter expected when attributeId is not Value  or  EventNotifier"
            };
        }
    }
    return {
        error: null,
        itemToMonitor: self.itemToMonitor,
        monitoringMode: self.monitoringMode,
        requestedParameters: self.monitoringParameters
    };

};
ClientMonitoredItemBase.prototype._after_create = function (monitoredItemResult) {


    const self = this;
    self.statusCode = monitoredItemResult.statusCode;
    /* istanbul ignore else */
    if (monitoredItemResult.statusCode === StatusCodes.Good) {


        self.result = monitoredItemResult;
        self.monitoredItemId = monitoredItemResult.monitoredItemId;
        self.monitoringParameters.samplingInterval = monitoredItemResult.revisedSamplingInterval;
        self.monitoringParameters.queueSize = monitoredItemResult.revisedQueueSize;
        self.filterResult = monitoredItemResult.filterResult;


        self.subscription._add_monitored_item(self.monitoringParameters.clientHandle, self);
        /**
         * Notify the observers that the monitored item is now fully initialized.
         * @event initialized
         */
        self.emit("initialized");

    } else {

        /**
         * Notify the observers that the monitored item has failed to initialized.
         * @event err
         * @param statusCode {StatusCode}
         */
        const err = new Error(monitoredItemResult.statusCode.toString());
        self.emit("err", err.message);
        self.emit("terminated");
    }
};

ClientMonitoredItemBase._toolbox_monitor = function (subscription, timestampsToReturn, monitoredItems, done) {
    assert(_.isFunction(done));
    const itemsToCreate = [];
    for (let i = 0; i < monitoredItems.length; i++) {

        const monitoredItem = monitoredItems[i];
        const itemToCreate = monitoredItem._prepare_for_monitoring(done);
        if (_.isString(itemToCreate.error)) {
            return done(new Error(itemToCreate.error));
        }
        itemsToCreate.push(itemToCreate);
    }

    const createMonitorItemsRequest = new subscription_service.CreateMonitoredItemsRequest({
        subscriptionId: subscription.subscriptionId,
        timestampsToReturn: timestampsToReturn,
        itemsToCreate: itemsToCreate
    });

    assert(subscription.session);
    subscription.session.createMonitoredItems(createMonitorItemsRequest, function (err, response) {

        /* istanbul ignore next */
        if (err) {
            //xx console.log("ClientMonitoredItemBase#_toolbox_monitor:  ERROR in createMonitoredItems ".red, err.message);
            //xx  console.log("ClientMonitoredItemBase#_toolbox_monitor:  ERROR in createMonitoredItems ".red, err);
            //xx  console.log(createMonitorItemsRequest.toString());
        } else {
            assert(response instanceof subscription_service.CreateMonitoredItemsResponse);

            for (let i = 0; i < response.results.length; i++) {
                const monitoredItemResult = response.results[i];
                const monitoredItem = monitoredItems[i];
                monitoredItem._after_create(monitoredItemResult);
            }
        }
        done(err);
    });

};
ClientMonitoredItemBase._toolbox_modify = function (subscription, monitoredItems, parameters, timestampsToReturn, callback) {

    assert(callback === undefined || _.isFunction(callback));

    const itemsToModify = monitoredItems.map(function (monitoredItem) {
        const clientHandle = monitoredItem.monitoringParameters.clientHandle;
        return new MonitoredItemModifyRequest({
            monitoredItemId: monitoredItem.monitoredItemId,
            requestedParameters: _.extend(_.clone(parameters), {clientHandle: clientHandle})
        });
    });
    const modifyMonitoredItemsRequest = new ModifyMonitoredItemsRequest({
        subscriptionId: subscription.subscriptionId,
        timestampsToReturn: timestampsToReturn,
        itemsToModify: itemsToModify
    });

    subscription.session.modifyMonitoredItems(modifyMonitoredItemsRequest, function (err, response) {

        /* istanbul ignore next */
        if (err) {
            return callback(err);
        }
        assert(response.results.length === monitoredItems.length);

        const res = response.results[0];

        /* istanbul ignore next */
        if (response.results.length === 1 && res.statusCode !== StatusCodes.Good) {
            return callback(new Error("Error" + res.statusCode.toString()));
        }
        callback(null, response.results);
    });
};
ClientMonitoredItemBase._toolbox_setMonitoringMode = function (subscription, monitoredItems, monitoringMode, callback) {

    const monitoredItemIds = monitoredItems.map(function (monitoredItem) {
        return monitoredItem.monitoredItemId;
    });

    const setMonitoringModeRequest = {
        subscriptionId: subscription.subscriptionId,
        monitoringMode: monitoringMode,
        monitoredItemIds: monitoredItemIds
    };
    subscription.session.setMonitoringMode(setMonitoringModeRequest, function (err, results) {
        if (!err) {
            monitoredItems.forEach(function (monitoredItem) {
                monitoredItem.monitoringMode = monitoringMode;
            });
        }
        if (callback) {
            callback(err, results ? results[0] : null);
        }
    });

};

exports.ClientMonitoredItemBase = ClientMonitoredItemBase;