Stream Commands – cli – BangDB = NoSQL + AI + Stream


Stream Commands

Queries for Stream using cli

Stream is a feature in BangDB that lets users ingest, parse, process, and analyse data continuously. The CLI provides many commands for working with streams in a simple manner. Setting up streams involves the following steps.

1. Create the schema (using the CLI, or author the JSON schema file separately)
2. Register the schema
3. Ingest and process data

The schema is a JSON document that defines how the streams will be processed and stored, and what further actions may be taken. The structure of the JSON is simple, as follows.

Schema:
- array of streams
- array of notification templates
- array of user defined functions (UDFs)

Stream:
- array of attributes
- array of computed attributes (attrs that could be computed at run time and added to the stream)
- array of referred attributes (attrs that could be referred and added to the stream)
- array of groupbys that could be computed and updated continuously
- array of joins that could stream the resulting data into other derived streams
- array of filters that could send filtered data to other derived streams
- array of CEP queries that run continuously to find patterns; the output could be sent to other derived streams
- array of entity long-term stats to be computed continuously

See Stream Processing for more details.

Let's see a sample schema
{
  "schema": "myschema",
  "streams": [
    {
      "name": "stream1", "type": 1, "swsz": 86400,
      "inpt": [],
      "attr": [
        {"name": "a", "type": 5, "kysz": 32, "sidx": 1, "ridx": 1, "stat": 1},
        {"name": "b", "type": 5, "kysz": 32, "stat": 2},
        {"name": "g", "type": 11, "stat": 3},
        {"name": "h", "type": 11}
      ],
      "catr": [
        {"name": "m", "type": 9, "opnm": "ADD", "stat": 3, "iatr": ["b", "a"]},
        {"name": "mexp", "type": 9, "opnm": "MATH_EXP", "iatr": ["((($g+$h)*2)+($g*$h))"]}
      ],
      "gpby": [
        {"gpat": ["a", "b"], "iatr": "g", "stat": 3, "gran": 3600, "kysz": 48}
      ],
      "refr": [
        {"name": "myrefr1", "iatr": ["c"], "rstm": "stream2", "ratr": ["p1"],
         "jqry": {"cond": ["b"], "opnm": "ATTR", "args": ["b"]}}
      ]
    },
    {
      "name": "stream2", "type": 1,
      "inpt": [],
      "attr": [
        {"name": "p1", "type": 5, "kysz": 24, "sidx": 1},
        {"name": "b", "type": 5, "kysz": 32}
      ]
    }
  ]
}
Another example could be the following. This runs a CEP query continuously to find a pattern where the current values of two stocks are exactly the same but the stocks themselves are different.
{
  "schema": "test",
  "streams": [
    {
      "name": "s1", "type": 1, "swsz": 86400,
      "inpt": [],
      "attr": [
        {"name": "stock", "type": 5, "kysz": 16},
        {"name": "price", "type": 9}
      ],
      "cepq": [
        {"name": "mystock", "type": 1, "tloc": 3000,
         "ratr": ["price"], "rstm": "s1", "iatr": ["stock", "price"],
         "jqry": {"cond": ["stock", "price"], "opid": 11, "args": ["stock", "price"],
                  "cmp": ["NE", "EQ"], "seq": 1},
         "fqry": {"name": "{\"query\":[{\"key\":\"price\", \"cmp_op\":0, \"val\":70.5}]}", "type": 1},
         "cond": [{"name": "NUMT", "val": 1, "opid": 1}, {"name": "DUR", "val": 1000, "opid": 0}],
         "ostm": "s2"}
      ]
    },
    {
      "name": "s2", "type": 3, "swsz": 86400,
      "inpt": ["s1"],
      "attr": [
        {"name": "stock", "type": 5, "kysz": 16},
        {"name": "price", "type": 9}
      ]
    }
  ]
}
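The pattern this CEP query looks for can be sketched in plain Python. This is purely an illustrative sketch, not BangDB's engine: for each incoming event, it scans recent events in the window for one whose stock differs (the NE comparison) but whose price matches (the EQ comparison), within the configured duration; the event timestamps here are hypothetical.

```python
# Illustrative sketch (not BangDB internals): find two events where
# stock is different (NE) but price is the same (EQ) within a duration.
from collections import deque

DUR_MS = 1000  # the "DUR" condition from the cepq above

def cep_match(window, event):
    """Return a matching earlier event, or None."""
    for prev in window:
        if (event["ts"] - prev["ts"] <= DUR_MS
                and prev["stock"] != event["stock"]    # cmp NE on stock
                and prev["price"] == event["price"]):  # cmp EQ on price
            return prev
    return None

window = deque(maxlen=1000)  # bounded buffer of recent events
out = []
for ev in [{"ts": 1, "stock": "ABC", "price": 70.5},
           {"ts": 2, "stock": "XYZ", "price": 70.5}]:
    m = cep_match(window, ev)
    if m:
        out.append((m["stock"], ev["stock"], ev["price"]))  # would go to stream s2
    window.append(ev)
print(out)  # [('ABC', 'XYZ', 70.5)]
```

In the actual schema, the matched data is streamed into the derived output stream s2 ("ostm").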
Another example could be this one, which joins two streams continuously. It joins temperature and pressure for any given point and sends the joined data to temp_pressure_stream.
{
  "schema": "myschema",
  "streams": [
    {
      "name": "temp_stream", "type": 1, "swsz": 81600,
      "inpt": [],
      "attr": [
        {"name": "temp", "type": 11},
        {"name": "point", "type": 9}
      ],
      "join": [
        {"name": "temp_pressure_join", "type": 3, "tloc": 3000,
         "iatr": ["temp", "point"], "rstm": "pressure_stream", "ratr": ["pressure"],
         "jqry": {"cond": ["point"], "opid": 11, "args": ["point"]},
         "ostm": "temp_pressure_stream"}
      ]
    },
    {
      "name": "pressure_stream", "type": 1,
      "inpt": [],
      "attr": [
        {"name": "pressure", "type": 11},
        {"name": "point", "type": 9}
      ],
      "join": [
        {"name": "temp_pressure_join", "type": 5, "tloc": 3000,
         "ratr": ["temp", "point"], "rstm": "temp_stream", "iatr": ["pressure"],
         "jqry": {"cond": ["point"], "opid": 11, "args": ["point"]},
         "ostm": "temp_pressure_stream"}
      ]
    },
    {
      "name": "temp_pressure_stream", "type": 3,
      "inpt": ["temp_stream", "pressure_stream"],
      "attr": [
        {"name": "point", "type": 9},
        {"name": "temp", "type": 11},
        {"name": "pressure", "type": 11}
      ]
    }
  ]
}
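Conceptually, this join can be sketched as follows. Again, an illustrative sketch only, not BangDB's implementation: pressure events are indexed by point, and each temperature event with a matching point produces a joined record of the shape temp_pressure_stream receives.

```python
# Illustrative sketch (not BangDB internals): join temperature and pressure
# events on "point" and emit joined records for temp_pressure_stream.
def join_on_point(temp_events, pressure_events):
    by_point = {e["point"]: e["pressure"] for e in pressure_events}
    joined = []
    for t in temp_events:
        if t["point"] in by_point:  # jqry cond: "point" equals "point"
            joined.append({"point": t["point"],
                           "temp": t["temp"],
                           "pressure": by_point[t["point"]]})
    return joined

temps = [{"point": 7, "temp": 21.5}, {"point": 8, "temp": 19.0}]
pressures = [{"point": 7, "pressure": 101.3}]
print(join_on_point(temps, pressures))
# [{'point': 7, 'temp': 21.5, 'pressure': 101.3}]
```

Note that point 8 produces no output because no pressure event has arrived for it; in BangDB the join is evaluated continuously within the window as events arrive.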

To register the given schema

We can write a schema (like the ones above) and register it directly with the server using the CLI.
Suppose the schema is in the file "sample_schema.json"; then we can register it as below.
bangdb> register schema sample_schema.json
@@@@ time taken to init stream [ myschema__stream2 ] is ...
Usage :
-------------------------
| Time : 0.003 msec |
-------------------------
User : 0.01 msec
Sys : 0 msec
-------------------------
@@@@ time taken to init stream [ myschema__stream1 ] is ...
Usage :
-------------------------
| Time : 89.097 msec |
-------------------------
User : 89.975 msec
Sys : 1 msec
-------------------------
success

To de-register the schema, drop the schema

bangdb> deregister schema myschema
success

Alternatively, we can create the schema using the workflow provided by the CLI, by issuing the command "create schema ...":
bangdb> create schema myschema
do you wish to read earlier saved schema for editing/adding? [ yes | no ]: no
Select "no" since we are creating from scratch. Select "yes" if we have a schema saved on disk that we would like to edit or add to.
It then asks for the name of the stream:
what's the name of the stream that you wish to add?: stream1
We input the name of the stream, "stream1", and then press Enter.
Next, we must select the type of the stream:
what's the type of the stream [ raw/normal primary (1) | fltr output (2) | join/cep output (3) | Entity (4) | skip (5) ] (or Enter for default (1)):1
There are a few types of streams. Raw is the one into which we ingest data; all the others are derived streams, i.e. used by the stream processing engine. A filter output stream is where filtered data goes, a join/cep output stream is where joined data goes, and an entity stream is where long-term aggregates go. There are a few other types, such as stat, groupby, buffer etc., but they are all abstracted and used solely by the stream processing engine.
Since this is the primary/raw stream, we select 1. Now define the size of the sliding window; press Enter for the default of one day:
what's the size of the sliding window in seconds [ or enter for default 86400 sec (a day) ]:
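The sliding window defined here can be sketched conceptually as follows. This is an illustrative sketch only, not BangDB's implementation: events older than swsz seconds fall out of the window as new events arrive.

```python
# Illustrative sketch of a time-based sliding window (swsz seconds):
# events older than the window are evicted as new events arrive.
from collections import deque

SWSZ = 86400  # one day, the default above

def slide(window, event, now):
    window.append((now, event))
    while window and now - window[0][0] > SWSZ:
        window.popleft()  # evict events that fell outside the window
    return window

w = deque()
slide(w, {"a": "a1"}, 1000)
slide(w, {"a": "a2"}, 90000)  # 90000 - 1000 > 86400, so the first event is evicted
print(len(w))  # 1
```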
By now, we have an empty stream schema defined; next we must define what goes inside it.
Enter 1 to add attributes first:
What would you like to add (press Enter when done) [ attr (1) | catr (2) | refr (3) | gpby (4) | fltr (5) | join (6) | entity (7) | cep (8) | notifs (9) ]:1
This starts the sub-workflow to add attributes:
add attributes...
attribute name: a
attribute type (press Enter for default(5)) [ string(5) | long(9) | double (11) ]: 5
enable sidx [ 0 | 1 ]: 1
attribute key size in bytes (press Enter for default(24 bytes)): 32
enable ridx [ 0 | 1 ]: 1
enable stat [ none(0) | count (1) | unique count (2) ]: 1
add another attribute ? [ yes | no ]:
For each attribute we add:
- type: selects the type of the attribute
- sidx: creates a secondary index on the attribute. Good for queries; also, if the attribute is going to be used in a join, cep or filter, we should enable sidx
- key size: needed only when the attribute type is string
- ridx: reverse index; switch it on if we wish to search on this attribute
- stat: generic running stats. For string types we can do count or unique count. For double or long we can also select stat 3, which is the complete set of stats (count, min, max, avg, stddev, skew, kurtosis, etc.)

We can keep following the sub-workflow until we have created all the attributes we need.
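The running stats that stat 3 maintains can be sketched with an online algorithm. The sketch below is illustrative only (Welford's algorithm, not necessarily what BangDB uses internally) and shows how count, min, max, avg and stddev can be kept up to date per event without storing the full history.

```python
# Illustrative sketch of the kind of running stats "stat": 3 maintains
# (count, min, max, avg, stddev), using Welford's online algorithm.
import math

class RunningStats:
    def __init__(self):
        self.cnt = 0
        self.mn = float("inf")
        self.mx = float("-inf")
        self.mean = 0.0
        self.m2 = 0.0  # running sum of squared deviations

    def add(self, x):
        self.cnt += 1
        self.mn = min(self.mn, x)
        self.mx = max(self.mx, x)
        d = x - self.mean
        self.mean += d / self.cnt
        self.m2 += d * (x - self.mean)

    def stddev(self):
        return math.sqrt(self.m2 / self.cnt) if self.cnt else 0.0

rs = RunningStats()
for v in [10.2, 5.5, 7.3]:
    rs.add(v)
print(rs.cnt, rs.mn, rs.mx, round(rs.mean, 3))  # 3 5.5 10.2 7.667
```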
Now let's also create another stream, "stream2", and add two attributes, p1 and b, as shown above, following the same workflow.

Let's now create a "catr". Selecting "catr" starts a sub-workflow of its own:
add computed attributes (catr)...
attribute name (press Enter to end): m
attribute type (press Enter for default (5)) [ string(5) | long(9) | double (11) ]: 9
available default ops are
[ COPY | ADD | MUL | DIV | PERCENT | SUB | UPPER | LOWER | COPY_VAL | LOG_E | LOG_2 | LOG_10 | MATH_EXP | PRED | TS | YEAR | YEAR_EPOCH | MONTH | MONTH_EPOCH | WEEK | WEEK_MONTH | WEEK_EPOCH | DAY | DAY_WEEK | DAY_MONTH | DAY_EPOCH | HOUR | HOUR_EPOCH | MINUTE | MINUTE_EPOCH | SECOND | ABS ]
or Enter custom udf name
enter the name of the intended operation from the above default ops (press Enter to end): ADD
A catr defines a new attribute computed by some logic/operation. There are 20+ default operations available; to do something beyond these, we can write our own UDF and use that instead.

Select "ADD" as the op here, since we wish to add something.
We can ADD attributes or some fixed value. Here, say we wish to add two attributes, so we select the attributes to add:
enter the name of the intended operation from the above default ops (press Enter to end): ADD
enter the input attributes on which this ops will be performed, (press Enter once done): b
enter the input attributes on which this ops will be performed, (press Enter once done): a
enter the input attributes on which this ops will be performed, (press Enter once done):
enter sequence [ 0 | 1 ], if 1 then it will be done before refr else post refr: 1
The sequence matters if, say, we wish to apply an operation on an attribute that will be referred from another stream. The order ensures the catr executes before or after refr.
We can now select "sidx" and "stat" for this computed attribute as well
enable sidx [ 0 | 1 ]: 0
enable stat [ none(0) | count (1) | running stats (3) ]: 0
should add, replace or add only if present [ add (1) | replace (2) | add only if not present (3) ]: 1
add another computed attribute ? [ yes | no ]: yes
Now add another computed attribute. This time we will use a math expression to compute the attribute from existing attributes. Note that we can use a computed attribute as input to another catr, and so on.
Here we select ((($g+$h)*2)+($g*$h)) as the math expression:
attribute name (press Enter to end): mexp
attribute type (press Enter for default (5)) [ string(5) | long(9) | double (11) ]: 9
available default ops are
[ COPY | ADD | MUL | DIV | PERCENT | SUB | UPPER | LOWER | COPY_VAL | LOG_E | LOG_2 | LOG_10 | MATH_EXP | PRED | TS | YEAR | YEAR_EPOCH | MONTH | MONTH_EPOCH | WEEK | WEEK_MONTH | WEEK_EPOCH | DAY | DAY_WEEK | DAY_MONTH | DAY_EPOCH | HOUR | HOUR_EPOCH | MINUTE | MINUTE_EPOCH | SECOND | ABS ]
or Enter custom udf name
enter the name of the intended operation from the above default ops (press Enter to end): MATH_EXP
enter math expression: ((($g+$h)*2)+($g*$h))
enter sequence [ 0 | 1 ], if 1 then it will be done before refr else post refr: 1
enable sidx [ 0 | 1 ]: 0
enable stat [ none(0) | count (1) | running stats (3) ]: 0
should add, replace or add only if present [ add (1) | replace (2) | add only if not present (3) ]: 1
add another computed attribute ? [ yes | no ]: no
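To see what this expression yields, here is a quick hand-check in Python (illustrative only; the $g and $h placeholders are substituted by the stream engine at run time). For an event with g = 10.2 and h = 5.5, as in the sample insert later on this page:

```python
# Hand-check of the catr math expression ((($g+$h)*2)+($g*$h))
# for an event with g = 10.2 and h = 5.5.
g, h = 10.2, 5.5
mexp = ((g + h) * 2) + (g * h)  # (15.7 * 2) + 56.1
print(round(mexp, 2))  # 87.5
print(int(mexp))       # 87 -- mexp is declared type 9 (long), so it is stored truncated
```

This is consistent with the value "mexp":87 shown in the select output further below.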
Now let's add a groupby. This is also a sub-workflow:
What would you like to add (press Enter when done) [ attr (1) | catr (2) | refr (3) | gpby (4) | fltr (5) | join (6) | entity (7) | cep (8) | notifs (9) ]: 4
add groupby (gpby)...
name of the attribute that would be aggregated: g
enter name of groupby attributes (press Enter once done): a
enter name of groupby attributes (press Enter once done): b
enter name of groupby attributes (press Enter once done):
attribute key size in bytes (note gpby is name mangled with aggr and groupby attr names, hence should be properly allocated): 48
granularity for the aggregate (in seconds): 600
enable stat (1,2,3) [ count (1) | unique count (2) | running stat (3) ]: 1
add another gpby ? [ yes | no ]: no
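Conceptually, a gpby like this buckets events by the groupby attribute values plus a time slot aligned to the granularity, then aggregates the target attribute per bucket. The sketch below is illustrative only, not BangDB's implementation; the timestamps are hypothetical.

```python
# Illustrative sketch: group attribute "g" by ("a", "b") with a time
# granularity, counting events per bucket (stat 1 = count).
from collections import defaultdict

GRAN = 600  # seconds, as entered above

def bucket_key(event, ts):
    slot = ts - (ts % GRAN)  # align the timestamp to the granularity
    return (event["a"], event["b"], slot)

counts = defaultdict(int)
events = [({"a": "a123", "b": "b123", "g": 10.2}, 1000),
          ({"a": "a123", "b": "b123", "g": 3.1},  1100),
          ({"a": "a123", "b": "b999", "g": 7.0},  1200)]
for ev, ts in events:
    counts[bucket_key(ev, ts)] += 1
print(dict(counts))
# {('a123', 'b123', 600): 2, ('a123', 'b999', 1200): 1}
```

The key-size prompt above matters because the stored key mangles the aggregated attribute name with the groupby attribute names, much like the tuple key in this sketch.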
Now let's create a "refr", which refers to an attribute in another stream:
What would you like to add (press Enter when done) [ attr (1) | catr (2) | refr (3) | gpby (4) | fltr (5) | join (6) | entity (7) | cep (8) | notifs (9) ]: 3
add refers (refr)...
refr name: myrefr1
enter name of input attribute that will get created after referring the other stream: c
enter name of input attribute that will get created after referring the other stream:
enable stat (1,2,3) on this created attribute? [ none(0) | count (1) | unique count (2) | running stat (3) ]: 0
enter refr (other stream) stream name: stream2
enter refr attribute name, the attribute which will get copied into this stream if refer condition is satisfied: p1
enter refr attribute name, the attribute which will get copied into this stream if refer condition is satisfied:
enter name of condition attribute need to join two events (press Enter once done): b
enter name of condition attribute need to join two events (press Enter once done):
available jqry op names tells whether the condition attributes should be compared with another attribute (ATTR), some fixed value (FIXED), math expression (MATH_EXP), both attribute and fixed (HYBRID) or your custom udf name
enter opid (operation name) [ ATTR (a) | FIXED (f) | MATH_EXP (m) | HYBRID (h) ]: a
enter name of arguments (attribute or fixed val or hybrid or math_exp as opid selected previously) for joining (press Enter once done): b
enter name of arguments (attribute or fixed val or hybrid or math_exp as opid selected previously) for joining (press Enter once done):
enter the comparison operators [ EQ | NE | GT | LT ] for join (press Enter once done): EQ
enter the comparison operators [ EQ | NE | GT | LT ] for join (press Enter once done):
enter seq (sequence to tell if strictly consecutive events are required (1) or we may have other events in between (0)) [ or Enter for default (0) ]: 0
should add, replace or add only if present [ add (1) | replace (2) | add only if not present (3) ]: 1
add another refr ? [ yes | no ]: no
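The effect of this refr can be sketched as a lookup (illustrative only, not BangDB's implementation): when an event arrives in stream1, find an event in stream2 whose b equals this event's b, and copy its p1 into a new attribute c.

```python
# Illustrative sketch of refr: on each stream1 event, look up stream2
# by matching attribute "b" and copy "p1" into a new attribute "c".
stream2_by_b = {"b123": {"p1": "P123", "b": "b123"}}  # index on b (this is why sidx helps)

def apply_refr(event):
    ref = stream2_by_b.get(event["b"])  # jqry: cond "b", opnm ATTR, args "b", cmp EQ
    if ref is not None:
        event["c"] = ref["p1"]          # ratr p1 -> iatr c
    return event

ev = apply_refr({"a": "a123", "b": "b123", "g": 10.2, "h": 5.5})
print(ev["c"])  # P123
```

This matches the sample select output further below, where c shows up as "P123" because the inserted stream2 event had the same b value.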
That's all for creating the first schema (as shown above); now simply commit the changes and it will create the schema:
What would you like to add (press Enter when done) [ attr (1) | catr (2) | refr (3) | gpby (4) | fltr (5) | join (6) | entity (7) | cep (8) | notifs (9) ]:
add another stream ? [ yes | no ]: no
do you wish to register the schema now? [ yes | no ]: yes
@@@@ time taken to init stream [ myschema__stream2 ] is ...
Usage :
-------------------------
| Time : 0.002 msec |
-------------------------
User : 0.008 msec
Sys : 0 msec
-------------------------
@@@@ time taken to init stream [ myschema__stream1 ] is ...
Usage :
-------------------------
| Time : 100.218 msec |
-------------------------
User : 0.884 msec
Sys : 1.001 msec
-------------------------
success
schema [ myschema ] registered successfully
Finally, it asks whether you wish to store the schema on disk:
do you wish to save the schema (locally) for later reference? [ yes | no ]: no
done with the schema [ myschema ] processing

To see the existing schemas

bangdb> show schemas
schema list fetched successfully
+----------------+---------+
|name            |state    |
+----------------+---------+
|myschema        |1        |
+----------------+---------+
fetched [ 1 ] schemas

To see the schema itself, use the following

bangdb> select schema from myschema pretty

To insert events into the streams

bangdb> insert into myschema.stream2 values null {"p1":"P123","b":"b123"}
success
bangdb> insert into myschema.stream1 values null {"a":"a123","b":"b123","g":10.2,"h":5.5}
success

Select data from stream1 now

bangdb> select * from myschema.stream1
scanning for pk range [null : null] and query = null
+-------------------+---------------------------------------------------------------------------------------+
|key                |val                                                                                    |
+-------------------+---------------------------------------------------------------------------------------+
|1612193259884284   |{"a":"a123","b":"b123","g":10.2,"h":5.5,"_pk":1612193259884284,"m":7746999651695517567,|
|                   |"mexp":87,"c":"P123","_v":1}                                                           |
+-------------------+---------------------------------------------------------------------------------------+
total rows retrieved = 1 (1)
As you can see, attributes m and mexp got added due to "catr", and c got added due to "refr".

To see the count of events in each stream of a given schema

bangdb> select stream from myschema
+-------------+--------+------------------+
|stream       |type    |num_events        |
+-------------+--------+------------------+
|stream2      |1       |1                 |
|stream1      |1       |1                 |
+-------------+--------+------------------+
successful in getting the streams [ 2 num ] info for schema [ myschema ]

To describe the schema; this shows the dependencies among streams and other entities

bangdb> describe schema myschema pretty
{
  "schema" : "myschema",
  "schemaid" : 4766750373947953813,
  "node_info" : [
    {
      "node" : "dummy_head", "type" : 0,
      "parent" : [], "children" : [ "myschema__stream2", "myschema__stream1" ],
      "refr" : [], "refr-to" : [], "refr-by" : [],
      "gpby" : [], "fltr" : [], "cepq" : [], "enty" : [], "joins" : []
    },
    {
      "node" : "myschema__stream2", "type" : 1,
      "parent" : [ "dummy_head" ], "children" : [],
      "refr" : [], "refr-to" : [], "refr-by" : [ "myschema__stream1" ],
      "gpby" : [], "fltr" : [], "cepq" : [], "enty" : [], "joins" : []
    },
    {
      "node" : "myschema__stream1", "type" : 1,
      "parent" : [ "dummy_head" ], "children" : [ "myschema__stream1__a__b__g" ],
      "refr" : [ "myschema__stream2" ], "refr-to" : [ "myschema__stream2" ], "refr-by" : [],
      "gpby" : [ "g" ], "fltr" : [], "cepq" : [], "enty" : [], "joins" : []
    },
    {
      "node" : "myschema__stream1__a__b__g", "type" : 5,
      "parent" : [], "children" : [],
      "refr" : [], "refr-to" : [], "refr-by" : [],
      "gpby" : [], "fltr" : [], "cepq" : [], "enty" : [], "joins" : []
    }
  ]
}

To describe stream of any schema

bangdb> describe stream myschema.stream1 {"node":6578472670278090808,"attributes":[{"name":"b","type":5,"kysz":48,"sidx":0,"stat":2,"ridx":0},{"name":"g","type":11,"kysz":8,"sidx":0,"stat":3,"ridx":0},{"name":"m","type":9,"kysz":8,"sidx":0,"stat":0,"ridx":0},{"name":"a","type":5,"kysz":32,"sidx":1,"stat":1,"ridx":1},{"name":"h","type":11,"kysz":8,"sidx":0,"stat":0,"ridx":0},{"name":"c","type":5,"kysz":0,"sidx":0,"stat":0,"ridx":0},{"name":"mexp","type":9,"kysz":8,"sidx":0,"stat":0,"ridx":0}],"parents":[{"name":"dummy_head","stid":0}],"children":[{"name":"myschema__stream1__a__b__g","stid":2601729947950351671}],"catr":[{"name":"m","attr-type":9,"fnr":1,"iatr":["b","a"]},{"name":"mexp","attr-type":9,"fnr":1,"iatr":["g","h"]}],"gpby":[{"gpby-attr":"g","gpby-attr-type":11,"gran":600,"swsz":86400,"gpat":["a","b"]}],"refr-to":[{"name":"myschema__stream2","stid":5641729835205121944}],"refr-by":[],"fltr":[],"joins":[],"refr":[{"refr-name":"myrefr1","refr-type":0,"joins-with":"myschema__stream2","iatr":["c","b"],"ratr":["b","p1"]}],"cepq":[],"enty":[],"name":"myschema__stream1","type":1}
You may also run the above command with the "pretty" qualifier.

To register notification

This creates a notification template that can be used to send notifications when certain events happen, for example when a CEP query is satisfied, when a filter matches, when a join happens, etc.
Notification creation also starts a sub-workflow, which is intuitive enough for creating notifications:
bangdb> register notification
add notification details:
notification name: notif1
notification id: 1234
enter notification msg: This is a sample notification
frequency in seconds (minimum num of seconds to wait before sending same notification): 60
priority of the notification [1, 2 or 3] (1 is highest): 1
enter the schema name for which this notification may be used?: myschema
enter mail ids [whom notifications will be sent to] (press enter to break): sachin@bangdb.com
enter mail ids [whom notifications will be sent to] (press enter to break): admin@bangdb.com
enter mail ids [whom notifications will be sent to] (press enter to break):
enter API end points [whom notifications will be sent to] (press enter to break): http://192.168.1.3:10101/account
enter API end points [whom notifications will be sent to] (press enter to break):
{
  "name" : "notif1",
  "notifid" : 1234,
  "msg" : "This is a sample notification",
  "freq" : 60,
  "pri" : 1,
  "schemaid" : 4766750373947953813,
  "mailto" : [ "sachin@bangdb.com", "admin@bangdb.com" ],
  "endpoints" : [ "http://192.168.1.3:10101/account" ]
}
notification registered successfully

To de-register notification

bangdb> deregister notification 1234

To select the list of registered notifications

bangdb> select * from reg_notif
scanning for pk range [null : null] and query = null
+--------+----------------------------------------------------------------------------------+
|key     |val                                                                               |
+--------+----------------------------------------------------------------------------------+
|1234    |{"name":"notif1","notifid":1234,"msg":"This is a sample notification","freq":60,  |
|        |"pri":1,"schemaid":4766750373947953813,                                           |
|        |"mailto":["sachin@bangdb.com","admin@bangdb.com"],                                |
|        |"endpoints":["http://192.168.1.3:10101/account"]}                                 |
+--------+----------------------------------------------------------------------------------+
total rows retrieved = 1 (1)

Stats or aggr data selection

While creating the schema, we added "stat":1 for attr "a" and "stat":2 for attr "b".
stat 1 is count, 2 is unique count, and 3 is all stats.
bangdb> select aggr(a) from myschema.stream1
the query json is = {"proc-type":6,"attrs":["a"],"rollup":1,"from_ts":1,"to_ts":9223372036854775807}
+------+--------------------------------------------------------------+
|key   |val                                                           |
+------+--------------------------------------------------------------+
|1     |{"fromts":1,"tots":9223372036854775807,"aggr_val":{"cnt":1}}  |
+------+--------------------------------------------------------------+
total rows retrieved = 1 (1)

Plot - we can plot the data in a chart on the terminal or save it to disk

We can plot data by adding the "plot" command at the end of the query, along with optional values.
The syntax for plot is as follows:

plot {"title":"mychart","term":1,"saveto":"file1.png","type":"line","just_plot":1,"attrs":["cnt","min","max"]}

Note that the json after the plot command is optional, as are the keys within it. For example, if you don't wish to save the image then omit the "saveto" key, etc.
- title: title of the chart
- term: whether to print the chart in the terminal or in a window (pop-up)
- saveto: if you wish to save the chart as a png file on disk
- type: line | hist | point
- just_plot: prints just the plot, not the data (useful when the data is large)
- attrs: typically stats will have count (cnt), min, max, avg, stdd, skew, kurt etc. to print
bangdb> select aggr(a) from myschema.stream1 plot {"type":"line"}
the query json is = {"proc-type":6,"attrs":["a"],"rollup":1,"from_ts":1,"to_ts":9223372036854775807}
the time range has not been selected, hence limiting the num of data points to 1000 and just_plot = 1
Warning: empty y range [0:0], adjusting to [-1:1]
                                                                               
                                bangdb_plot aggr                               
                                                                               
    1 +--------------------------------------------------------------------+   
      |      +      +      +      +      +     +      +      +      +      |   
      |                                                                    |   
      |                                                                    |   
  0.5 |-+                                                                +-|   
      |                                                                    |   
      |                                                                    |   
      |                                                                    |   
      |                                                                    |   
    0 |-+   ***********************************************************  +-|   
      |                                                                    |   
      |                                                                    |   
      |                                                                    |   
 -0.5 |-+                                                                +-|   
      |                                                                    |   
      |                                                                    |   
      |                                                                    |   
      |      +      +      +      +      +     +      +      +      +      |   
   -1 +--------------------------------------------------------------------+   
    12:00  14:00  16:00  18:00  20:00  22:00 24:00  26:00  28:00  30:00  32:00 
total rows retrieved = 1000 (1000)
more data to come, continue .... [y/n]: