Stream Processing – BangDB = NoSQL + AI + Stream

Stream processing

As we saw in the previous section, all aggregations can be enabled simply by setting the "stat":1/2/3/4/5 option on an attribute. Nothing else is needed, and the db computes them continuously at run time. However, we need to do a lot more than just aggregation. For example:

1. "catr": compute new attributes and their values from existing ones
2. "gpby": group by, in a running manner
3. "refr": refer to other streams and get some attributes using a join condition
4. "fltr": filter the stream based on some conditions and send the filtered set of attributes to another stream
5. "join": join the stream with another stream to produce data (a set of attributes) and send it to another stream
6. "cepq": run a CEP query to find interesting patterns and send each identified pattern to another stream
7. "enty": keep long-term, entity-based computed data ready for consumption
8. "cvar": compute the covariance, or relation, between any two attributes
9. "train": train a model and deploy it for prediction
10. "pred": predict on the stream using a model
11. "notif": notify on the output of a filter, join, CEP query or prediction
And we do all of this for every single event ingested into the system, continuously and in real time. This is very powerful processing: each event is handled as it arrives, one at a time, not in batches. There is another article on why batch processing can't be true real-time processing. Now let's go through each of these one by one.
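Processing one event at a time means every aggregate has to be updated incrementally instead of being recomputed over a batch. The sketch below shows the general idea in Python for a single numeric attribute. It is not BangDB's implementation: the class name `RunningStats` is hypothetical, and it does not assume which statistics each "stat" level actually enables. It keeps count, sum, min, max, mean and standard deviation (using Welford's method) with constant work per event.

```python
import math


class RunningStats:
    """Hypothetical per-attribute aggregate, updated once per event (no batching)."""

    def __init__(self) -> None:
        self.count = 0
        self.total = 0.0
        self.minimum = math.inf
        self.maximum = -math.inf
        self._mean = 0.0
        self._m2 = 0.0  # sum of squared deviations from the mean (Welford)

    def update(self, x: float) -> None:
        # O(1) work per event: earlier events are never re-scanned.
        self.count += 1
        self.total += x
        self.minimum = min(self.minimum, x)
        self.maximum = max(self.maximum, x)
        delta = x - self._mean
        self._mean += delta / self.count
        self._m2 += delta * (x - self._mean)

    @property
    def mean(self) -> float:
        return self._mean

    @property
    def stddev(self) -> float:
        # Population standard deviation; 0.0 until at least two events have arrived.
        return math.sqrt(self._m2 / self.count) if self.count > 1 else 0.0


if __name__ == "__main__":
    stats = RunningStats()
    for value in [4.0, 8.0, 6.0, 2.0]:  # events arriving one at a time
        stats.update(value)
        print(stats.count, stats.total, stats.minimum, stats.maximum, stats.mean)
```

Welford's update is used instead of keeping a running sum of squares because it stays numerically stable over long-lived streams.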

Processing details

When an event (data) is ingested into BangDB, the db uses the schema to process it in a particular order. A single event may go through several processing steps, which can also be recursive, depending on the schema definition.

Here is the order of processing for an event:

preprocess();
put();
postprocess();
First, the event goes through pre-processing, then it is put into the stream, and finally post-processing happens. Each of these steps can trigger another event, which then goes through the same chain of processing.
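The preprocess/put/postprocess chain, and the way a triggered event re-enters the same chain in another stream, can be sketched as follows. This is a simplified model under stated assumptions, not BangDB's API: `Stream`, `ingest`, `derived` and `routes` are hypothetical names, pre-processing is reduced to computing derived attributes, and post-processing is reduced to filter-style routing that forwards a chosen set of attributes to another stream. The recursion is driven by an explicit work queue, with a hop limit as a guard against cyclic stream definitions.

```python
from collections import deque
from typing import Callable, Dict, List, Optional, Tuple

Event = Dict[str, object]
# Hypothetical route: (predicate, target stream, attributes to forward).
Route = Tuple[Callable[[Event], bool], str, List[str]]


class Stream:
    """Hypothetical stream: computes attributes, stores events, forwards matches."""

    def __init__(self, name: str,
                 derived: Optional[Dict[str, Callable[[Event], object]]] = None,
                 routes: Optional[List[Route]] = None):
        self.name = name
        self.derived = derived or {}   # attribute name -> function of the event
        self.routes = routes or []
        self.stored: List[Event] = []  # stands in for the stream's storage

    def preprocess(self, event: Event) -> Event:
        # Add computed attributes to a copy of the incoming event.
        out = dict(event)
        for attr, fn in self.derived.items():
            out[attr] = fn(out)
        return out

    def put(self, event: Event) -> None:
        self.stored.append(event)

    def postprocess(self, event: Event) -> List[Tuple[str, Event]]:
        # Every matching route yields a new event (selected attributes) for another stream.
        return [(target, {k: event[k] for k in fields if k in event})
                for pred, target, fields in self.routes if pred(event)]


def ingest(streams: Dict[str, Stream], name: str, event: Event, max_hops: int = 16) -> None:
    """Run preprocess -> put -> postprocess, feeding triggered events back into the chain."""
    pending = deque([(name, event, 0)])
    while pending:
        stream_name, ev, hops = pending.popleft()
        if hops > max_hops:  # guard against cyclic stream definitions
            raise RuntimeError(f"event exceeded {max_hops} hops at {stream_name}")
        stream = streams[stream_name]
        ev = stream.preprocess(ev)
        stream.put(ev)
        for target, out in stream.postprocess(ev):
            pending.append((target, out, hops + 1))


if __name__ == "__main__":
    streams = {
        "mystream": Stream(
            "mystream",
            # modelled on the example schema's "mexp" expression ((($g+$h)*2)+($g*$h))
            derived={"mexp": lambda e: ((e["g"] + e["h"]) * 2) + (e["g"] * e["h"])},
            # hypothetical filter condition: h > 10
            routes=[(lambda e: e["h"] > 10, "filter_stream1", ["a", "g", "mexp"])],
        ),
        "filter_stream1": Stream("filter_stream1"),
    }
    ingest(streams, "mystream", {"a": "x", "g": 2, "h": 12})
    ingest(streams, "mystream", {"a": "y", "g": 1, "h": 3})
    print(streams["filter_stream1"].stored)
```

A work queue is used here instead of literal recursion so that a long chain of triggered events cannot exhaust the call stack; the order in which a real engine schedules these steps may differ.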

These triggered events could be:
1. Put into another stream, due to:
   a. groupby
   b. filter
   c. join
   d. cep
   e. entity
2. Put into an intermediate buffer managed by the db
3. Sent as a notification
As you can see, putting an event into another stream can again trigger the whole chain of processing above, hence the processing is recursive in nature.
Some of these steps run synchronously and others asynchronously; it's up to the db to decide how to arrange them for the best efficiency.
For example, sending a notification is always asynchronous, but how the internal computations for cep or join are carried out, vis-à-vis the data structures the db maintains, is entirely up to the db, which decides based on its own logic for the best performance.
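Keeping notification delivery asynchronous means the ingestion path only enqueues work and returns. A minimal sketch of that pattern with Python's standard `queue` and `threading` modules follows. `NotificationDispatcher` and its `send` callback are hypothetical and stand in for whatever delivery mechanism (mail, endpoint call, notification stream) is actually configured; it is not how BangDB implements notifications.

```python
import queue
import threading
from typing import Callable, Dict, List

Notification = Dict[str, object]


class NotificationDispatcher:
    """Hypothetical async notifier: ingestion enqueues, a worker thread delivers."""

    _STOP = object()  # sentinel that tells the worker to exit

    def __init__(self, send: Callable[[Notification], None]):
        self._send = send
        self._queue: "queue.Queue[object]" = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def notify(self, note: Notification) -> None:
        # Called on the ingestion path: returns immediately, never waits on delivery.
        self._queue.put(note)

    def close(self) -> None:
        # Deliver everything already queued, then stop the worker.
        self._queue.put(self._STOP)
        self._worker.join()

    def _run(self) -> None:
        while True:
            item = self._queue.get()
            if item is self._STOP:
                break
            try:
                self._send(item)
            except Exception as exc:  # a failed delivery must not kill the worker
                print(f"notification failed: {exc}")


if __name__ == "__main__":
    delivered: List[Notification] = []
    dispatcher = NotificationDispatcher(delivered.append)
    for seq in range(3):
        # assumed: raised by a filter whose "notf" flag is set
        dispatcher.notify({"name": "notif1", "seq": seq})
    dispatcher.close()
    print(delivered)
```

With a single worker thread, notifications are delivered in the order they were raised; a real system might use several workers and trade that ordering for throughput.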

The next few sections cover each of these processes in detail.

Example of a schema

{
  "schema":"myschema",
  "streams":[
    {
      "name":"mystream", "type":1, "swsz":86400, "inpt":[],
      "attr":[
        {"name":"a", "type":5, "kysz":32, "sidx":1, "ridx":1, "stat":1},
        {"name":"b", "type":5, "kysz":32, "sidx":1, "stat":2},
        {"name":"c", "type":5, "kysz":24, "sidx":1, "ridx":1, "stat":1},
        {"name":"d", "type":5, "kysz":48, "sidx":1, "stat":1},
        {"name":"e", "type":5, "kysz":32, "sidx":1, "stat":1},
        {"name":"f", "type":5, "kysz":32, "stat":2},
        {"name":"g", "type":11},
        {"name":"h", "type":11}
      ],
      "catr":[
        {"name":"m", "type":9, "opid":3, "stat":3, "opnm":"myudf1", "iatr":["b", "c", "d"]},
        {"name":"n", "type":11, "stat":1, "opnm":"comp_int", "iatr":["g", "h"]},
        {"name":"o", "type":5, "opnm":"string_add", "iatr":["a", "b"]},
        {"name":"p", "type":5, "opid":3, "opnm":"myudf3", "iatr":["c", "b"]},
        {"name":"q", "type":9, "opid":3, "opnm":"myudf3", "iatr":["g", "h"]},
        {"name":"mexp", "type":9, "opid":13, "iatr":["((($g+$h)*2)+($g*$h))"]}
      ],
      "gpby":[
        {"gpat":["a", "b"], "iatr":"c", "stat":1, "gran":3600, "kysz":48},
        {"gpat":["a"], "iatr":"d", "stat":2, "gran":86400, "kysz":32}
      ],
      "refr":[
        {"name":"myrefr1", "iatr":["r"], "stat":4, "rstm":"prod_desc", "ratr":["p1"],
         "jqry":{"cond":["f"], "opid":11, "args":["f"]}},
        {"name":"myrefr2", "iatr":["s"], "stat":2, "rstm":"prod_desc", "ratr":["p2"],
         "jqry":{"cond":["f", "g"], "opid":12, "args":["87", "240.9"]}}
      ],
      "fltr":[
        {"name":"myfilter1",
         "fqry":{"name":"{\"query\":[{\"key\":\"h\", \"cmp_op\":0, \"val\":\"$n\"}],\"qtype\":2}", "type":1},
         "fatr":["a", "b", "c", "g"],
         "catr":[{"name":"f1", "type":11, "opid":3, "iatr":["b", "c", "a"]}],
         "ostm":"filter_stream1", "notf":1},
        {"name":"myfilter2",
         "fqry":{"name":"fltr_udf1", "type":2},
         "fatr":["a", "b"],
         "catr":[{"name":"fltr1", "type":5, "opid":1, "iatr":["fltr1"]},
                 {"name":"fltr2", "type":11, "opid":13, "iatr":["($g+$h)"]}],
         "ostm":"filter_stream2", "notf":0}
      ],
      "join":[
        {"name":"myjoin1", "type":1, "tloc":30, "iatr":["a", "b", "c", "e"],
         "rstm":"add_to_cart", "ratr":["j1", "j2", "j3"],
         "jqry":{"cond":["a", "b"], "opid":11, "opnm":"myrule2", "args":["a", "b"]},
         "ostm":"joined_stream1"}
      ],
      "cepq":[], "enty":[], "cvar":[{}]
    },
    {
      "name":"prod_desc", "type":1, "inpt":[],
      "attr":[
        {"name":"p1", "type":5, "kysz":24, "sidx":1},
        {"name":"p2", "type":5, "kysz":32, "sidx":1},
        {"name":"p3", "type":5, "kysz":24, "sidx":1},
        {"name":"p4", "type":11, "stat":3},
        {"name":"f", "type":5, "kysz":32, "sidx":1},
        {"name":"g", "type":11, "stat":3}
      ]
    },
    {
      "name":"add_to_cart", "type":1, "inpt":[],
      "attr":[
        {"name":"j1", "type":5, "kysz":32, "sidx":1},
        {"name":"j2", "type":5, "kysz":32, "sidx":1},
        {"name":"j3", "type":5, "kysz":32, "sidx":1},
        {"name":"a", "type":5, "kysz":32, "sidx":1},
        {"name":"b", "type":5, "kysz":32, "sidx":1},
        {"name":"h", "type":11}
      ],
      "join":[
        {"name":"myjoin1", "type":1, "tloc":30, "ratr":["a", "b", "c", "e"],
         "rstm":"mystream", "iatr":["j1", "j2", "j3"],
         "jqry":{"cond":["a", "b"], "opid":11, "opnm":"myrule2", "args":["a", "b"]},
         "ostm":"joined_stream1"}
      ]
    },
    {
      "name":"joined_stream1", "type":3, "inpt":["mystream", "add_to_cart"],
      "attr":[
        {"name":"j1", "type":5, "kysz":32, "sidx":1},
        {"name":"j2", "type":5, "kysz":32, "sidx":1},
        {"name":"j3", "type":5, "kysz":32, "sidx":1},
        {"name":"a", "type":5, "kysz":32, "sidx":1, "ridx":1, "stat":2},
        {"name":"b", "type":5, "kysz":32, "sidx":1, "stat":1},
        {"name":"c", "type":5, "kysz":24, "sidx":1, "ridx":1, "stat":2},
        {"name":"e", "type":5, "kysz":32, "sidx":1, "stat":1}
      ]
    },
    {
      "name":"filter_stream1", "type":2, "inpt":["mystream"],
      "attr":[
        {"name":"a", "type":5, "kysz":32, "sidx":1, "ridx":1, "stat":1},
        {"name":"b", "type":5, "kysz":32, "sidx":1, "stat":1},
        {"name":"c", "type":5, "kysz":24, "sidx":1, "ridx":1, "stat":1},
        {"name":"g", "type":11},
        {"name":"f1", "type":11, "stat":3}
      ]
    },
    {
      "name":"filter_stream2", "type":2, "inpt":["mystream"],
      "attr":[
        {"name":"a", "type":5, "kysz":32, "sidx":1, "ridx":1, "stat":1},
        {"name":"b", "type":5, "kysz":32, "sidx":1, "stat":1},
        {"name":"fltr1", "type":5, "kysz":32, "sidx":1, "stat":1},
        {"name":"fltr2", "type":11}
      ]
    }
  ],
  "udfs":[
    {"name":"string_add", "type":2, "srctype":1, "src":"../predictive_analysis_data/bangdb2"},
    {"name":"comp_int", "type":2, "srctype":1, "src":"../predictive_analysis_data/bangdb2"},
    {"name":"fltr_udf1", "type":2, "srctype":1, "src":"../predictive_analysis_data/bangdb2"}
  ],
  "notifs":[
    {"notifid":12345, "name":"notif1", "msg":"users msg", "rule":"notification rule/condition", "pri":1,
     "mailto":[], "endpoints":[""], "schemaid":1234, "notif_streamid":4321, "notif_stream_name":"sdf",
     "freq":1, "tags":["a"], "rel_streams":["s1"]}
  ]
}
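Since filters and joins write into other streams, a schema like this one describes a small graph of streams. As an illustration, the hypothetical helper below walks a parsed schema and reports inconsistencies. It assumes, based on how the example uses them, that "ostm" names the stream a filter or join writes to, "rstm" names the stream being referred to or joined, and "inpt" lists the streams a derived stream is fed from. It is a sketch for sanity-checking a schema file, not a BangDB tool.

```python
import json
from typing import Dict, List, Set


def output_edges(schema: dict) -> Dict[str, Set[str]]:
    """Map each stream to the streams its filters and joins write into ("ostm")."""
    edges: Dict[str, Set[str]] = {}
    for stream in schema.get("streams", []):
        edges[stream["name"]] = {
            rule["ostm"]
            for kind in ("fltr", "join")
            for rule in stream.get(kind, [])
            if "ostm" in rule
        }
    return edges


def check_schema(schema: dict) -> List[str]:
    """List unknown stream references and outputs whose target omits the source in "inpt"."""
    streams = {s["name"]: s for s in schema.get("streams", [])}
    problems: List[str] = []
    for source, targets in output_edges(schema).items():
        for target in sorted(targets):
            if target not in streams:
                problems.append(f"{source} writes to unknown stream {target}")
            elif source not in streams[target].get("inpt", []):
                problems.append(f"{target} does not list {source} in inpt")
    # Streams referred to by "refr" or joined by "join" must exist as well.
    for name, stream in streams.items():
        for kind in ("refr", "join"):
            for rule in stream.get(kind, []):
                ref = rule.get("rstm")
                if ref and ref not in streams:
                    problems.append(f"{name} refers to unknown stream {ref}")
    return problems


if __name__ == "__main__":
    # In practice, load the full schema instead (the file name would be up to you).
    schema = json.loads("""
    {"streams": [
      {"name": "mystream", "fltr": [{"ostm": "filter_stream1"}]},
      {"name": "filter_stream1", "inpt": ["mystream"]}
    ]}""")
    print(check_schema(schema))
```

Run against the example schema above, it should report no problems: mystream writes to filter_stream1, filter_stream2 and joined_stream1, add_to_cart writes to joined_stream1, each of those targets lists its sources in "inpt", and prod_desc, add_to_cart and mystream all exist as referenced streams.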