QuickOPC User's Guide and Reference
UADataChangeNotificationObservable Class
OpcLabs.EasyOpcUA Assembly > OpcLabs.EasyOpc.UA.Reactive Namespace : UADataChangeNotificationObservable Class
Static class with methods to create UADataChangeNotificationObservable<TValue> in various ways.
Syntax
Visual Basic
'Declaration
<ComVisibleAttribute(False)>
Public MustInherit NotInheritable Class UADataChangeNotificationObservable
'Usage
Dim instance As UADataChangeNotificationObservable

C#
[ComVisible(false)]
public static class UADataChangeNotificationObservable

C++/CLI
[ComVisible(false)]
public ref class UADataChangeNotificationObservable abstract sealed
Remarks

 

OPC DA (Classic) Observables

The DAItemChangedObservable<TValue> class is an observable for changes in an OPC Data Access item, or in multiple items, of type TValue. It represents a data stream with information about significant data changes in the subscribed items.

Each significant change is represented by an OnNext message of type EasyDAItemChangedEventArgs<T>. This message is used both for successes, when the Exception property is a null reference and the Vtq property contains valid data, and for failures, when the Exception property is non-null and contains the failure information.

The OnCompleted and OnError messages (methods of the IObserver) are never sent, not even in case of an error related to the OPC item, so the data stream is never terminated. If your application requires it, you can process the data stream further, and filter it or split it by success/failure as needed.
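
The splitting by success/failure mentioned above can be sketched without any OPC dependency. The following is only an illustration under stated assumptions: ChangeArgs<T>, SplittingObserver<T> and SplitDemo are hypothetical names that are not part of QuickOPC, and ChangeArgs<T> merely mimics the shape described above (an Exception property that is null on success, and a value otherwise). The notifications are simulated by calling OnNext directly; in a real application the observer would presumably be passed to the observable's Subscribe method instead.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the event arguments described above (not a QuickOPC type):
// on success Exception is null and Value holds the data, on failure Exception is non-null.
public sealed class ChangeArgs<T>
{
    public ChangeArgs(T value) { Value = value; }
    public ChangeArgs(Exception exception) { Exception = exception; }

    public Exception Exception { get; }
    public T Value { get; }
}

// Hypothetical observer that splits the incoming messages by success/failure.
public sealed class SplittingObserver<T> : IObserver<ChangeArgs<T>>
{
    public List<T> Values { get; } = new List<T>();
    public List<Exception> Failures { get; } = new List<Exception>();

    public void OnNext(ChangeArgs<T> e)
    {
        if (e.Exception is null)
            Values.Add(e.Value);                            // success: keep the data
        else
            Failures.Add(e.Exception.GetBaseException());   // failure: keep the root cause
    }

    // According to the description above, the OPC observables never send these two
    // messages, so they are intentionally left empty in this sketch.
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class SplitDemo
{
    // Feeds three simulated notifications into the observer and returns it.
    public static SplittingObserver<double> Run()
    {
        var observer = new SplittingObserver<double>();
        observer.OnNext(new ChangeArgs<double>(1.5));
        observer.OnNext(new ChangeArgs<double>(new InvalidOperationException("simulated failure")));
        observer.OnNext(new ChangeArgs<double>(2.5));   // the stream continues after a failure
        return observer;
    }
}
```

Because failures arrive as ordinary OnNext messages, a failure does not end the sequence: in the sketch, the value that follows the simulated failure is still collected. With Microsoft Reactive Extensions (Rx), the same split is usually written with operators such as Where instead of a hand-written observer.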

For OPC Classic, you can create instances of DAItemChangedObservable<TValue> either by using its constructor, or by using the static DAItemChangedObservable class, which provides several overloads of the Create method. The static DAItemChangedObservable.Create methods use the default underlying EasyDAClient object for OPC reactive extensions. If you need to set some parameters in the client object, you can use the ClientSelector property to specify them.

This approach allows the code to be expressed purely in terms of OPC logic, without being tied to the way it is actually implemented.

The following code fragment creates the observable using one of the Create methods:

   DAItemChangedObservable<double> ramp = 
        DAItemChangedObservable.Create<double>(
            "", "OPCLabs.KitServer.2", "Demo.Ramp", 1000);

 

It is recommended that you create the instances using the DAItemChangedObservable.Create method unless you have special needs. 

Examples

// Shows how to create an observable for OPC-DA item changes, and subscribe to it.

using System;
using System.Threading;
using OpcLabs.EasyOpc.DataAccess.Reactive;

namespace ReactiveDocExamples
{
    namespace _DAItemChangedObservable
    {
        partial class Subscribe
        {
            public static void Main1()
            {
                Console.WriteLine("Creating observable...");
                DAItemChangedObservable<double> ramp = 
                    DAItemChangedObservable.Create<double>("", "OPCLabs.KitServer.2", "Demo.Ramp", 1000);

                Console.WriteLine("Subscribing...");
                using (ramp.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.Vtq.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows how to create an observable for OPC-DA item changes, and subscribe to it with percent deadband.

using System;
using System.Threading;
using OpcLabs.EasyOpc.DataAccess.Reactive;

namespace ReactiveDocExamples
{
    namespace _DAItemChangedObservable
    {
        partial class Subscribe
        {
            public static void PercentDeadband()
            {
                const float percentDeadband = 5.0f;
                Console.WriteLine($"Creating observable with {percentDeadband}% deadband...");
                DAItemChangedObservable<double> ramp = 
                    DAItemChangedObservable.Create<double>("", "OPCLabs.KitServer.2", "Simulation.Ramp 0:100 (10 s)", 
                        requestedUpdateRate:100, percentDeadband:percentDeadband);

                Console.WriteLine("Subscribing...");
                using (ramp.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.Vtq.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

OPC UA (Data) Observables

The UADataChangeNotificationObservable<TValue> class is an observable for changes in an OPC-UA monitored item, or in multiple items, of type TValue. It represents a data stream with information about significant data changes in the subscribed items.

Each significant change is represented by an OnNext message of type EasyUADataChangeNotificationEventArgs<T>. This message is used both for successes, when the Exception property is a null reference and the AttributeData property contains valid data, and for failures, when the Exception property is non-null and contains the failure information.

The OnCompleted and OnError messages (methods of the IObserver) are never sent, not even in case of an error related to the OPC item, so the data stream is never terminated. If your application requires it, you can process the data stream further, and filter it or split it by success/failure as needed.

For OPC-UA, you can create instances of UADataChangeNotificationObservable<TValue> either by using its constructor, or by using the static UADataChangeNotificationObservable class, which provides several overloads of the Create method. The static UADataChangeNotificationObservable.Create methods use the default underlying EasyUAClient object for OPC reactive extensions. If you need to set some parameters in the client object, you can use the ClientSelector property to specify them.

This approach allows the code to be expressed purely in terms of OPC logic, without being tied to the way it is actually implemented.

It is recommended that you create the instances using the UADataChangeNotificationObservable.Create method unless you have special needs.

Examples

// Shows how to create an observable for OPC-UA monitored item changes, and subscribe to it.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void Main1()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating observable...");
                UADataChangeNotificationObservable<float> observable =
                    UADataChangeNotificationObservable.Create<float>(
                        endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10853", 1000);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows how to observe OPC UA data changes with a specified data change filter.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.OperationModel;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void DataChangeTrigger()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating observable...");
                var attributeArguments = new UAAttributeArguments(endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10853");
                // Report a notification if either the StatusCode or the value change. 
                var monitoredItemArguments = new UAMonitoredItemArguments(attributeArguments,
                    new UAMonitoringParameters(samplingInterval: 1000, dataChangeFilter: UADataChangeTrigger.StatusValue));
                UADataChangeNotificationObservable<float> observable =
                    UADataChangeNotificationObservable.Create<float>(monitoredItemArguments);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows how to observe OPC UA data changes with a specified absolute deadband.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.OperationModel;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void AbsoluteDeadband()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                const double absoluteDeadband = 50;
                Console.WriteLine($"Creating observable with absolute deadband {absoluteDeadband}...");
                var attributeArguments = new UAAttributeArguments(
                    endpointDescriptor,
                    "nsu=http://test.org/UA/Data/ ;i=11194");    // /Data.Dynamic.AnalogScalar.Int32Value
                var monitoredItemArguments = new UAMonitoredItemArguments(
                    attributeArguments,
                    new UAMonitoringParameters(samplingInterval: 1000, dataChangeFilter: absoluteDeadband));
                UADataChangeNotificationObservable<int> observable =
                    UADataChangeNotificationObservable.Create<int>(monitoredItemArguments);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows how to observe OPC UA data changes with a specified percent deadband.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.OperationModel;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void PercentDeadband()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                const double percentDeadband = 5.0;
                Console.WriteLine($"Creating observable with {percentDeadband}% deadband...");
                var attributeArguments = new UAAttributeArguments(
                    endpointDescriptor,
                    "nsu=http://test.org/UA/Data/ ;i=11194");    // /Data.Dynamic.AnalogScalar.Int32Value
                var monitoredItemArguments = new UAMonitoredItemArguments(
                    attributeArguments,
                    new UAMonitoringParameters(
                        samplingInterval: 1000, 
                        new UADataChangeFilter(UADeadbandType.Percent, percentDeadband)));
                UADataChangeNotificationObservable<int> observable =
                    UADataChangeNotificationObservable.Create<int>(monitoredItemArguments);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows how to create an observable for a range of OPC UA array elements, and subscribe to it.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.OperationModel;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void IndexRangeList()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating observable...");
                var attributeArguments = new UAAttributeArguments(endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10933")
                {
                    IndexRangeList = UAIndexRangeList.OneDimension(2, 4)
                };
                var monitoredItemArguments = new UAMonitoredItemArguments(attributeArguments, monitoringParameters: 1000);
                UADataChangeNotificationObservable<Int32[]> observable =
                    UADataChangeNotificationObservable.Create<Int32[]>(monitoredItemArguments);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows an OPC UA data change observable with specified timeouts.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void Timeouts()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating observable...");
                UADataChangeNotificationObservable<float> observable =
                    UADataChangeNotificationObservable.Create<float>(
                        endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10853", 1000);
                // Set fairly short timeouts (failure can thus occur).
                observable.ClientSelector.Isolated = true;
                observable.ClientSelector.IsolatedParameters.SessionParameters.EndpointSelectionTimeout = 1500;
                observable.ClientSelector.IsolatedParameters.SessionParameters.SessionConnectTimeout = 3000;
                observable.ClientSelector.IsolatedParameters.SessionParameters.SessionTimeout = 3000;
                observable.ClientSelector.IsolatedParameters.SessionParameters.SessionTimeoutDebug = 3000;

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

 

Example

// Shows how to continuously transform values of an OPC-UA node, and write the results into a second node.
// Requires Microsoft Reactive Extensions (Rx).

using System;
using System.Reactive.Linq;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.Reactive;

namespace ReactiveDocExamples
{
    namespace _UAReactive
    {
        class Composition
        {
            public static void Pipeline()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating source observable...");
                UADataChangeNotificationObservable<int> source =
                    UADataChangeNotificationObservable.Create<int>(
                        endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=11017", 100);

                Console.WriteLine("Creating processed observable (takes valid input values and reduces them modulo 1000)...");
                IObservable<int> processed = source
                    .Where(e => e.Exception is null)
                    .Select(e => e.TypedAttributeData.TypedValue % 1000);

                Console.WriteLine("Creating observer to write values into OPC node...");
                UAWriteValueObserver<int> observer =
                    UAWriteValueObserver.Create<int>(
                        endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10389");

                Console.WriteLine("Monitoring changes of the target OPC node using traditional means...");
                int handle = EasyUAClient.SharedInstance.SubscribeDataChange(
                    endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10389",
                    100, (_, e) => Console.WriteLine(e.AttributeData));

                Console.WriteLine("Subscribing the observer to the processed observable...");
                using (processed.Subscribe(observer))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10 * 1000);

                    Console.WriteLine("Unsubscribing the observer from the processed observable...");
                }

                Console.WriteLine("Finalizing monitoring...");
                EasyUAClient.SharedInstance.UnsubscribeMonitoredItem(handle);

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}
// Shows how to query real-time OPC UA data using Trill. The query looks for event sequences where the previous value is
// less than 42, and the current value is greater than or equal to 42.

using System;
using System.Reactive.Linq;
using Microsoft.StreamProcessing;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.Generic;
using OpcLabs.EasyOpc.UA.Reactive;

namespace SimpleTrillApplication
{
    public sealed class Program
    {
        public static void Main(string[] args)
        {
            // Define which server we will work with.
            UAEndpointDescriptor endpointDescriptor =
                "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

            Console.WriteLine("Creating source observable...");
            // Create a data change observable for the value of the specified OPC UA node.
            IObservable<UAAttributeData<byte>> sourceObservable = UADataChangeNotificationObservable
                .Create<byte>(endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10846", 200)
                .Where(eventArgs => eventArgs.Succeeded)    // ignore events that carry failures
                .Select(eventArgs => eventArgs.TypedAttributeData);

            Console.WriteLine("Creating input streamable...");
            // Load the observable as a stream in Trill, injecting a punctuation every second. Because we use
            // FlushPolicy.FlushOnPunctuation, this will also flush the data every second.
            IObservableIngressStreamable<UAAttributeData<byte>> inputStreamable =
                sourceObservable.Select(attributeData => 
                        StreamEvent.CreateStart(attributeData.SourceTimestamp.Ticks, attributeData))
                    .ToStreamable(
                        DisorderPolicy.Drop(),
                        FlushPolicy.FlushOnPunctuation,
                        PeriodicPunctuationPolicy.Time((ulong)TimeSpan.FromSeconds(1).Ticks));

            // We will be building a query that takes a stream of UAAttributeData<byte> events.
            Console.WriteLine("Creating the query...");
            // Query: look for pattern of [value < 42] --> [value >= 42]
            IStreamable<Empty, Tuple<byte, byte>> query = inputStreamable
                .AlterEventDuration(TimeSpan.FromSeconds(10).Ticks)
                .Detect(
                    default(Tuple<byte, byte>), // initial register; it stores the matched (previous, current) values
                    pattern => pattern
                        .SingleElement(e => e.TypedValue < 42, (ts, ev, reg) => new Tuple<byte, byte>(ev.TypedValue, 0))
                        .SingleElement(e => e.TypedValue >= 42, (ts, ev, reg) => new Tuple<byte, byte>(reg.Item1, ev.TypedValue)));

            Console.WriteLine("Running the query for 20 seconds...");
            query.ToStreamEventObservable()
                .Take(TimeSpan.FromSeconds(20))
                .ForEachAsync(streamEvent => Console.WriteLine(streamEvent)).Wait();

            Console.WriteLine("Finished.");
        }



        // Example output:
        //Creating source observable...
        //Creating input streamable...
        //Creating the query...
        //Running the query for 20 seconds...
        //[Punctuation: 637080168490000000]
        //[Interval: 637080168493266740 - 637080168591235679, (17, 76)]
        //[Punctuation: 637080168500000000]
        //[Interval: 637080168509585008 - 637080168607485434, (28, 197)]
        //[Punctuation: 637080168510000000]
        //[Punctuation: 637080168520000000]
        //[Punctuation: 637080168530000000]
        //[Punctuation: 637080168540000000]
        //[Interval: 637080168540136670 - 637080168638105248, (23, 108)]
        //[Punctuation: 637080168550000000]
        //[Interval: 637080168556412772 - 637080168654381015, (20, 157)]
        //[Punctuation: 637080168560000000]
        //[Interval: 637080168560631254 - 637080168658599730, (36, 53)]
        //[Punctuation: 637080168570000000]
        //[Interval: 637080168572974888 - 637080168670943594, (34, 113)]
        //[Punctuation: 637080168580000000]
        //[Interval: 637080168589224991 - 637080168687193550, (25, 176)]
        //[Punctuation: 637080168590000000]
        //[Punctuation: 637080168600000000]
        //[Interval: 637080168605474719 - 637080168703443726, (1, 161)]
        //[Punctuation: 637080168610000000]
        //[Interval: 637080168611568222 - 637080168709537000, (32, 222)]
        //[Punctuation: 637080168620000000]
        //[Punctuation: 637080168630000000]
        //[Punctuation: 637080168640000000]
        //[Interval: 637080168644224600 - 637080168742193693, (24, 87)]
        //[Punctuation: 637080168650000000]
        //[Punctuation: 637080168660000000]
        //[Interval: 637080168660475491 - 637080168758444908, (40, 230)]
        //[Punctuation: 637080168670000000]
        //Finished.
    }
}
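The pattern that the Trill query above looks for (a value below 42 followed by a value of 42 or more) can be illustrated without Trill or an OPC UA server. The following is only a minimal sketch under stated assumptions: the `ThresholdCrossing` class, its `Detect` method, and the sample values are hypothetical and are not part of QuickOPC or Trill. It works on an in-memory sequence and ignores timestamps, punctuations, and the 10-second event duration that the real query uses.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only; not part of QuickOPC or Trill.
public static class ThresholdCrossing
{
    // Yields a (previous, current) pair each time a value below the threshold
    // is immediately followed by a value at or above the threshold.
    public static IEnumerable<Tuple<byte, byte>> Detect(IEnumerable<byte> values, byte threshold)
    {
        bool hasPrevious = false;
        byte previous = 0;
        foreach (byte current in values)
        {
            if (hasPrevious && previous < threshold && current >= threshold)
                yield return Tuple.Create(previous, current);

            previous = current;
            hasPrevious = true;
        }
    }

    public static void Main()
    {
        // Made-up sample values standing in for data changes of the OPC UA node.
        byte[] samples = { 17, 76, 80, 28, 197, 41, 42 };

        foreach (Tuple<byte, byte> pair in Detect(samples, 42))
            Console.WriteLine(pair);

        // Prints:
        // (17, 76)
        // (28, 197)
        // (41, 42)
    }
}
```

The pairs have the same shape as the `Tuple<byte, byte>` payloads in the example output above: the first item is the last value below the threshold, and the second item is the value that reached or exceeded it.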
Inheritance Hierarchy

System.Object
   OpcLabs.EasyOpc.UA.Reactive.UADataChangeNotificationObservable

Requirements

Target Platforms: .NET Framework: Windows 10 (selected versions), Windows 11 (selected versions), Windows Server 2016, Windows Server 2022; .NET: Linux, macOS, Microsoft Windows

See Also