QuickOPC User's Guide and Reference
UADataChangeNotificationObservable<TValue> Class



Assembly: OpcLabs.EasyOpcUA
Namespace: OpcLabs.EasyOpc.UA.Reactive
Push-based notification provider (observable) for changes in OPC Unified Architecture monitored item(s).
Object Model
(Diagram: UADataChangeNotificationObservable<TValue> Class, related to EasyUAClientSelector Class)
Syntax
' Declaration (Visual Basic)
<ComVisibleAttribute(False)>
<TypeConverterAttribute(System.ComponentModel.ExpandableObjectConverter)>
<CLSCompliantAttribute(True)>
<ValueControlAttribute("OpcLabs.BaseLib.Forms.Common.ObjectSerializationControl, OpcLabs.BaseLibForms, Version=5.72.465.1, Culture=neutral, PublicKeyToken=6faddca41dacb409", 
   DefaultReadWrite=False, 
   Export=True, 
   PageId=10001)>
<SerializableAttribute()>
Public NotInheritable Class UADataChangeNotificationObservable(Of TValue) 
   Inherits UAReactive
   Implements LINQPad.ICustomMemberProvider, OpcLabs.BaseLib.ComTypes._Info, OpcLabs.BaseLib.ComTypes._Object2, System.ICloneable, System.IObservable(Of EasyUADataChangeNotificationEventArgs(Of TValue)), System.Runtime.Serialization.ISerializable, System.Xml.Serialization.IXmlSerializable 
// Declaration (C#)
[ComVisible(false)]
[TypeConverter(System.ComponentModel.ExpandableObjectConverter)]
[CLSCompliant(true)]
[ValueControl("OpcLabs.BaseLib.Forms.Common.ObjectSerializationControl, OpcLabs.BaseLibForms, Version=5.72.465.1, Culture=neutral, PublicKeyToken=6faddca41dacb409", 
   DefaultReadWrite=false, 
   Export=true, 
   PageId=10001)]
[Serializable()]
public sealed class UADataChangeNotificationObservable<TValue> : UAReactive, LINQPad.ICustomMemberProvider, OpcLabs.BaseLib.ComTypes._Info, OpcLabs.BaseLib.ComTypes._Object2, System.ICloneable, System.IObservable<EasyUADataChangeNotificationEventArgs<TValue>>, System.Runtime.Serialization.ISerializable, System.Xml.Serialization.IXmlSerializable  
// Declaration (C++/CLI)
[ComVisible(false)]
[TypeConverter(System.ComponentModel.ExpandableObjectConverter)]
[CLSCompliant(true)]
[ValueControl("OpcLabs.BaseLib.Forms.Common.ObjectSerializationControl, OpcLabs.BaseLibForms, Version=5.72.465.1, Culture=neutral, PublicKeyToken=6faddca41dacb409", 
   DefaultReadWrite=false, 
   Export=true, 
   PageId=10001)]
[Serializable()]
generic<typename TValue>
public ref class UADataChangeNotificationObservable sealed : public UAReactive, LINQPad.ICustomMemberProvider, OpcLabs.BaseLib.ComTypes._Info, OpcLabs.BaseLib.ComTypes._Object2, System.ICloneable, System.IObservable<EasyUADataChangeNotificationEventArgs<TValue>>, System.Runtime.Serialization.ISerializable, System.Xml.Serialization.IXmlSerializable  
Type Parameters
TValue
The type of OPC values provided.
Remarks
The notifications will contain OpcLabs.EasyOpc.UA.Generic.UAAttributeData<TValue> for each OPC UA data change. All OPC UA monitored items in one UADataChangeNotificationObservable<TValue> must be assignable to the TValue type. Typically, you will create the observable for items of the same type. You can also choose a common denominator type; for example, if you use System.Object for TValue, you can use the observable in a type-less manner.
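To illustrate the type-less case mentioned above, the following is a minimal sketch of how a handler might treat values that arrive typed as System.Object. It uses only the .NET base class library; TypelessValues and Describe are hypothetical names introduced for this illustration and are not part of QuickOPC, and the sample values merely stand in for whatever a server would actually deliver.

```csharp
// Sketch only: handling values whose static type is object.
// TypelessValues and Describe are hypothetical helpers, not part of QuickOPC.
using System;
using System.Globalization;

static class TypelessValues
{
    // Formats a value by inspecting its runtime type.
    public static string Describe(object value)
    {
        switch (value)
        {
            case null:
                return "(null)";
            case float f:
                return "Single " + f.ToString(CultureInfo.InvariantCulture);
            case int i:
                return "Int32 " + i.ToString(CultureInfo.InvariantCulture);
            case int[] array:
                return "Int32[" + array.Length.ToString(CultureInfo.InvariantCulture) + "]";
            default:
                return value.GetType().Name + " "
                    + Convert.ToString(value, CultureInfo.InvariantCulture);
        }
    }

    public static void Main()
    {
        // Values of different runtime types, as they could arrive when TValue is object.
        object[] samples = { 1.5f, 42, new[] { 1, 2, 3 }, "text", null };
        foreach (object sample in samples)
            Console.WriteLine(Describe(sample));
    }
}
```

The trade-off is the usual one: a single observable can cover items of different types, but the type checks move from compile time into the handler.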

 

OPC DA (Classic) Observables

The DAItemChangedObservable<TValue> class is an observable for changes in an OPC Data Access item, or in multiple items, of type TValue. It represents a data stream with information about significant data changes in the subscribed items.

Each significant change is represented by an OnNext message of type EasyDAItemChangedEventArgs<TValue>. The same message type is used for both outcomes: on success, the Exception property is a null reference and the Vtq property contains valid data; on failure, the Exception property is non-null and contains the failure information.

The OnCompleted and OnError messages (methods of IObserver) are never sent, not even in case of an error related to the OPC item; the data stream is therefore never terminated. If your application requires it, you can process the data stream further, and filter it or split it by success/failure as needed.

For OPC Classic, you can create instances of DAItemChangedObservable<TValue> either by using its constructor, or by using the static DAItemChangedObservable class, which has several overloads of the Create method. The static DAItemChangedObservable.Create methods use the default underlying EasyDAClient object for OPC reactive extensions. If you need to set some parameters in the client object, you can use the ClientSelector property to specify them.

This approach allows the code to be expressed purely in terms of OPC logic, without being tied to the way it is actually implemented.

The following code fragment creates the observable using one of the Create methods:

   DAItemChangedObservable<double> ramp = 
        DAItemChangedObservable.Create<double>(
            "", "OPCLabs.KitServer.2", "Demo.Ramp", 1000);

 

It is recommended that you create the instances using the DAItemChangedObservable.Create method unless you have special needs. 

Examples

// Shows how to create an observable for OPC-DA item changes, and subscribe to it.

using System;
using System.Threading;
using OpcLabs.EasyOpc.DataAccess.Reactive;

namespace ReactiveDocExamples
{
    namespace _DAItemChangedObservable
    {
        partial class Subscribe
        {
            public static void Main1()
            {
                Console.WriteLine("Creating observable...");
                DAItemChangedObservable<double> ramp = 
                    DAItemChangedObservable.Create<double>("", "OPCLabs.KitServer.2", "Demo.Ramp", 1000);

                Console.WriteLine("Subscribing...");
                using (ramp.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.Vtq.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows how to create an observable for OPC-DA item changes, and subscribe to it with percent deadband.

using System;
using System.Threading;
using OpcLabs.EasyOpc.DataAccess.Reactive;

namespace ReactiveDocExamples
{
    namespace _DAItemChangedObservable
    {
        partial class Subscribe
        {
            public static void PercentDeadband()
            {
                const float percentDeadband = 5.0f;
                Console.WriteLine($"Creating observable with {percentDeadband}% deadband...");
                DAItemChangedObservable<double> ramp = 
                    DAItemChangedObservable.Create<double>("", "OPCLabs.KitServer.2", "Simulation.Ramp 0:100 (10 s)", 
                        requestedUpdateRate:100, percentDeadband:percentDeadband);

                Console.WriteLine("Subscribing...");
                using (ramp.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.Vtq.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

OPC UA (Data) Observables

The UADataChangeNotificationObservable<TValue> class is an observable for changes in an OPC-UA monitored item, or in multiple items, of type TValue. It represents a data stream with information about significant data changes in the subscribed items.

Each significant change is represented by an OnNext message of type EasyUADataChangeNotificationEventArgs<TValue>. The same message type is used for both outcomes: on success, the Exception property is a null reference and the AttributeData property contains valid data; on failure, the Exception property is non-null and contains the failure information.

The OnCompleted and OnError messages (methods of IObserver) are never sent, not even in case of an error related to the OPC item; the data stream is therefore never terminated. If your application requires it, you can process the data stream further, and filter it or split it by success/failure as needed.
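One way to split such a stream by success/failure is an observer that routes each OnNext message to one of two handlers, depending on whether Exception is null. The sketch below uses only the .NET base class library. Notification<TValue> is a hypothetical stand-in for the event arguments type, and SplittingObserver is likewise an assumption made for illustration; neither is part of QuickOPC, and the stream is simulated by calling OnNext directly.

```csharp
// Sketch only: routing OnNext messages by success/failure.
// Notification<TValue> and SplittingObserver<TValue> are hypothetical types.
using System;
using System.Collections.Generic;

// Stand-in for a notification: Exception is null on success.
sealed class Notification<TValue>
{
    public Notification(TValue value) { Value = value; }
    public Notification(Exception exception) { Exception = exception; }
    public TValue Value { get; }
    public Exception Exception { get; }
}

// Observer that forwards each message to a success or a failure handler.
sealed class SplittingObserver<TValue> : IObserver<Notification<TValue>>
{
    private readonly Action<TValue> _onSuccess;
    private readonly Action<Exception> _onFailure;

    public SplittingObserver(Action<TValue> onSuccess, Action<Exception> onFailure)
    {
        _onSuccess = onSuccess ?? throw new ArgumentNullException(nameof(onSuccess));
        _onFailure = onFailure ?? throw new ArgumentNullException(nameof(onFailure));
    }

    public void OnNext(Notification<TValue> notification)
    {
        if (notification.Exception is null)
            _onSuccess(notification.Value);
        else
            _onFailure(notification.Exception);
    }

    // According to the text above, the OPC observables never send these.
    public void OnCompleted() { }
    public void OnError(Exception error) { }
}

static class SplitDemo
{
    public static void Main()
    {
        var values = new List<float>();
        var failures = new List<string>();
        var observer = new SplittingObserver<float>(
            value => values.Add(value),
            exception => failures.Add(exception.Message));

        // Simulated stream: a success, a failure, then another success.
        observer.OnNext(new Notification<float>(1.0f));
        observer.OnNext(new Notification<float>(new TimeoutException("simulated failure")));
        observer.OnNext(new Notification<float>(2.0f));

        Console.WriteLine($"{values.Count} values, {failures.Count} failures");
    }
}
```

Because a failure arrives as an ordinary message, the second success is still delivered after it; with a terminating OnError, the stream would have ended at the failure.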

For OPC-UA, you can create instances of UADataChangeNotificationObservable<TValue> either by using its constructor, or by using the static UADataChangeNotificationObservable class, which has several overloads of the Create method. The static UADataChangeNotificationObservable.Create methods use the default underlying EasyUAClient object for OPC reactive extensions. If you need to set some parameters in the client object, you can use the ClientSelector property to specify them.

This approach allows the code to be expressed purely in terms of OPC logic, without being tied to the way it is actually implemented.

It is recommended that you create the instances using the UADataChangeNotificationObservable.Create method unless you have special needs.

Examples

// Shows how to create an observable for OPC-UA monitored item changes, and subscribe to it.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void Main1()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating observable...");
                UADataChangeNotificationObservable<float> observable =
                    UADataChangeNotificationObservable.Create<float>(
                        endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10853", 1000);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows how to observe OPC UA data changes with a specified data change filter.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.OperationModel;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void DataChangeTrigger()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating observable...");
                var attributeArguments = new UAAttributeArguments(endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10853");
                // Report a notification if either the StatusCode or the value change. 
                var monitoredItemArguments = new UAMonitoredItemArguments(attributeArguments,
                    new UAMonitoringParameters(samplingInterval: 1000, dataChangeFilter: UADataChangeTrigger.StatusValue));
                UADataChangeNotificationObservable<float> observable =
                    UADataChangeNotificationObservable.Create<float>(monitoredItemArguments);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows how to observe OPC UA data changes with a specified absolute deadband.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.OperationModel;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void AbsoluteDeadband()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                const double absoluteDeadband = 50;
                Console.WriteLine($"Creating observable with absolute deadband {absoluteDeadband}...");
                var attributeArguments = new UAAttributeArguments(
                    endpointDescriptor,
                    "nsu=http://test.org/UA/Data/ ;i=11194");    // /Data.Dynamic.AnalogScalar.Int32Value
                var monitoredItemArguments = new UAMonitoredItemArguments(
                    attributeArguments,
                    new UAMonitoringParameters(samplingInterval: 1000, dataChangeFilter: absoluteDeadband));
                UADataChangeNotificationObservable<int> observable =
                    UADataChangeNotificationObservable.Create<int>(monitoredItemArguments);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows how to observe OPC UA data changes with a specified percent deadband.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.OperationModel;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void PercentDeadband()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                const double percentDeadband = 5.0;
                Console.WriteLine($"Creating observable with {percentDeadband}% deadband...");
                var attributeArguments = new UAAttributeArguments(
                    endpointDescriptor,
                    "nsu=http://test.org/UA/Data/ ;i=11194");    // /Data.Dynamic.AnalogScalar.Int32Value
                var monitoredItemArguments = new UAMonitoredItemArguments(
                    attributeArguments,
                    new UAMonitoringParameters(
                        samplingInterval: 1000, 
                        new UADataChangeFilter(UADeadbandType.Percent, percentDeadband)));
                UADataChangeNotificationObservable<int> observable =
                    UADataChangeNotificationObservable.Create<int>(monitoredItemArguments);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows how to create an observable for a range of OPC UA array elements, and subscribe to it.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.OperationModel;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void IndexRangeList()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating observable...");
                var attributeArguments = new UAAttributeArguments(endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10933")
                {
                    IndexRangeList = UAIndexRangeList.OneDimension(2, 4)
                };
                var monitoredItemArguments = new UAMonitoredItemArguments(attributeArguments, monitoringParameters: 1000);
                UADataChangeNotificationObservable<Int32[]> observable =
                    UADataChangeNotificationObservable.Create<Int32[]>(monitoredItemArguments);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows an OPC UA data change observable with specified timeouts.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void Timeouts()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating observable...");
                UADataChangeNotificationObservable<float> observable =
                    UADataChangeNotificationObservable.Create<float>(
                        endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10853", 1000);
                // Set fairly short timeouts (failure can thus occur).
                observable.ClientSelector.Isolated = true;
                observable.ClientSelector.IsolatedParameters.SessionParameters.EndpointSelectionTimeout = 1500;
                observable.ClientSelector.IsolatedParameters.SessionParameters.SessionConnectTimeout = 3000;
                observable.ClientSelector.IsolatedParameters.SessionParameters.SessionTimeout = 3000;
                observable.ClientSelector.IsolatedParameters.SessionParameters.SessionTimeoutDebug = 3000;

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

 

Example


// Shows how to create an observable for OPC-UA monitored item changes, and subscribe to it.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void Main1()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating observable...");
                UADataChangeNotificationObservable<float> observable =
                    UADataChangeNotificationObservable.Create<float>(
                        endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10853", 1000);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}
// Shows how to observe OPC UA data changes with a specified data change filter.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.OperationModel;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void DataChangeTrigger()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating observable...");
                var attributeArguments = new UAAttributeArguments(endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10853");
                // Report a notification if either the StatusCode or the value change. 
                var monitoredItemArguments = new UAMonitoredItemArguments(attributeArguments,
                    new UAMonitoringParameters(samplingInterval: 1000, dataChangeFilter: UADataChangeTrigger.StatusValue));
                UADataChangeNotificationObservable<float> observable =
                    UADataChangeNotificationObservable.Create<float>(monitoredItemArguments);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}
// Shows how to observe OPC UA data changes with a specified absolute deadband.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.OperationModel;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void AbsoluteDeadband()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                const double absoluteDeadband = 50;
                Console.WriteLine($"Creating observable with absolute deadband {absoluteDeadband}...");
                var attributeArguments = new UAAttributeArguments(
                    endpointDescriptor,
                    "nsu=http://test.org/UA/Data/ ;i=11194");    // /Data.Dynamic.AnalogScalar.Int32Value
                var monitoredItemArguments = new UAMonitoredItemArguments(
                    attributeArguments,
                    new UAMonitoringParameters(samplingInterval: 1000, dataChangeFilter: absoluteDeadband));
                UADataChangeNotificationObservable<int> observable =
                    UADataChangeNotificationObservable.Create<int>(monitoredItemArguments);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}
// Shows how to observe OPC UA data changes with a specified percent deadband.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.OperationModel;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void PercentDeadband()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                const double percentDeadband = 5.0;
                Console.WriteLine($"Creating observable with  {percentDeadband}% deadband...");
                var attributeArguments = new UAAttributeArguments(
                    endpointDescriptor,
                    "nsu=http://test.org/UA/Data/ ;i=11194");    // /Data.Dynamic.AnalogScalar.Int32Value
                var monitoredItemArguments = new UAMonitoredItemArguments(
                    attributeArguments,
                    new UAMonitoringParameters(
                        samplingInterval: 1000, 
                        new UADataChangeFilter(UADeadbandType.Percent, percentDeadband)));
                UADataChangeNotificationObservable<int> observable =
                    UADataChangeNotificationObservable.Create<int>(monitoredItemArguments);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}
// Shows how to create an observable for a range of OPC UA array elements, and subscribe to it.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.OperationModel;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void IndexRangeList()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating observable...");
                var attributeArguments = new UAAttributeArguments(endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10933")
                {
                    IndexRangeList = UAIndexRangeList.OneDimension(2, 4)
                };
                var monitoredItemArguments = new UAMonitoredItemArguments(attributeArguments, monitoringParameters: 1000);
                UADataChangeNotificationObservable<Int32[]> observable =
                    UADataChangeNotificationObservable.Create<Int32[]>(monitoredItemArguments);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}
// Shows an OPC UA data change observable with specified timeouts.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void Timeouts()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating observable...");
                UADataChangeNotificationObservable<float> observable =
                    UADataChangeNotificationObservable.Create<float>(
                        endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10853", 1000);
                // Set fairly short timeouts (failure can thus occur).
                observable.ClientSelector.Isolated = true;
                observable.ClientSelector.IsolatedParameters.SessionParameters.EndpointSelectionTimeout = 1500;
                observable.ClientSelector.IsolatedParameters.SessionParameters.SessionConnectTimeout = 3000;
                observable.ClientSelector.IsolatedParameters.SessionParameters.SessionTimeout = 3000;
                observable.ClientSelector.IsolatedParameters.SessionParameters.SessionTimeoutDebug = 3000;

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10 * 1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

// Shows how to continuously transform the values of an OPC UA node and write the results into a second node.
// Requires Microsoft Reactive Extensions (Rx).

using System;
using System.Reactive.Linq;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.Reactive;

namespace ReactiveDocExamples
{
    namespace _UAReactive
    {
        class Composition
        {
            public static void Pipeline()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating source observable...");
                UADataChangeNotificationObservable<int> source =
                    UADataChangeNotificationObservable.Create<int>(
                        endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=11017", 100);

                Console.WriteLine("Creating processed observable (takes valid input values and applies modulo 1000)...");
                IObservable<int> processed = source
                    .Where(e => e.Exception is null)
                    .Select(e => e.TypedAttributeData.TypedValue % 1000);

                Console.WriteLine("Creating observer to write values into OPC node...");
                UAWriteValueObserver<int> observer =
                    UAWriteValueObserver.Create<int>(
                        endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10389");

                Console.WriteLine("Monitoring changes of the target OPC node using traditional means...");
                int handle = EasyUAClient.SharedInstance.SubscribeDataChange(
                    endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10389",
                    100, (_, e) => Console.WriteLine(e.AttributeData));

                Console.WriteLine("Subscribing the observer to the processed observable...");
                using (processed.Subscribe(observer))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10 * 1000);

                    Console.WriteLine("Unsubscribing the observer from the processed observable...");
                }

                Console.WriteLine("Finalizing monitoring...");
                EasyUAClient.SharedInstance.UnsubscribeMonitoredItem(handle);

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}
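The examples above share two conventions: a failure arrives as an ordinary notification whose Exception is non-null, and leaving the using block disposes the subscription, which ends the notifications. The following is a minimal sketch of both conventions that uses only the .NET base class library, so it runs without an OPC UA server or Rx. The Notification, NotificationSource, ActionObserver and Demo types are hypothetical stand-ins invented for this illustration; they are not QuickOPC types and do not reflect how the library is implemented.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a data change notification: it carries either
// a value or an exception, never both.
sealed class Notification
{
    public Notification(int value) { Value = value; }
    public Notification(Exception exception) { Exception = exception; }

    public int Value { get; }
    public Exception Exception { get; }
}

// Hypothetical minimal observable; this is not how QuickOPC implements it.
sealed class NotificationSource : IObservable<Notification>
{
    private readonly List<IObserver<Notification>> _observers =
        new List<IObserver<Notification>>();

    public IDisposable Subscribe(IObserver<Notification> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(_observers, observer);
    }

    // Pushes one notification to every observer that is currently subscribed.
    public void Publish(Notification notification)
    {
        foreach (IObserver<Notification> observer in _observers.ToArray())
            observer.OnNext(notification);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<Notification>> _observers;
        private readonly IObserver<Notification> _observer;

        public Unsubscriber(List<IObserver<Notification>> observers,
            IObserver<Notification> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        // Disposing the subscription removes the observer from the list.
        public void Dispose() { _observers.Remove(_observer); }
    }
}

// Adapts a delegate to IObserver<Notification>, so the sketch does not need Rx.
sealed class ActionObserver : IObserver<Notification>
{
    private readonly Action<Notification> _onNext;

    public ActionObserver(Action<Notification> onNext) { _onNext = onNext; }

    public void OnNext(Notification value) { _onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

static class Demo
{
    public static List<string> Run()
    {
        var lines = new List<string>();
        var source = new NotificationSource();

        // Same shape as the handlers above: test Exception first, then use the value.
        using (source.Subscribe(new ActionObserver(e => lines.Add(
            (e.Exception is null)
                ? e.Value.ToString()
                : e.Exception.GetBaseException().Message))))
        {
            source.Publish(new Notification(42));
            source.Publish(new Notification(new InvalidOperationException(
                "Read failed", new TimeoutException("Session timed out"))));
        }

        // The subscription has been disposed, so this notification reaches nobody.
        source.Publish(new Notification(7));
        return lines;
    }

    static void Main()
    {
        foreach (string line in Run())
            Console.WriteLine(line);
    }
}
```

In this sketch, Demo.Run collects two lines, "42" and "Session timed out"; the value 7 is published after the using block has ended and is therefore not delivered. GetBaseException returns the innermost exception, which is why the handler reports the timeout rather than the wrapping exception.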
Inheritance Hierarchy

System.Object
   OpcLabs.BaseLib.Object2
      OpcLabs.BaseLib.Info
         OpcLabs.EasyOpc.UA.Reactive.UAReactive
            OpcLabs.EasyOpc.UA.Reactive.UADataChangeNotificationObservable<TValue>

Requirements

Target Platforms: .NET Framework: Windows 10 (selected versions), Windows 11 (selected versions), Windows Server 2016, Windows Server 2022; .NET: Linux, macOS, Microsoft Windows

See Also