# general
a
@Diederik you can use the conversion from CyberChef and match it with the corresponding method from Benthos here: https://docs.redpanda.com/redpanda-connect/guides/bloblang/methods/
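For instance, CyberChef's "From Base64" or "To Hex" operations map onto Bloblang's decode()/encode() methods. A minimal sketch, assuming a payload like {"raw": "aGVsbG8="} where the field name raw is only a placeholder:

```yaml
pipeline:
  processors:
    - bloblang: |
        # CyberChef "From Base64" -> decode("base64"), "To Hex" -> encode("hex")
        root.text = this.raw.decode("base64").string()
        root.hex = this.raw.decode("base64").encode("hex")
```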
d
Any plans to add further options like this into UMH? Some data modelling tooling?
j
theoretically you can already do a lot of it, either using connections & protocol converters (which will auto-generate the location for you and make it easy to rename tags) or through custom data flow components. The last one is still quite difficult to use. What would be some standard cases that we could implement?
d
I agree you can do this with Benthos processing, but I think that's not the greatest experience/way of working. This way you model everything at the edge. HiveMQ, for example, has a tool to model the broker (read incoming tags on raw topics, model them and publish them again on a structured UNS). Something like that would be nice (Siemens Industrial Edge also has something similar)
A use case for this would be my application => I add data to the MQTT broker that doesn't come from Benthos/UMH itself, and I would like to be able to model this data as well and fit it into the UMH data layout using a UMH tool. I currently do this in Node-RED (read the topic, transform it (base64 encoding to readable text), add some keys coming from another device, and publish it on the MQTT broker)
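As a rough illustration, the transform part of that Node-RED flow could become a Bloblang processor along these lines (the field name and the hardcoded extra key are assumptions, and the lookup against the other device is left out):

```yaml
pipeline:
  processors:
    - bloblang: |
        root = this
        # base64 encoding -> readable text (field name is hypothetical)
        root.description = this.description.decode("base64").string()
        # extra key that would normally come from another device, hardcoded here
        root.line_id = "packaging-line-3"
```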
A standard use case would be decoding a tag (base64, protobuf, ... for example)
removing characters / string operations could also be interesting
remember the problem we had with the topic structure this week (dots in the topics confuse the UNS)
endianness for data coming from Modbus sources
Avro/XML/JSON/... conversion
I'm aware that all this is already possible with Benthos
bit relations (if State : "run" AND speed : "0" add key "Error" : true)
simple calculations (multiply,...)
random number generator (for validation usecases)
date time conversion (UTC to ... kind of stuff)
more advanced things like taking the FFT of a long array or something (useful for predictive maintenance / bandwidth reduction)
just dreaming again :p no idea if relevant
or feasible
j
this is what we currently think of as a "stream processor": you take data from the UNS, do a simple calculation, and then write it back. All of the mentioned stuff should be a 1 or 2 liner in Bloblang.
> endianness for data coming from Modbus sources
--> this is fortunately already possible in the configuration for the Modbus input. All options are in there, even the weird ones.
> A usecase for this would be my application => i add data to the MQTT broker not coming from Benthos/UMH itself, i would like to be able to model this data as well and fit it using a UMH tool into the UMH data layout
--> this I understand. Maybe we can provide a simple "stream processor" template for a custom data flow component that would allow for that? So that you only need to adjust the processing part, but would not need to worry about the rest?
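To give an impression, a few of the wished-for operations written as short Bloblang lines (field names, values and the timestamp format are assumptions, a sketch rather than a tested mapping):

```yaml
pipeline:
  processors:
    - bloblang: |
        # string operations: strip dots so they cannot confuse the UNS topic structure
        root.tag_name = this.tag_name.replace_all(".", "_")
        # bit relation: derive an Error flag from two other tags
        root.Error = this.State == "run" && this.speed == "0"
        # simple calculation
        root.power_kw = this.power_w * 0.001
        # date/time conversion: epoch milliseconds -> RFC 3339 string in UTC
        root.ts_utc = (this.timestamp_ms / 1000).ts_format("2006-01-02T15:04:05Z07:00", "UTC")
```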
would you use it then? it would look similar to this:
d
I think this could do. I see you would work on the Kafka stream directly. As an OT guy I'm still a bit afraid of Kafka, but I can see the same thing working on an MQTT-to-MQTT bridge, right?
j
# diederik prototype
```yaml
input:
  mqtt:
    urls:
      - tcp://united-manufacturing-hub-mqtt.united-manufacturing-hub.svc.cluster.local:1883
    client_id: stream-processor-example-modbus
    dynamic_client_id_suffix: nanoid
    auto_replay_nacks: false
    topics:
      # subscribe to a single tag from the UNS
      - umh/v1/enterprise-of-kings/rocky-9-3/wago-modbus-v1/_historian/INFO_DESCRIPTION
pipeline:
  processors:
    - bloblang: |
        # replace the whole payload with the base64-encoded value
        root = this.INFO_DESCRIPTION.encode("base64")
output:
  mqtt:
    urls:
      - tcp://united-manufacturing-hub-mqtt.united-manufacturing-hub.svc.cluster.local:1883
    client_id: stream-processor-example-modbus
    dynamic_client_id_suffix: nanoid
    # publish the result under a new _base64 schema
    topic: umh/v1/enterprise-of-kings/rocky-9-3/wago-modbus-v1/_base64/INFO_DESCRIPTION
```
here's a prototype 🙂
I would use MQTT for the stream processor, as it allows you to pick out single topics in a fine-granular way. In Kafka you always have to read entire partitions.
Kafka and MQTT are bridged using the bridges
in the above example you would not see the _base64 show up in the tag browser, as there is no bridge yet for _base64
wait let me modify it a little bit
d
looks good indeed! 🙂
j
```yaml
input:
  mqtt:
    urls:
      - tcp://united-manufacturing-hub-mqtt.united-manufacturing-hub.svc.cluster.local:1883
    client_id: stream-processor-example-modbus
    dynamic_client_id_suffix: nanoid
    auto_replay_nacks: false
    topics:
      - umh/v1/enterprise-of-kings/rocky-9-3/wago-modbus-v1/_historian/INFO_DESCRIPTION
pipeline:
  processors:
    - bloblang: |
        # keep the _historian payload shape: encoded value plus the original timestamp
        root = {}
        root.INFO_DESCRIPTION = this.INFO_DESCRIPTION.encode("base64")
        root.timestamp_ms = this.timestamp_ms
output:
  mqtt:
    urls:
      - tcp://united-manufacturing-hub-mqtt.united-manufacturing-hub.svc.cluster.local:1883
    client_id: stream-processor-example-modbus
    dynamic_client_id_suffix: nanoid
    # publish back into _historian under a new tag name so it shows up in the tag browser
    topic: umh/v1/enterprise-of-kings/rocky-9-3/wago-modbus-v1/_historian/INFO_DESCRIPTION_BASE64
```
this is likely a better use case
this is how it looks in the tag browser then
d
I think the inverse is actually more logical :p (decoding instead of encoding), but no need to prove that works haha
example use case I have in mind:
in an upcoming project I have IFM barcode scanners that output their photos/data on a TCP/IP stream
I would like to read this stream, decode it, and put it on the UNS
for now we're thinking Node-RED/FlowFuse, but I think UMH/Benthos would be just fine for this as well
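A sketch of what that could look like with Benthos' socket input; the scanner address, the line-based framing and the target topic are all assumptions, since the actual IFM output format isn't known:

```yaml
input:
  socket:
    network: tcp
    address: 192.168.0.50:50010   # hypothetical scanner IP and port
    codec: lines                  # assumes one reading per line; the real framing may differ
pipeline:
  processors:
    - bloblang: |
        root.barcode = content().string()
        root.timestamp_ms = (timestamp_unix_nano() / 1000000).floor()
output:
  mqtt:
    urls:
      - tcp://united-manufacturing-hub-mqtt.united-manufacturing-hub.svc.cluster.local:1883
    # hypothetical device name under the existing UNS location
    topic: umh/v1/enterprise-of-kings/rocky-9-3/ifm-scanner/_historian/barcode
```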