Connectivity Software User's Guide and Reference
Rapid Toolkit for Sparkplug Custom Data Provision Model

General

In the custom data provision model, you write independently running code that assembles the payload to be published at a time of your choice, and then calls the PublishDataPayload Method on the edge node or device. Alternatively, you can update individual metric data in the edge node or device, and let Rapid Toolkit for Sparkplug assemble and publish the payload.

The main difference from the pull and push data provision models is that Rapid Toolkit for Sparkplug does no periodic polling. The execution flow is fully under the control of your code.

The custom data provision model is selected by setting the PublishingInterval Property on the edge node or device to Timeout.Infinite (-1).

Publishing Data Payload

One approach in the custom data provision model is to take responsibility for assembling the Sparkplug payload whenever the edge node or device has data to publish, and then pass the assembled payload to Rapid Toolkit for Sparkplug. With this approach, your code performs the following steps when it wants to publish a data payload:

  1. Create an instance of the SparkplugPayload Class, which holds a collection of Sparkplug metrics (data, and optionally metadata).
  2. Populate the above created instance of the payload with metric data and metadata.
  3. Call the PublishDataPayload Method on the edge node or device object, passing it the Sparkplug payload instance with the metric data and metadata.

An example of this approach, together with code for processing publish errors, is here: Examples - Sparkplug Edge Node - Handle publishing errors.

Reporting by Exception

If you do not want to republish values that have not changed, the approach described above requires your code to keep track of the values that were published, compare them with the new values, and only include in the payload the metrics whose values have changed. Rapid Toolkit for Sparkplug can help with this through its support for reporting by exception, which makes the code shorter and easier to understand.
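For comparison, the manual bookkeeping mentioned above might look roughly like the following sketch. It uses only the .NET base class library. The PublishedValueTracker class and its SelectChanged method are hypothetical names invented for this illustration and are not part of Rapid Toolkit for Sparkplug. Turning the selected values into a Sparkplug payload is deliberately left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of Rapid Toolkit for Sparkplug): remembers the
// last value published for each metric and filters out unchanged values.
public sealed class PublishedValueTracker
{
    private readonly Dictionary<string, object> _lastPublished = new Dictionary<string, object>();

    // Returns only the entries whose value differs from the last published one,
    // and records them as published.
    public Dictionary<string, object> SelectChanged(IReadOnlyDictionary<string, object> newValues)
    {
        var changed = new Dictionary<string, object>();
        foreach (KeyValuePair<string, object> pair in newValues)
        {
            bool known = _lastPublished.TryGetValue(pair.Key, out var previous);
            if (!known || !object.Equals(previous, pair.Value))
            {
                changed[pair.Key] = pair.Value;
                _lastPublished[pair.Key] = pair.Value;
            }
        }
        return changed;
    }
}

public static class ChangeTrackingDemo
{
    public static void Main()
    {
        var tracker = new PublishedValueTracker();

        var first = tracker.SelectChanged(new Dictionary<string, object>
            { ["MyMetric1"] = 10, ["MyMetric2"] = 20 });
        Console.WriteLine(first.Count);   // 2: nothing was published before

        var second = tracker.SelectChanged(new Dictionary<string, object>
            { ["MyMetric1"] = 10, ["MyMetric2"] = 21 });
        Console.WriteLine(second.Count);  // 1: only MyMetric2 changed
    }
}
```

With reporting by exception, this kind of comparison no longer has to live in your own code.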

Reporting by exception is turned on by setting the ReportByException Property on the edge node or device object to true. Your code then calls the UpdateReadData Method on the metric object (SparkplugMetric Class) whenever it has new data that it wants to publish for the metric. If the metric value has changed, Rapid Toolkit for Sparkplug creates a payload with the metric data and publishes it.

When this is done without additional measures, each metric change publishes a new Sparkplug payload with just one metric. In many cases this is not what you want, especially if your edge node or device collects data for multiple metrics together and also wants to publish them together, in a single Sparkplug payload. To group the metric updates and let Rapid Toolkit for Sparkplug assemble a payload containing all metrics that have changed, your code needs to lock the publishing before it starts updating the metric data, and unlock the publishing when it is done updating. When the publishing is unlocked, if at least one metric has changed, Rapid Toolkit for Sparkplug will publish the metrics that have changed, in a single Sparkplug payload.

The publishing is locked by calling the LockPublishing Method on the edge node or device. The publishing is unlocked by calling the UnlockPublishing Method on the same object. The calls to LockPublishing Method and UnlockPublishing Method must be paired under all conditions.
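To make the grouping behavior concrete, here is a simplified model written only with the .NET base class library. It is a sketch of the behavior described above, not the toolkit's implementation. The BatchingPublisherModel class, its Update method, and the PublishedPayloads list are hypothetical, and the use of a lock counter (so that nested lock/unlock pairs would work) is an assumption of this model rather than something the documentation states.

```csharp
using System;
using System.Collections.Generic;

// Simplified, hypothetical model of lock-based batching. It is NOT the toolkit's
// implementation; it only mimics the behavior described in the text.
public sealed class BatchingPublisherModel
{
    private readonly object _sync = new object();
    private readonly Dictionary<string, int> _current = new Dictionary<string, int>();
    private readonly Dictionary<string, int> _pending = new Dictionary<string, int>();
    private int _lockCount; // assumption of this model: locks are counted

    // Each inner list is one published "payload" (the metric names it contained).
    public List<List<string>> PublishedPayloads { get; } = new List<List<string>>();

    public void LockPublishing()
    {
        lock (_sync) { _lockCount++; }
    }

    public void UnlockPublishing()
    {
        lock (_sync)
        {
            if (_lockCount == 0)
                throw new InvalidOperationException("Unlock without a matching lock.");
            if (--_lockCount == 0)
                Flush();
        }
    }

    public void Update(string metricName, int value)
    {
        lock (_sync)
        {
            // Report by exception: ignore values that did not change.
            if (_current.TryGetValue(metricName, out int previous) && previous == value)
                return;
            _current[metricName] = value;
            _pending[metricName] = value;
            if (_lockCount == 0)
                Flush(); // not locked: publish this single change right away
        }
    }

    private void Flush()
    {
        if (_pending.Count == 0)
            return;
        PublishedPayloads.Add(new List<string>(_pending.Keys));
        _pending.Clear();
    }
}

public static class BatchingDemo
{
    public static void Main()
    {
        var model = new BatchingPublisherModel();

        // Without locking: every change becomes its own payload.
        model.Update("MyMetric1", 1);
        model.Update("MyMetric2", 2);
        Console.WriteLine(model.PublishedPayloads.Count);    // 2

        // With locking: changes are grouped into one payload on unlock.
        model.LockPublishing();
        try
        {
            model.Update("MyMetric1", 10);
            model.Update("MyMetric2", 2);   // unchanged, so not included
            model.Update("MyMetric3", 30);
        }
        finally
        {
            model.UnlockPublishing();
        }
        Console.WriteLine(model.PublishedPayloads.Count);    // 3
        Console.WriteLine(model.PublishedPayloads[2].Count); // 2
    }
}
```

The try/finally in the demo is what keeps the lock and unlock calls paired even if an update throws.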

The following example shows the custom data provision model with reporting by exception, and with the use of the locking and unlocking methods.

.NET

// This example shows how to turn off the polling by the component, and instead manually publish the data by reporting when
// they have changed.
//
// You can use any Sparkplug application, including our SparkplugCmd utility and the SparkplugApplicationConsoleDemo
// program, to subscribe to the edge node data. 
//
// Find all latest examples here: https://opclabs.doc-that.com/files/onlinedocs/OPCLabs-ConnectivityStudio/Latest/examples.html .
// Sparkplug examples in C# on GitHub: https://github.com/OPCLabs/Examples-ConnectivityStudio-CSharp .
// Missing some example? Ask us for it on our Online Forums, https://www.opclabs.com/forum/index ! You do not have to own
// a commercial license in order to use Online Forums, and we reply to every post.

using System;
using System.Threading;
using OpcLabs.EasySparkplug;
using Timer = System.Timers.Timer;

namespace SparkplugDocExamples.EdgeNode._EasySparkplugEdgeNode
{
    class ReportByException
    {
        public static void Main1()
        {
            // Note that the default port for the "mqtt" scheme is 1883.
            var hostDescriptor = new SparkplugHostDescriptor("mqtt://localhost");

            // Instantiate the edge node object.
            var edgeNode = new EasySparkplugEdgeNode(hostDescriptor, "easyGroup", "easySparkplugDemo");

            // Configure the edge node so that we will publish data fully manually.
            edgeNode.PublishingInterval = Timeout.Infinite;
            edgeNode.ReportByException = true;
            
            // Hook the SystemConnectionStateChanged event to handle system connection state changes.
            edgeNode.SystemConnectionStateChanged += (sender, eventArgs) =>
            {
                // Display the new connection state (such as when the connection to the broker succeeds or fails).
                Console.WriteLine($"{nameof(EasySparkplugEdgeNode.SystemConnectionStateChanged)}: {eventArgs}");
            };

            // Define metrics.
            var random = new Random();
            SparkplugMetric myMetric1 = SparkplugMetric.CreateIn(edgeNode, "MyMetric1").ValueType<int>();
            SparkplugMetric myMetric2 = SparkplugMetric.CreateIn(edgeNode, "MyMetric2").ValueType<int>();
            SparkplugMetric myMetric3 = SparkplugMetric.CreateIn(edgeNode, "MyMetric3").ValueType<int>();

            // Start the edge node.
            Console.WriteLine("The edge node is starting...");
            edgeNode.Start();

            Console.WriteLine("The edge node is started.");
            Console.WriteLine();

            // Create a timer for publishing the data, and start it.
            var timer = new Timer { AutoReset = true };
            timer.Elapsed += (sender, eventArgs) =>
            {
                // Do not publish individual updates, but rather lock the publishing so that we can make multiple updates.
                edgeNode.LockPublishing();
                try
                {
                    // Update some of the metrics (in this example, with random data).
                    if (random.Next(2) != 0)
                        myMetric1.UpdateReadData(random.Next());
                    if (random.Next(2) != 0)
                        myMetric2.UpdateReadData(random.Next());
                    if (random.Next(2) != 0)
                        myMetric3.UpdateReadData(random.Next());
                }
                finally
                {
                    // At this point, the edge node will publish the data for all metrics that have been updated.
                    edgeNode.UnlockPublishing();
                }
                
                // Set the next interval to a random value between 1 millisecond and 3 seconds.
                // (The timer's Interval must be greater than zero, so 0 is excluded.)
                timer.Interval = random.Next(1, 3 * 1000);
            };
            timer.Start();
            
            // Let the user decide when to stop.
            Console.WriteLine("Press Enter to stop the edge node...");
            Console.ReadLine();
            
            // Stop the timer.
            timer.Stop();

            // Stop the edge node.
            Console.WriteLine("The edge node is stopping...");
            edgeNode.Stop();

            Console.WriteLine("The edge node is stopped.");
        }
    }
}

' This example shows how to turn off the polling by the component, and instead manually publish the data by reporting when
' they have changed.
'
' You can use any Sparkplug application, including our SparkplugCmd utility and the SparkplugApplicationConsoleDemo
' program, to subscribe to the edge node data.
'
' Find all latest examples here: https://opclabs.doc-that.com/files/onlinedocs/OPCLabs-ConnectivityStudio/Latest/examples.html .
' Sparkplug examples in C# on GitHub: https://github.com/OPCLabs/Examples-ConnectivityStudio-CSharp .
' Missing some example? Ask us for it on our Online Forums, https://www.opclabs.com/forum/index ! You do not have to own
' a commercial license in order to use Online Forums, and we reply to every post.

Imports System
Imports System.Threading
Imports OpcLabs.EasySparkplug
Imports Timer = System.Timers.Timer

Namespace Global.SparkplugDocExamples.EdgeNode._EasySparkplugEdgeNode
    Class ReportByException
        Public Shared Sub Main1()
            ' Note that the default port for the "mqtt" scheme is 1883.
            Dim hostDescriptor = New SparkplugHostDescriptor("mqtt://localhost")

            ' Instantiate the edge node object.
            Dim edgeNode = New EasySparkplugEdgeNode(hostDescriptor, "easyGroup", "easySparkplugDemo")

            ' Configure the edge node so that we will publish data fully manually.
            edgeNode.PublishingInterval = Timeout.Infinite
            edgeNode.ReportByException = True

            ' Hook the SystemConnectionStateChanged event to handle system connection state changes.
            AddHandler edgeNode.SystemConnectionStateChanged,
                Sub(sender, eventArgs)
                    ' Display the new connection state (such as when the connection to the broker succeeds or fails).
                    Console.WriteLine($"{NameOf(EasySparkplugEdgeNode.SystemConnectionStateChanged)}: {eventArgs}")
                End Sub

            ' Define metrics.
            Dim random = New Random()
            Dim myMetric1 As SparkplugMetric = SparkplugMetric.CreateIn(edgeNode, "MyMetric1").ValueType(Of Integer)()
            Dim myMetric2 As SparkplugMetric = SparkplugMetric.CreateIn(edgeNode, "MyMetric2").ValueType(Of Integer)()
            Dim myMetric3 As SparkplugMetric = SparkplugMetric.CreateIn(edgeNode, "MyMetric3").ValueType(Of Integer)()

            ' Start the edge node.
            Console.WriteLine("The edge node is starting...")
            edgeNode.Start()

            Console.WriteLine("The edge node is started.")
            Console.WriteLine()

            ' Create a timer for publishing the data, and start it.
            Dim timer = New Timer With {.AutoReset = True}
            AddHandler timer.Elapsed,
                Sub(sender, eventArgs)
                    ' Do not publish individual updates, but rather lock the publishing so that we can make multiple updates.
                    edgeNode.LockPublishing()
                    Try
                        ' Update some of the metrics (in this example, with random data).
                        If random.Next(2) <> 0 Then
                            myMetric1.UpdateReadData(random.Next())
                        End If
                        If random.Next(2) <> 0 Then
                            myMetric2.UpdateReadData(random.Next())
                        End If
                        If random.Next(2) <> 0 Then
                            myMetric3.UpdateReadData(random.Next())
                        End If
                    Finally
                        ' At this point, the edge node will publish the data for all metrics that have been updated.
                        edgeNode.UnlockPublishing()
                    End Try

                    ' Set the next interval to a random value between 1 millisecond and 3 seconds.
                    ' (The timer's Interval must be greater than zero, so 0 is excluded.)
                    timer.Interval = random.Next(1, 3 * 1000)
                End Sub
            timer.Start()

            ' Let the user decide when to stop.
            Console.WriteLine("Press Enter to stop the edge node...")
            Console.ReadLine()

            ' Stop the timer.
            timer.Stop()

            ' Stop the edge node.
            Console.WriteLine("The edge node is stopping...")
            edgeNode.Stop()

            Console.WriteLine("The edge node is stopped.")
        End Sub
    End Class
End Namespace

Using Disposable Lock Object

Making sure that calls to the LockPublishing Method and UnlockPublishing Method are always paired in your code, including in exceptional cases, is demanding and can be error-prone. The Disposable Lock Object makes this pairing easier. It is an object that locks the publishing when it is created, implements the IDisposable Interface, and unlocks the publishing when it is disposed of (through the IDisposable.Dispose method).

Many .NET languages have specialized code constructs for IDisposable objects that ensure that the object is properly disposed of. When used properly, these constructs make it easier to write code that guarantees that the method calls are paired. In C#, the construct is the using statement; in VB.NET, it is the similar Using statement. In the end, using the disposable lock object does the same thing as calling the methods directly.
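The general pattern behind such an object can be sketched as follows. This is an illustration of the IDisposable idiom using only the .NET base class library, not the toolkit's actual implementation. The DisposableLockSketch class and its acquire and release delegates are hypothetical; in terms of the text above, acquire stands in for locking the publishing and release for unlocking it.

```csharp
using System;

// Hypothetical sketch of a disposable lock: runs 'acquire' when created and
// 'release' once when disposed. Not the toolkit's actual implementation.
public sealed class DisposableLockSketch : IDisposable
{
    private Action _release;

    public DisposableLockSketch(Action acquire, Action release)
    {
        if (acquire == null) throw new ArgumentNullException(nameof(acquire));
        _release = release ?? throw new ArgumentNullException(nameof(release));
        acquire();
    }

    public void Dispose()
    {
        // Clearing the field first makes repeated Dispose calls harmless
        // (this sketch does not attempt to be thread-safe).
        Action release = _release;
        _release = null;
        release?.Invoke();
    }
}

public static class DisposableLockDemo
{
    public static void Main()
    {
        int lockCount = 0;
        try
        {
            using (new DisposableLockSketch(() => lockCount++, () => lockCount--))
            {
                Console.WriteLine(lockCount); // 1: locked inside the block
                throw new InvalidOperationException("Simulated failure while updating metrics.");
            }
        }
        catch (InvalidOperationException)
        {
            Console.WriteLine(lockCount); // 0: unlocked even though an exception was thrown
        }
    }
}
```

The demo shows the point of the construct: the release step runs when the using block is left, whether normally or through an exception.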

The following example shows the custom data provision model with reporting by exception, and with the use of the disposable lock object.

.NET

// This example shows how to lock the publishing and obtain a disposable object which unlocks the publishing
// when it is disposed.
//
// You can use any Sparkplug application, including our SparkplugCmd utility and the SparkplugApplicationConsoleDemo
// program, to subscribe to the edge node data.
//
// Find all latest examples here: https://opclabs.doc-that.com/files/onlinedocs/OPCLabs-ConnectivityStudio/Latest/examples.html .
// Sparkplug examples in C# on GitHub: https://github.com/OPCLabs/Examples-ConnectivityStudio-CSharp .
// Missing some example? Ask us for it on our Online Forums, https://www.opclabs.com/forum/index ! You do not have to own
// a commercial license in order to use Online Forums, and we reply to every post.

using System;
using System.Threading;
using OpcLabs.EasySparkplug;
using Timer = System.Timers.Timer;

namespace SparkplugDocExamples.EdgeNode._EasySparkplugEdgeNode
{
    class DisposableLockPublishing
    {
        public static void Main1()
        {
            // Note that the default port for the "mqtt" scheme is 1883.
            var hostDescriptor = new SparkplugHostDescriptor("mqtt://localhost");

            // Instantiate the edge node object.
            var edgeNode = new EasySparkplugEdgeNode(hostDescriptor, "easyGroup", "easySparkplugDemo");

            // Configure the edge node so that we will publish data fully manually.
            edgeNode.PublishingInterval = Timeout.Infinite;
            edgeNode.ReportByException = true;
            
            // Hook the SystemConnectionStateChanged event to handle system connection state changes.
            edgeNode.SystemConnectionStateChanged += (sender, eventArgs) =>
            {
                // Display the new connection state (such as when the connection to the broker succeeds or fails).
                Console.WriteLine($"{nameof(EasySparkplugEdgeNode.SystemConnectionStateChanged)}: {eventArgs}");
            };

            // Define metrics.
            var random = new Random();
            SparkplugMetric myMetric1 = SparkplugMetric.CreateIn(edgeNode, "MyMetric1").ValueType<int>();
            SparkplugMetric myMetric2 = SparkplugMetric.CreateIn(edgeNode, "MyMetric2").ValueType<int>();
            SparkplugMetric myMetric3 = SparkplugMetric.CreateIn(edgeNode, "MyMetric3").ValueType<int>();

            // Start the edge node.
            Console.WriteLine("The edge node is starting...");
            edgeNode.Start();

            Console.WriteLine("The edge node is started.");
            Console.WriteLine();

            // Create a timer for publishing the data, and start it.
            var timer = new Timer { AutoReset = true };
            timer.Elapsed += (sender, eventArgs) =>
            {
                // Lock/unlock the publishing so that we can make multiple updates without them being published immediately.
                using (edgeNode.DisposableLockPublishing())
                {
                    // Update some of the metrics (in this example, with random data).
                    if (random.Next(2) != 0)
                        myMetric1.UpdateReadData(random.Next());
                    if (random.Next(2) != 0)
                        myMetric2.UpdateReadData(random.Next());
                    if (random.Next(2) != 0)
                        myMetric3.UpdateReadData(random.Next());

                    // Upon leaving the 'using' block, the publishing will be unlocked, and any updated data published.
                }

                // Set the next interval to a random value between 1 millisecond and 3 seconds.
                // (The timer's Interval must be greater than zero, so 0 is excluded.)
                timer.Interval = random.Next(1, 3 * 1000);
            };
            timer.Start();
            
            // Let the user decide when to stop.
            Console.WriteLine("Press Enter to stop the edge node...");
            Console.ReadLine();
            
            // Stop the timer.
            timer.Stop();

            // Stop the edge node.
            Console.WriteLine("The edge node is stopping...");
            edgeNode.Stop();

            Console.WriteLine("The edge node is stopped.");
        }
    }
}

' This example shows how to lock the publishing and obtain a disposable object which unlocks the publishing
' when it is disposed.
'
' You can use any Sparkplug application, including our SparkplugCmd utility and the SparkplugApplicationConsoleDemo
' program, to subscribe to the edge node data.
'
' Find all latest examples here: https://opclabs.doc-that.com/files/onlinedocs/OPCLabs-ConnectivityStudio/Latest/examples.html .
' Sparkplug examples in C# on GitHub: https://github.com/OPCLabs/Examples-ConnectivityStudio-CSharp .
' Missing some example? Ask us for it on our Online Forums, https://www.opclabs.com/forum/index ! You do not have to own
' a commercial license in order to use Online Forums, and we reply to every post.

Imports System
Imports System.Threading
Imports OpcLabs.EasySparkplug
Imports Timer = System.Timers.Timer

Namespace Global.SparkplugDocExamples.EdgeNode._EasySparkplugEdgeNode
    Class DisposableLockPublishing
        Public Shared Sub Main1()
            ' Note that the default port for the "mqtt" scheme is 1883.
            Dim hostDescriptor = New SparkplugHostDescriptor("mqtt://localhost")

            ' Instantiate the edge node object.
            Dim edgeNode = New EasySparkplugEdgeNode(hostDescriptor, "easyGroup", "easySparkplugDemo")

            ' Configure the edge node so that we will publish data fully manually.
            edgeNode.PublishingInterval = Timeout.Infinite
            edgeNode.ReportByException = True

            ' Hook the SystemConnectionStateChanged event to handle system connection state changes.
            AddHandler edgeNode.SystemConnectionStateChanged,
                Sub(sender, eventArgs)
                    ' Display the new connection state (such as when the connection to the broker succeeds or fails).
                    Console.WriteLine($"{NameOf(EasySparkplugEdgeNode.SystemConnectionStateChanged)}: {eventArgs}")
                End Sub

            ' Define metrics.
            Dim random = New Random()
            Dim myMetric1 As SparkplugMetric = SparkplugMetric.CreateIn(edgeNode, "MyMetric1").ValueType(Of Integer)()
            Dim myMetric2 As SparkplugMetric = SparkplugMetric.CreateIn(edgeNode, "MyMetric2").ValueType(Of Integer)()
            Dim myMetric3 As SparkplugMetric = SparkplugMetric.CreateIn(edgeNode, "MyMetric3").ValueType(Of Integer)()

            ' Start the edge node.
            Console.WriteLine("The edge node is starting...")
            edgeNode.Start()

            Console.WriteLine("The edge node is started.")
            Console.WriteLine()

            ' Create a timer for publishing the data, and start it.
            Dim timer = New Timer() With {.AutoReset = True}
            AddHandler timer.Elapsed,
                Sub(sender, eventArgs)
                    ' Lock/unlock the publishing so that we can make multiple updates without them being published immediately.
                    Using edgeNode.DisposableLockPublishing()
                        ' Update some of the metrics (in this example, with random data).
                        If (random.Next(2) <> 0) Then
                            myMetric1.UpdateReadData(random.Next())
                        End If
                        If (random.Next(2) <> 0) Then
                            myMetric2.UpdateReadData(random.Next())
                        End If
                        If (random.Next(2) <> 0) Then
                            myMetric3.UpdateReadData(random.Next())
                        End If

                        ' Upon leaving the 'Using' block, the publishing will be unlocked, and any updated data published.
                    End Using

                    ' Set the next interval to a random value between 1 millisecond and 3 seconds.
                    ' (The timer's Interval must be greater than zero, so 0 is excluded.)
                    timer.Interval = random.Next(1, 3 * 1000)
                End Sub
            timer.Start()

            ' Let the user decide when to stop.
            Console.WriteLine("Press Enter to stop the edge node...")
            Console.ReadLine()

            ' Stop the timer.
            timer.Stop()

            ' Stop the edge node.
            Console.WriteLine("The edge node is stopping...")
            edgeNode.Stop()

            Console.WriteLine("The edge node is stopped.")
        End Sub
    End Class
End Namespace

Sparkplug is a trademark of Eclipse Foundation, Inc. "MQTT" is a trademark of the OASIS Open standards consortium. Other related terms are trademarks of their respective owners. Any use of these terms on this site is for descriptive purposes only and does not imply any sponsorship, endorsement or affiliation.
