Stream Join – BangDB = NoSQL + AI + Stream


Stream join

Streams often need to be joined, and joined continuously, as data flows into each of them. This is not as simple as joining two tables whose data is largely static. Events arrive with different timestamps, and one stream may be moving faster than the other. Most of the time the two streams will not deliver the same number of events.

Further, depending on the use case, we may wish to join events based on some condition, or to choose the latest event in one or both streams. It is therefore important that the database supports more than one kind of join.

Broadly, there are three ways to join:

Simple join - join two latest data streams
Active join - one of the two participating streams is the active join stream, the other is passive
Passive join - one of the two participating streams is the passive join stream, the other is active

Several join types are defined, and they should be used for different use cases. The following join types are supported in BangDB:

join_type = 1 means join only once [ no active/passive ]
join_type = 2 means passive join
join_type = 3 means passive join, but join with the latest one
join_type = 4 means active join
join_type = 5 means active join, but with the latest one
join_type = 6 means simple cep join

(2,4) and (3,5) go together.
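The pairing rule above can be written down as a small check. This is only an illustrative sketch: the `compatible` helper and the `PAIRS` table are hypothetical names, not part of BangDB, and it assumes that a simple join declares type 1 on both streams (as the examples on this page do) and that type 6 is not used to pair two streams.

```python
# Join type codes as listed above.
JOIN_TYPES = {
    1: "simple join (join once, no active/passive roles)",
    2: "passive join",
    3: "passive join, with the latest one",
    4: "active join",
    5: "active join, with the latest one",
    6: "simple cep join",
}

# (2, 4) and (3, 5) go together: one passive side, one active side.
PAIRS = {2: 4, 4: 2, 3: 5, 5: 3}


def compatible(type_a, type_b):
    """Return True if the two sides of a join use types that go together.

    Hypothetical helper for illustration only.
    """
    if type_a == 1 or type_b == 1:
        # Assumption: a simple join is declared with type 1 on both streams.
        return type_a == type_b
    return PAIRS.get(type_a) == type_b


print(compatible(1, 1), compatible(2, 4), compatible(3, 5), compatible(2, 5))
```

A check like this could be run over a schema before registering it, to catch a join that is declared active on both sides or that mixes the (2,4) and (3,5) families.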

Simple join

Here the two streams join data, based on the join condition, using the latest data from the slower stream and the earliest not-yet-joined data from the faster stream.
Suppose we have two streams, s1 and s2, with data arriving as follows:

join_type = 1, defined in both streams

s1              s2
t11 v11 p1
                t21 v21 p2
                t22 v22 p1
                t23 v23 p1
                t24 v24 p1
t12 v12 p2
                t25 v25 p1
                t26 v26 p2
t13 v13 p1

If we join the streams in the "simple manner", the joined stream s3 will contain the following events:

s3
t22 v11 v22 p1
t12 v12 v21 p2
t13 v13 v23 p1
...

As you can see, both streams join actively, each event joins with an event of the other stream that has not been joined yet, and once joined an event is not used again.
Once events (t11, v11) and (t22, v22) were joined, the event (t23, v23) had to wait until the next matching event arrived in s1, even though it was already available. It did not join with the older event of s1. In other words, once an event has been joined, it is not used for any subsequent join. The stream manager waits for the next event and then joins it with the earliest non-joined event of the other stream.
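The behaviour described above can be reproduced with a small simulation. This is a minimal sketch of the semantics as read from the example, not BangDB's implementation: it assumes each stream keeps a first-in-first-out queue of not-yet-joined events per join key (the point), and it ignores any time window. The function name `simple_join` and the tuple layout are hypothetical.

```python
from collections import defaultdict, deque


def simple_join(events):
    """Simulate a join_type = 1 style join.

    events: (stream, timestamp, value, key) tuples in arrival order.
    Returns joined rows as (timestamp, s1_value, s2_value, key).
    """
    # Not-yet-joined events, per stream and per join key, oldest first.
    pending = {"s1": defaultdict(deque), "s2": defaultdict(deque)}
    joined = []
    for stream, ts, value, key in events:
        other = "s2" if stream == "s1" else "s1"
        waiting = pending[other][key]
        if waiting:
            # Join with the earliest non-joined event of the other stream;
            # popping it ensures it is never used for another join.
            _, other_value = waiting.popleft()
            v1, v2 = (value, other_value) if stream == "s1" else (other_value, value)
            joined.append((ts, v1, v2, key))
        else:
            # Nothing to join with yet: wait for a matching event.
            pending[stream][key].append((ts, value))
    return joined


arrivals = [
    ("s1", "t11", "v11", "p1"),
    ("s2", "t21", "v21", "p2"),
    ("s2", "t22", "v22", "p1"),
    ("s2", "t23", "v23", "p1"),
    ("s2", "t24", "v24", "p1"),
    ("s1", "t12", "v12", "p2"),
    ("s2", "t25", "v25", "p1"),
    ("s2", "t26", "v26", "p2"),
    ("s1", "t13", "v13", "p1"),
]

for row in simple_join(arrivals):
    print(*row)
# t22 v11 v22 p1
# t12 v12 v21 p2
# t13 v13 v23 p1
```

The joined row carries the timestamp of the event whose arrival triggered the join, which matches the s3 rows listed above.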

Active passive join

Here one stream performs the join actively, while the other simply participates passively. There are two types of such joins: in one, the join happens only with the latest events whenever possible; in the other, the join happens with the available events, not necessarily the latest. Let's look at an example of each:

For the same data pattern, let's say s2 is the active stream, so only s2 initiates a join. This join will invalidate data only if new data has not arrived.

join_type = 3 for the passive stream and 5 for the active stream

The joined stream will have the following data:

s3
t22 v11 v22 p1
t23 v11 v23 p1
t24 v11 v24 p1
t25 v11 v25 p1
t26 v12 v26 p2
t24 v12 v24 ...

As you can see, each new event in the active stream joined with the latest data of the other (passive) stream the moment it arrived.

Now let's say we again want s2 as the active stream, but with new data invalidating the previous data no matter what the condition might be.

join_type = 2 for the passive stream and 4 for the active stream

s3
t22 v11 v22 p1
t23 v11 v23 p1
t24 v11 v24 p1
t26 v12 v26 p2
t24 v12 v24 ...

This is very similar to the previous result, except that the event (t25, v11, v25, p1) is missing from the joined stream. By the time (t25, v25, p1) arrived, the passive event v11 had already been invalidated by the newer event (t12, v12, p2), whose point does not match.
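The difference between the two active/passive variants can be sketched in a few lines. This is an interpretation of the two result tables above, not BangDB's code: it assumes that types (3, 5) remember the latest passive event for each join key, while types (2, 4) remember only the single latest passive event regardless of key. The function `active_passive_join` and its `per_key` flag are hypothetical, and time windows are ignored.

```python
def active_passive_join(events, active, per_key):
    """Simulate an active/passive join.

    events:  (stream, timestamp, value, key) tuples in arrival order.
    active:  name of the stream that initiates joins.
    per_key: True models types (3, 5), False models types (2, 4)
             (both are assumptions read off the example output).
    Returns rows as (timestamp, passive_value, active_value, key).
    """
    latest_by_key = {}   # latest passive value seen for each key
    latest = None        # (key, value) of the most recent passive event
    joined = []
    for stream, ts, value, key in events:
        if stream != active:
            # Passive events never initiate a join; they are only remembered.
            latest_by_key[key] = value
            latest = (key, value)
            continue
        if per_key:
            match = latest_by_key.get(key)
        else:
            match = latest[1] if latest and latest[0] == key else None
        if match is not None:
            joined.append((ts, match, value, key))
    return joined


arrivals = [
    ("s1", "t11", "v11", "p1"),
    ("s2", "t21", "v21", "p2"),
    ("s2", "t22", "v22", "p1"),
    ("s2", "t23", "v23", "p1"),
    ("s2", "t24", "v24", "p1"),
    ("s1", "t12", "v12", "p2"),
    ("s2", "t25", "v25", "p1"),
    ("s2", "t26", "v26", "p2"),
    ("s1", "t13", "v13", "p1"),
]

types_3_5 = active_passive_join(arrivals, active="s2", per_key=True)
types_2_4 = active_passive_join(arrivals, active="s2", per_key=False)

# The only row present with (3, 5) but absent with (2, 4):
print([row for row in types_3_5 if row not in types_2_4])
# [('t25', 'v11', 'v25', 'p1')]
```

Under these assumptions the sketch yields the five complete rows listed for (3, 5) and the four complete rows listed for (2, 4); the trailing elided row in each table is not modelled.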
Now, let's look at some examples for more clarity.

Examples

Let's take an IoT scenario as an example. There are three sensors, for temperature, pressure and vibration.
We wish to join these streams such that for each point we get all three values as they arrive.
Here we will use join_type = 1, the simple join, which means both streams actively join with each other, but once joined they do not use the same event data for the next join.

Example 1

Let's define the sample stream first

{
  "schema": "myschema",
  "streams": [
    {
      "name": "temp_stream", "type": 1, "swsz": 81600, "inpt": [],
      "attr": [{"name": "temp", "type": 11}, {"name": "point", "type": 9}],
      "join": [{
        "name": "temp_pressure_join", "type": 1, "tloc": 300,
        "iatr": ["temp", "point"], "rstm": "pressure_stream", "ratr": ["pressure"],
        "jqry": {"cond": ["point"], "opid": 11, "args": ["point"]},
        "ostm": "temp_pressure_stream"
      }]
    },
    {
      "name": "pressure_stream", "type": 1, "inpt": [],
      "attr": [{"name": "pressure", "type": 11}, {"name": "point", "type": 9}],
      "join": [{
        "name": "temp_pressure_join", "type": 1, "tloc": 300,
        "ratr": ["temp", "point"], "rstm": "temp_stream", "iatr": ["pressure"],
        "jqry": {"cond": ["point"], "opid": 11, "args": ["point"]},
        "ostm": "temp_pressure_stream"
      }]
    },
    {
      "name": "vibration_stream", "type": 1, "inpt": [],
      "attr": [{"name": "vibration", "type": 9}, {"name": "point", "type": 9}],
      "join": [{
        "name": "temp_pressure_vibration_join", "type": 1, "tloc": 300,
        "ratr": ["temp", "pressure", "point"], "rstm": "temp_pressure_stream", "iatr": ["vibration"],
        "jqry": {"cond": ["point"], "opid": 11, "args": ["point"]},
        "ostm": "temp_pressure_vibration_stream"
      }]
    },
    {
      "name": "temp_pressure_stream", "type": 3, "inpt": ["temp_stream", "pressure_stream"],
      "attr": [{"name": "point", "type": 9}, {"name": "temp", "type": 11}, {"name": "pressure", "type": 11}],
      "join": [{
        "name": "temp_pressure_vibration_join", "type": 1, "tloc": 300,
        "iatr": ["temp", "pressure", "point"], "rstm": "vibration_stream", "ratr": ["vibration"],
        "jqry": {"cond": ["point"], "opid": 11, "args": ["point"]},
        "ostm": "temp_pressure_vibration_stream"
      }]
    },
    {
      "name": "temp_pressure_vibration_stream", "type": 3, "inpt": ["temp_pressure_stream", "vibration_stream"],
      "attr": [{"name": "point", "type": 9}, {"name": "temp", "type": 11}, {"name": "pressure", "type": 11}, {"name": "vibration", "type": 9}],
      "cepq": [{
        "name": "temp_press_vib_cond", "type": 6, "tloc": 86400,
        "fqry": {"name": "{\"query\":[{\"key\":\"temp\", \"cmp_op\":0, \"val\":71.1},{\"joinop\":0},{\"key\":\"pressure\", \"cmp_op\":0, \"val\":11.0},{\"joinop\":0},{\"key\":\"vibration\", \"cmp_op\":0, \"val\":35}]}", "type": 1},
        "notf": 12345
      }]
    }
  ],
  "notifs": [
    {"notifid": 12345, "name": "notif1", "msg": "users msg", "rule": "notification rule/condition", "pri": 1, "mailto": [], "endpoints": ["http://192.168.1.49:10102/iot"], "schemaid": 1234, "notif_streamid": 4321, "notif_stream_name": "sdf", "freq": 1, "tags": ["a"], "rel_streams": ["s1"]}
  ]
}
There are three input streams, measuring temp, pressure and vibration respectively for the point p. temp_stream and pressure_stream join with each other and send data to temp_pressure_stream. In turn, vibration_stream and temp_pressure_stream join and send data to temp_pressure_vibration_stream.

The join condition for all of these streams is s1.point = s2.point, i.e. join on point. Finally, a notification is sent from temp_pressure_vibration_stream when temp is greater than 71.1 AND pressure is greater than 11.0 AND vibration is greater than 35.
Note that "type" in each join is set to 1, which denotes a simple join.

Now, let's push some data in the following order:

put [ temp_stream ] : {"temp":70.1, "point":1}
put [ pressure_stream ] : {"pressure":10.2, "point":1}
put [ pressure_stream ] : {"pressure":11.1, "point":1}
put [ vibration_stream ] : {"vibration":30, "point":1}
put [ pressure_stream ] : {"pressure":11.5, "point":1}
put [ vibration_stream ] : {"vibration":40, "point":1}
put [ pressure_stream ] : {"pressure":11.8, "point":1}
put [ temp_stream ] : {"temp":71.1, "point":1}
put [ vibration_stream ] : {"vibration":50, "point":1}
put [ temp_stream ] : {"temp":71.2, "point":1}
put [ vibration_stream ] : {"vibration":60, "point":1}

Now let's scan and see the data in the different streams.

Here is the data in the temp stream:
[ temp_stream ] event = {"temp":70.10000000000001,"point":1,"_pk":1584944534256620,"_v":1}
[ temp_stream ] event = {"temp":71.10000000000001,"point":1,"_pk":1584944534362863,"_v":1}
[ temp_stream ] event = {"temp":71.2,"point":1,"_pk":1584944534396958,"_v":1}

Here is the data in the pressure stream:
[ pressure_stream ] event = {"pressure":10.2,"point":1,"_pk":1584944534266887,"_v":1}
[ pressure_stream ] event = {"pressure":11.1,"point":1,"_pk":1584944534280460,"_v":1}
[ pressure_stream ] event = {"pressure":11.5,"point":1,"_pk":1584944534308709,"_v":1}
[ pressure_stream ] event = {"pressure":11.8,"point":1,"_pk":1584944534332148,"_v":1}

Here is the data in the vibration stream:
[ vibration_stream ] event = {"vibration":30,"point":1,"_pk":1584944534296090,"_v":1}
[ vibration_stream ] event = {"vibration":40,"point":1,"_pk":1584944534320131,"_v":1}
[ vibration_stream ] event = {"vibration":50,"point":1,"_pk":1584944534384031,"_v":1}
[ vibration_stream ] event = {"vibration":60,"point":1,"_pk":1584944534397786,"_v":1}

Here is the data in the temp_pressure stream:
[ temp_pressure_stream ] event = {"pressure":10.2,"_pk":1584944534266887, "temp":70.10000000000001, "point":1,"_jpk1":1584944534256620,"_v":1}
[ temp_pressure_stream ] event = {"temp":71.10000000000001, "point":1,"_pk":1584944534362863, "pressure":11.1,"_jpk1":1584944534280460,"_v":1}
[ temp_pressure_stream ] event = {"temp":71.2,"point":1,"_pk":1584944534396958, "pressure":11.5,"_jpk1":1584944534308709,"_v":1}

Here is the data in the temp_pressure_vibration stream:
[ temp_pressure_vibration_stream ]
{"vibration":30,"_pk":1584944534296090, "temp":70.10000000000001,"pressure":10.2,"point":1,"_jpk1":1584944534266887,"_v":1}
{"temp":71.10000000000001,"pressure":11.1,"point":1,"_pk":1584944534362863, "vibration":40,"_jpk1":1584944534320131,"_v":1}
{"temp":71.2,"pressure":11.5,"point":1,"_pk":1584944534396958, "vibration":50,"_jpk1":1584944534384031,"_v":1}
Finally, the following notification is sent out from the joined stream:

{"notifid":12345,"name":"notif1","msg":"users msg","rule":"notification rule/condition","pri":1,"mailto":[],"endpoints":["http://192.168.1.49:10102/iot"],"schemaid":1234,"notif_streamid":4321,"notif_stream_name":"sdf","freq":1,"tags":["a"],"rel_streams":["s1"],"count":1,"dur":0,"notif_event":{"temp":71.2,"pressure":11.5,"point":1,"_pk":1584944534396958, "vibration":50,"_jpk1":1584944534384031,"_v":1},"count":1,"count_this_notif":1,"dur":0}
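To see why only one of the three joined events triggers the notification, the filter from the schema can be evaluated by hand. The sketch below is illustrative only: it assumes that cmp_op 0 means "greater than" and joinop 0 means AND, which is how the condition is described in the text above, and it uses the temp values as they were pushed (70.1, 71.1, 71.2). The `matches` function is a hypothetical helper, not a BangDB API.

```python
import json
import operator

# Assumption: cmp_op 0 is "greater than"; other operator codes are not modelled.
CMP_OPS = {0: operator.gt}

# The filter query string from the "fqry" field of the schema.
fqry = ('{"query":[{"key":"temp", "cmp_op":0, "val":71.1},{"joinop":0},'
        '{"key":"pressure", "cmp_op":0, "val":11.0},{"joinop":0},'
        '{"key":"vibration", "cmp_op":0, "val":35}]}')


def matches(event, query):
    """Return True if the event satisfies every term of the query."""
    terms = [t for t in query["query"] if "key" in t]
    joinops = [t["joinop"] for t in query["query"] if "joinop" in t]
    if any(op != 0 for op in joinops):
        # Assumption: joinop 0 is AND; anything else is out of scope here.
        raise ValueError("only AND (joinop 0) is handled in this sketch")
    return all(CMP_OPS[t["cmp_op"]](event[t["key"]], t["val"]) for t in terms)


# The three events of temp_pressure_vibration_stream, with values as pushed.
joined_events = [
    {"temp": 70.1, "pressure": 10.2, "vibration": 30, "point": 1},
    {"temp": 71.1, "pressure": 11.1, "vibration": 40, "point": 1},
    {"temp": 71.2, "pressure": 11.5, "vibration": 50, "point": 1},
]

query = json.loads(fqry)
fired = [e for e in joined_events if matches(e, query)]
print(fired)
# [{'temp': 71.2, 'pressure': 11.5, 'vibration': 50, 'point': 1}]
```

The second event is rejected because its temp equals the threshold rather than exceeding it, so only the third event, the one shown in notif_event above, passes all three comparisons.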

Example 2

This is the same IoT scenario again, but limited to two streams for clarity. The two streams, temp and pressure, join with each other using the active/passive rule (3, 5), where the pressure stream is the active stream and the temp stream is the passive one.

Let's define the schema first:

{
  "schema": "myschema",
  "streams": [
    {
      "name": "temp_stream", "type": 1, "swsz": 81600, "inpt": [],
      "attr": [{"name": "temp", "type": 11}, {"name": "point", "type": 5, "kysz": 48, "sidx": 1, "stat": 1}],
      "join": [{
        "name": "temp_pressure_join", "type": 3, "tloc": 3000,
        "iatr": ["temp", "point"], "rstm": "pressure_stream", "ratr": ["pressure"],
        "jqry": {"cond": ["point"], "opid": 11, "args": ["point"]},
        "ostm": "temp_pressure_stream"
      }]
    },
    {
      "name": "pressure_stream", "type": 1, "inpt": [],
      "attr": [{"name": "pressure", "type": 11}, {"name": "point", "type": 5, "kysz": 48, "sidx": 1, "stat": 1}],
      "join": [{
        "name": "temp_pressure_join", "type": 5, "tloc": 3000,
        "ratr": ["temp", "point"], "rstm": "temp_stream", "iatr": ["pressure"],
        "jqry": {"cond": ["point"], "opid": 11, "args": ["point"]},
        "ostm": "temp_pressure_stream"
      }]
    },
    {
      "name": "temp_pressure_stream", "type": 3, "inpt": ["temp_stream", "pressure_stream"],
      "attr": [{"name": "point", "type": 5, "kysz": 48, "sidx": 1, "stat": 1}, {"name": "temp", "type": 11}, {"name": "pressure", "type": 11}]
    }
  ]
}
Now pump in the data:

put [ temp_stream ] : {"temp":70.1, "point":1}
put [ pressure_stream ] : {"pressure":10.2, "point":2}
put [ pressure_stream ] : {"pressure":11.1, "point":1}
put [ pressure_stream ] : {"pressure":11.5, "point":1}
put [ pressure_stream ] : {"pressure":11.8, "point":1}
put [ temp_stream ] : {"temp":71.1, "point":2}
put [ pressure_stream ] : {"pressure":11.9, "point":1}
put [ pressure_stream ] : {"pressure":12.1, "point":2}
put [ temp_stream ] : {"temp":71.2, "point":1}
put [ pressure_stream ] : {"pressure":12.5, "point":1}

Here is the data for the temp stream:
-[ temp_stream ] event = {"temp":70.10000000000001,"point":1,"_pk":1584945351349346,"_v":1}
-[ temp_stream ] event = {"temp":71.10000000000001,"point":2,"_pk":1584945351429690,"_v":1}
-[ temp_stream ] event = {"temp":71.2,"point":1,"_pk":1584945351491986,"_v":1}

Here is the data for the pressure stream:
-[ pressure_stream ] event = {"pressure":10.2,"point":2, "_pk":1584945351359800,"_v":1}
-[ pressure_stream ] event = {"pressure":11.1,"point":1, "_pk":1584945351372994,"_v":1}
-[ pressure_stream ] event = {"pressure":11.5,"point":1, "_pk":1584945351388063,"_v":1}
-[ pressure_stream ] event = {"pressure":11.8,"point":1, "_pk":1584945351399370,"_v":1}
-[ pressure_stream ] event = {"pressure":11.9,"point":1, "_pk":1584945351450466,"_v":1}
-[ pressure_stream ] event = {"pressure":12.1,"point":2, "_pk":1584945351461066,"_v":1}
-[ pressure_stream ] event = {"pressure":12.5,"point":1, "_pk":1584945351523036,"_v":1}

Here is the data for the joined temp_pressure stream:
-[ temp_pressure_stream ] event = {"pressure":11.1,"_pk":1584945351372994, "temp":70.10000000000001,"point":1,"_jpk1":1584945351349346,"_v":1}
-[ temp_pressure_stream ] event = {"pressure":11.5,"_pk":1584945351388063, "temp":70.10000000000001,"point":1,"_jpk1":1584945351349346,"_v":1}
-[ temp_pressure_stream ] event = {"pressure":11.8,"_pk":1584945351399370, "temp":70.10000000000001,"point":1,"_jpk1":1584945351349346,"_v":1}
-[ temp_pressure_stream ] event = {"pressure":11.9,"_pk":1584945351450466, "temp":70.10000000000001,"point":1,"_jpk1":1584945351349346,"_v":1}
-[ temp_pressure_stream ] event = {"pressure":12.1,"_pk":1584945351461066, "temp":71.10000000000001,"point":2,"_jpk1":1584945351429690,"_v":1}
-[ temp_pressure_stream ] event = {"pressure":12.5,"_pk":1584945351523036, "temp":71.2,"point":1,"_jpk1":1584945351491986,"_v":1}

Example 3

This is the same problem as in the previous example, this time with join types (2, 4) instead of (3, 5). First define the schema:

{
  "schema": "myschema",
  "streams": [
    {
      "name": "temp_stream", "type": 1, "swsz": 81600, "inpt": [],
      "attr": [{"name": "temp", "type": 11}, {"name": "point", "type": 9}],
      "join": [{
        "name": "temp_pressure_join", "type": 2, "tloc": 300,
        "iatr": ["temp", "point"], "rstm": "pressure_stream", "ratr": ["pressure"],
        "jqry": {"cond": ["point"], "opid": 11, "args": ["point"]},
        "ostm": "temp_pressure_stream"
      }]
    },
    {
      "name": "pressure_stream", "type": 1, "inpt": [],
      "attr": [{"name": "pressure", "type": 11}, {"name": "point", "type": 9}],
      "join": [{
        "name": "temp_pressure_join", "type": 4, "tloc": 300,
        "ratr": ["temp", "point"], "rstm": "temp_stream", "iatr": ["pressure"],
        "jqry": {"cond": ["point"], "opid": 11, "args": ["point"]},
        "ostm": "temp_pressure_stream"
      }]
    },
    {
      "name": "temp_pressure_stream", "type": 3, "inpt": ["temp_stream", "pressure_stream"],
      "attr": [{"name": "point", "type": 9}, {"name": "temp", "type": 11}, {"name": "pressure", "type": 11}]
    }
  ]
}
And here is the output:

put [ temp_stream ] : {"temp":70.1, "point":1}
put [ pressure_stream ] : {"pressure":10.2, "point":2}
put [ pressure_stream ] : {"pressure":11.1, "point":1}
put [ pressure_stream ] : {"pressure":11.5, "point":1}
put [ pressure_stream ] : {"pressure":11.8, "point":1}
put [ temp_stream ] : {"temp":71.1, "point":2}
put [ pressure_stream ] : {"pressure":11.9, "point":1}
put [ pressure_stream ] : {"pressure":12.1, "point":2}
put [ temp_stream ] : {"temp":71.2, "point":1}
put [ pressure_stream ] : {"pressure":12.5, "point":1}

Here is the data in the temp stream:
-[ temp_stream ] event = {"temp":70.10000000000001,"point":1,"_pk":1584946101515817,"_v":1}
-[ temp_stream ] event = {"temp":71.10000000000001,"point":2,"_pk":1584946101611668,"_v":1}
-[ temp_stream ] event = {"temp":71.2,"point":1,"_pk":1584946101674582,"_v":1}

Here is the data in the pressure stream:
-[ pressure_stream ] event = {"pressure":10.2,"point":2,"_pk":1584946101526004,"_v":1}
-[ pressure_stream ] event = {"pressure":11.1,"point":1,"_pk":1584946101540463,"_v":1}
-[ pressure_stream ] event = {"pressure":11.5,"point":1,"_pk":1584946101562172,"_v":1}
-[ pressure_stream ] event = {"pressure":11.8,"point":1,"_pk":1584946101579064,"_v":1}
-[ pressure_stream ] event = {"pressure":11.9,"point":1,"_pk":1584946101632106,"_v":1}
-[ pressure_stream ] event = {"pressure":12.1,"point":2,"_pk":1584946101643448,"_v":1}
-[ pressure_stream ] event = {"pressure":12.5,"point":1,"_pk":1584946101705836,"_v":1}
----------scan for stream [ pressure_stream : 836447675855282773 ] fetched = 7

Here is the data in the joined temp_pressure stream:
-[ temp_pressure_stream ] event = {"pressure":11.1,"_pk":1584946101540463, "temp":70.10000000000001,"point":1,"_jpk1":1584946101515817,"_v":1}
-[ temp_pressure_stream ] event = {"pressure":11.5,"_pk":1584946101562172, "temp":70.10000000000001,"point":1,"_jpk1":1584946101515817,"_v":1}
-[ temp_pressure_stream ] event = {"pressure":11.8,"_pk":1584946101579064, "temp":70.10000000000001,"point":1,"_jpk1":1584946101515817,"_v":1}
-[ temp_pressure_stream ] event = {"pressure":12.1,"_pk":1584946101643448, "temp":71.10000000000001,"point":2,"_jpk1":1584946101611668,"_v":1}
-[ temp_pressure_stream ] event = {"pressure":12.5,"_pk":1584946101705836, "temp":71.2,"point":1,"_jpk1":1584946101674582,"_v":1}