AMQP or MQTT Protocol

Hi, I'm new to rapidM2M. I'm planning to stream data to Microsoft Azure Event Hubs over AMQP. Does anyone have an idea how to achieve that? What procedure should I follow if I want to continuously transmit data to IoT services like Microsoft Event Hubs? Is the default HTTP push export realistic for transmitting a lot of data to Event Hubs?

The rapidM2M backend supports so-called backend logic: node.js scripts executed directly on the backend with an event-driven interface to the backend's data bus & storage.

This way you can

  • register for data updates from any site/device and container (e.g. #config, #histdata, site status…),
  • filter and reduce raw data
  • re-map data values
  • re-organize data structures
  • and finally forward the compacted & optimized data to any data sink

For interfacing to your data sink you may use any npm module that fits your needs.

The downside of this very flexible feature is that it is not available on our cloud system for security reasons. You need a private edition of the rapidM2M backend to get the necessary rights.

Of course, the HTTP push export you mentioned may be a fast first step to get some data out, but it is very limited in its functionality and verbosity.

A viable intermediate solution for small projects / proofs of concept is to run a node.js service of your own. This service may bridge between our cloud backend's REST API and the AMQP / MQTT broker.
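
A minimal sketch of such a bridge, assuming HTTP polling against the REST API and the @azure/event-hubs package on the Azure side. The base URL, the basic-auth credentials, the customer/site names and the field names are placeholders, not actual values from your project:

const axios = require('axios');
const { EventHubProducerClient } = require('@azure/event-hubs');

// all of these are placeholders - adapt to your backend and Azure resources
const API_BASE = 'https://your-backend.example.com/api';
const API_AUTH = { username: 'apiUser', password: 'apiToken' };
const producer = new EventHubProducerClient('<event hubs connection string>', '<event hub name>');

let lastStamp = null;

async function poll() {
  // fetch the youngest histdata0 record of mySite via the REST API
  const { data } = await axios.get(
    `${API_BASE}/1/customers/myCustomer/sites/mySite/histdata0/youngest`,
    { auth: API_AUTH }
  );

  // forward the record only if it has not been sent before
  if (data && data.stamp !== lastStamp) {
    lastStamp = data.stamp;
    const batch = await producer.createBatch();
    batch.tryAdd({ body: { stamp: data.stamp, fieldX: data.fieldX, fieldY: data.fieldY } });
    await producer.sendBatch(batch);
  }
}

// poll every 30 seconds
setInterval(() => poll().catch(err => console.error(err)), 30000);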

Later on, when your project grows, it will be very easy to integrate the bridge code into your IoTApp and move it to your private backend that way.

PS: The above approach works in both directions (device to broker and vice versa).


There are two ways to fetch the sensor data (histdata) in a backend logic (BLO). Our backend API (BAPI) provides two different concepts:

Event-driven (WebSocket)
You can subscribe to a histdata-channel and listen to initialize, update and delete events.
Example:

BAPI.subscribe('histdata0', { select: ['fieldX', 'fieldY'] })
  .onUpdate((data, context) => {
    BAPI.log.info(`Histdata0 update event ${context._uid} - ${JSON.stringify(data)}`);
    // data contains one record with stamp, fieldX, fieldY
    // context contains the site to which the data belongs
  })
  .onDelete((_uid) => {
    BAPI.log.info(`Histdata0 delete event ${_uid}`);
  })
  .onInitialized(() => {
    BAPI.log.info(`Init of hist0Cache finished!`);
    // Only called once on BLO startup
  });

HTTP REST
You can use BAPI.get() to fetch the histdata records with an HTTP GET request. Example:

try {
  // the youngest histdata0 record of mySite
  const youngestRecord = await BAPI.get(`1/customers/myCustomer/sites/mySite/histdata0/youngest`, { 'select': ['fieldX', 'fieldY'] });
  // youngestRecord contains one record with stamp, fieldX, fieldY
 
  // an array with histdata0 records of mySite in a specific time-window
  const recordArr = await BAPI.get(`1/customers/myCustomer/sites/mySite/histdata0`, { 'select': ['fieldX', 'fieldY'], 'from': '202107121030', 'until': '202107121045' });
  // recordArr contains multiple records with stamp, fieldX, fieldY from the chosen time-window
  // keep the time-window small for better performance (day by day requests in a loop instead of requesting whole months or years)
}
catch (err) {
  BAPI.log.error(err);
}

It depends on your application which concept you use. For communication with Azure Event Hubs, I would prefer this npm package from Microsoft: @azure/event-hubs - npm
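
For illustration, a sketch of how the subscription from above could feed such an Event Hubs producer, assuming the BLO can load the npm package via require() and that onUpdate(…) accepts an async handler. The connection string, hub name and field names are placeholders:

const { EventHubProducerClient } = require('@azure/event-hubs');

// placeholders - take the connection string and hub name from the Azure portal
const producer = new EventHubProducerClient('<event hubs connection string>', '<event hub name>');

BAPI.subscribe('histdata0', { select: ['fieldX', 'fieldY'] })
  .onUpdate(async (data, context) => {
    try {
      // wrap the record into an event and send it to Azure Event Hubs
      const batch = await producer.createBatch();
      batch.tryAdd({ body: { site: context._uid, stamp: data.stamp, fieldX: data.fieldX, fieldY: data.fieldY } });
      await producer.sendBatch(batch);
    } catch (err) {
      BAPI.log.error(err);
    }
  });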


First of all, thank you so much for your help; I managed to subscribe to the histdata and transmit messages to Microsoft Event Hubs. Do you have sample code/documentation on how I can achieve the bullet points quoted in this reply? I want to:

  • filter and reduce raw data
  • re-map data values

The code is extremely specific to the requirements of the application, so it's hard to provide out-of-the-box examples. Please consider the guidelines below to solve typical situations:

To get rid of unused fields
you may use the subscription’s select:[…] parameter.

All other filter & reduce operations
simply use regular JavaScript code inside the onUpdate(…) handler.
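
A small sketch of what such filtering and re-mapping inside onUpdate(…) could look like; the threshold, the unit conversion and the forwardToSink() helper are made up for illustration:

BAPI.subscribe('histdata0', { select: ['fieldX', 'fieldY'] })
  .onUpdate((data, context) => {
    // filter: drop records below an (illustrative) threshold
    if (data.fieldX < 10) {
      return;
    }

    // re-map: rename fields and convert units for the data sink
    const mapped = {
      site: context._uid,
      timestamp: data.stamp,
      temperatureC: data.fieldX / 10, // illustrative: raw value delivered in 1/10 °C
      levelPercent: data.fieldY
    };

    // forwardToSink() stands for your own forwarding code (e.g. the Event Hubs producer above)
    forwardToSink(mapped);
  });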

When aggregating multiple recordings…

…it's established practice to keep intermediate values in memory (e.g. an object keyed by site uid: aggregates[<site_uid>]).

In case of a BLO restart it may be necessary to catch up on data collected during the BLO outage. Therefore it's recommended to remember the last finalized aggregation in some histdata container (BAPI.post('…/histdata8', …)) and to catch up the aggregation using BAPI.get(…) upon the very first onUpdate().
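
A sketch of this pattern, assuming onUpdate(…) accepts an async handler; the aggregation period (60 records), the histdata8 path and the field names are illustrative, and the catch-up via BAPI.get(…) on the very first onUpdate() is omitted for brevity:

// in-memory aggregates, keyed by site uid
const aggregates = {};

BAPI.subscribe('histdata0', { select: ['fieldX'] })
  .onUpdate(async (data, context) => {
    const uid = context._uid;

    // keep a running sum and count per site in memory
    if (!aggregates[uid]) {
      aggregates[uid] = { count: 0, sum: 0 };
    }
    aggregates[uid].count += 1;
    aggregates[uid].sum += data.fieldX;

    // after 60 records (illustrative period) persist the finalized aggregation,
    // so it can be recovered after a BLO restart
    if (aggregates[uid].count >= 60) {
      const avg = aggregates[uid].sum / aggregates[uid].count;
      await BAPI.post(`1/customers/myCustomer/sites/${uid}/histdata8`, { stamp: data.stamp, avgFieldX: avg });
      aggregates[uid] = { count: 0, sum: 0 };
    }
  });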

In certain cases it may be advantageous to completely replace BAPI.subscribe(…) with periodically calling BAPI.get('/1/sites/<site_uid>/histdata0/<interval>/<timezone>', { from: …, until: …, select: […] }). This call already does all the basic aggregation work (min, max, avg, med) for you, optionally considering a local timezone (including daylight saving!). But be aware of data that drops in significantly delayed (for example after a cellular outage).
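
A minimal sketch of this polling variant; the interval name, the timezone value, the fixed time window and the use of setInterval as scheduler are assumptions:

async function fetchAggregates(siteUid) {
  try {
    const interval = 'hour';          // assumption: the exact interval names depend on the BAPI
    const timezone = 'Europe/Vienna'; // assumption: the timezone format depends on the BAPI

    // records come back pre-aggregated (min, max, avg, med) per interval;
    // from/until would normally be derived from the current time, fixed values for brevity
    const records = await BAPI.get(`/1/sites/${siteUid}/histdata0/${interval}/${timezone}`, {
      'select': ['fieldX', 'fieldY'],
      'from': '202107120000',
      'until': '202107130000'
    });

    // forward `records` to your data sink here
    BAPI.log.info(`Fetched ${records.length} aggregated records for ${siteUid}`);
  } catch (err) {
    BAPI.log.error(err);
  }
}

// poll once per hour instead of subscribing
setInterval(() => fetchAggregates('mySite'), 60 * 60 * 1000);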

The sophisticated approach is to trigger the BAPI.get('…histdata0/<interval>/<timezone>', …) calls whenever BAPI.subscribe('histdata0', …).onUpdate(…) reports a new record that opens a new aggregation period. But be aware that there may be a gap between the current and the new aggregation period…