On this page

Query streams to retrieve data

Query streams to retrieve data

Method : POST
URI : /db/<dbname>/query
Body :

{"sql":"sql query..."}

We can use the same API [ please see API # 7 ]
However the sql query will have some specific changes for streams. Some of the examples are here

a. To get a few rows from a stream [ there are different kinds of streams: input, filter, join, entity ], the sql query structure looks like this; note that the where condition, limit etc. are optional
select * from <schema_name>.<stream_name> where <conditions> <limit> <limit_number>

b. To get a few rows from running statistics/aggregated streams for a particular attribute, the sql query structure looks like;
select aggr(attribute_name) from <schema_name>.<stream_name> where <conditions> <limit> <limit_number>

c. to get few rows from running statistics/aggregated for groupby attributes

the sql query structure looks like;
select aggr(attribute_name) from <schema_name>.<stream_name> where <conditions> groupby <groupby_attributes> <limit> <limit_number>

For example, let’s register another schema which does a little more than the previous one. Below is the schema

{
   "schema":"website",
   "streams":[
      {
         "name":"visitor",
         "type":1,
         "swsz":86400,
         "inpt":[
            
         ],
         "attr":[
            {
               "name":"vid",
               "type":5,
               "sidx":1,
               "stat":"UCOUNT"
            },
            {
               "name":"prod",
               "type":5,
               "kysz":24
            },
            {
               "name":"catid",
               "type":5,
               "kysz":24,
               "stat":"COUNT"
            },
            {
               "name":"pgid",
               "type":5,
               "kysz":48,
               "sidx":1,
               "gran":300,
               "stat":"UCOUNT"
            },
            {
               "name":"price",
               "type":11,
               "stat":"RSTAT"
            },
            {
               "name":"items",
               "type":9,
               "stat":"RSTAT"
            }
         ],
         "catr":[
            {
               "name":"pred_sales",
               "type":11,
               "opnm":"PRED",
               "algo":"SVM",
               "attr_type":"HYB",
               "model":"sales_model",
               "iatr":[
                  "vid",
                  "pgid",
                  "catid",
                  "items",
                  "price"
               ]
            }
         ],
         "gpby":[
            {
               "gpat":[
                  "catid",
                  "pgid"
               ],
               "iatr":"vid",
               "stat":"COUNT",
               "gran":300,
               "kysz":64
            },
            {
               "gpat":[
                  "catid"
               ],
               "iatr":"vid",
               "stat":"UCOUNT",
               "gran":300,
               "kysz":64
            },
            {
               "gpat":[
                  "catid"
               ],
               "iatr":"price",
               "stat":"RSTAT",
               "gran":300,
               "kysz":64
            }
         ],
         "enty":[
            {
               "name":"prod_details_entity1",
               "opid":1,
               "ratr":[
                  "nview"
               ],
               "iatr":[
                  "vid"
               ],
               "pkatr":[
                  {
                     "name":"pk1",
                     "type":5,
                     "kysz":24,
                     "opid":1,
                     "iatr":[
                        "prod"
                     ]
                  }
               ],
               "rstm":"prod_details",
               "doc_buf_factor":50,
               "sync_ts":60
            },
            {
               "name":"prod_details_entity2",
               "opid":2,
               "ratr":[
                  "uvisit"
               ],
               "iatr":[
                  "vid"
               ],
               "pkatr":[
                  {
                     "name":"pk1",
                     "opid":1,
                     "type":5,
                     "iatr":[
                        "prod"
                     ]
                  }
               ],
               "rstm":"prod_details",
               "doc_buf_factor":50,
               "sync_ts":60
            },
            {
               "name":"prod_details_entity3",
               "opid":3,
               "ratr":[
                  "sales"
               ],
               "iatr":[
                  "price"
               ],
               "pkatr":[
                  {
                     "name":"pk1",
                     "opid":1,
                     "type":5,
                     "iatr":[
                        "prod"
                     ]
                  }
               ],
               "rstm":"prod_details",
               "doc_buf_factor":50,
               "sync_ts":60
            },
            {
               "name":"prod_details_entity4",
               "opid":3,
               "ratr":[
                  "total_items"
               ],
               "iatr":[
                  "items"
               ],
               "pkatr":[
                  {
                     "name":"pk1",
                     "opid":1,
                     "type":5,
                     "iatr":[
                        "prod"
                     ]
                  }
               ],
               "rstm":"prod_details",
               "doc_buf_factor":50,
               "sync_ts":60
            }
         ],
         "rels":[
            {
               "sub":"vid",
               "obj":"prod",
               "rel":"BUYS",
               "sub_props":[
                  "catid"
               ],
               "obj_props":[
                  "items"
               ],
               "rel_props":[
                  "price"
               ]
            }
         ]
      },
      {
         "name":"prod_details",
         "type":4,
         "inpt":[
            "visitor"
         ],
         "attr":[
            {
               "name":"pk1",
               "type":5,
               "kysz":24,
               "pk":1
            },
            {
               "name":"nview",
               "type":9,
               "stat":"COUNT"
            },
            {
               "name":"uvisit",
               "type":9,
               "stat":"COUNT"
            },
            {
               "name":"sales",
               "type":11,
               "stat":"RSTAT"
            },
            {
               "name":"total_items",
               "type":9,
               "stat":"RSTAT"
            }
         ]
      }
   ]
}

Explanation of the schema.

a. We have a website and we wish to capture a few data points for some analysis.
b. We are capturing vid (visitor id), pgid (page id that the user is on), prod (product id), catid (category of the page/product), price (total cost) and items (num of items)
c. We wish to compute running statistics of unique visitor count, category count etc… (see the “stat” attribute)
d. We further wish to compute running groupby aggregations, for ex; unique count of visitors group by catid (category) and pgid (page)
e. We also wish to predict the total sales using “catr” (computed attribute) using “sales_model”, which is trained using SVM algo with set of attributes /fields (vid, pgid, catid, items)
f. It’s often very common to query for total items sold so far since the beginning, total sales since the beginning etc.. Although common, these are pretty compute-intensive jobs and take so much time that we end up running them once a day or so. Within BangDB we can use “enty” (entity), which maintains such values always ready. More so, we can do running statistics on this as well. Here we wish to keep several such entities like total number of views so far, total sales so far etc.
g. We also have a graph triple defined in “rels” such that as data is inserted into the stream, the graph is updated accordingly.

Let’s register the schema using the API as defined above [ POST /stream ]

Now, let’s insert some events into the stream using the API

curl -H "Content-Type: application/json" -d'{"vid":"v1","prod":"p1","catid":"c1","pgid":"pg1","price":123.45,"items":3}' -X POST http://192.168.1.105:18080/stream/website/visitor

curl -H "Content-Type: application/json" -d'{"vid":"v2","prod":"p1","catid":"c1","pgid":"pg1","price":43.27,"items":2}' -X POST http://192.168.1.105:18080/stream/website/visitor

curl -H "Content-Type: application/json" -d'{"vid":"v3","prod":"p2","catid":"c1","pgid":"pg2","price":67.98,"items":2}' -X POST http://192.168.1.105:18080/stream/website/visitor

curl -H "Content-Type: application/json" -d'{"vid":"v3","prod":"p1","catid":"c1","pgid":"pg1","price":27.98,"items":1}' -X POST http://192.168.1.105:18080/stream/website/visitor

curl -H "Content-Type: application/json" -d'{"vid":"v3","prod":"p3","catid":"c2","pgid":"pg3","price":71.65,"items":2}' -X POST http://192.168.1.105:18080/stream/website/visitor

curl -H "Content-Type: application/json" -d'{"vid":"v2","prod":"p3","catid":"c2","pgid":"pg3","price":41.65,"items":1}' -X POST http://192.168.1.105:18080/stream/website/visitor

curl -H "Content-Type: application/json" -d'{"vid":"v1","prod":"p3","catid":"c2","pgid":"pg3","price":42.65,"items":1}' -X POST http://192.168.1.105:18080/stream/website/visitor

curl -H "Content-Type: application/json" -d'{"vid":"v1","prod":"p2","catid":"c1","pgid":"pg2","price":47.05,"items":1}' -X POST http://192.168.1.105:18080/stream/website/visitor

curl -H "Content-Type: application/json" -d'{"vid":"v1","prod":"p1","catid":"c1","pgid":"pg1","price":54.75,"items":2}' -X POST http://192.168.1.105:18080/stream/website/visitor

curl -H "Content-Type: application/json" -d'{"vid":"v1","prod":"p2","catid":"c2","pgid":"pg2","price":51.50,"items":1}' -X POST http://192.168.1.105:18080/stream/website/visitor

Now we have inserted 10 events for visitors v1, v2 and v3. Let’s now run the query

a. Query Streams – to get few rows from a stream

[ there are different kinds of streams, input, filter, join, entity ]

i. Select all the events from the stream

curl -H "Content-Type: application/json" -d'{"sql":"select * from website.visitor"}' -X POST http://192.168.1.105:18080/db/mydb/query

Response

{
   "rows":[
      {
         "k":1648485283906300,
         "v":"{\"vid\":\"v1\",\"prod\":\"p2\",\"catid\":\"c2\",\"pgid\":\"pg2\",\"price\":51.5,\"items\":1,\"_pk\":1648485283906300,\"_v\":1}"
      },
      {
         "k":1648485230554236,
         "v":"{\"vid\":\"v1\",\"prod\":\"p1\",\"catid\":\"c1\",\"pgid\":\"pg1\",\"price\":54.75,\"items\":2,\"_pk\":1648485230554236,\"_v\":1}"
      },
      {
         "k":1648485192696760,
         "v":"{\"vid\":\"v1\",\"prod\":\"p2\",\"catid\":\"c1\",\"pgid\":\"pg2\",\"price\":47.05,\"items\":1,\"_pk\":1648485192696760,\"_v\":1}"
      },
      {
         "k":1648485151719474,
         "v":"{\"vid\":\"v1\",\"prod\":\"p3\",\"catid\":\"c2\",\"pgid\":\"pg3\",\"price\":42.65,\"items\":1,\"_pk\":1648485151719474,\"_v\":1}"
      },
      {
         "k":1648485133970000,
         "v":"{\"vid\":\"v2\",\"prod\":\"p3\",\"catid\":\"c2\",\"pgid\":\"pg3\",\"price\":41.65,\"items\":1,\"_pk\":1648485133970000,\"_v\":1}"
      },
      {
         "k":1648485109020973,
         "v":"{\"vid\":\"v3\",\"prod\":\"p3\",\"catid\":\"c2\",\"pgid\":\"pg3\",\"price\":71.65000000000001,\"items\":2,\"_pk\":1648485109020973,\"_v\":1}"
      },
      {
         "k":1648484176864142,
         "v":"{\"vid\":\"v3\",\"prod\":\"p1\",\"catid\":\"c1\",\"pgid\":\"pg1\",\"price\":27.98,\"items\":1,\"_pk\":1648484176864142,\"_v\":1}"
      },
      {
         "k":1648484143576439,
         "v":"{\"vid\":\"v3\",\"prod\":\"p2\",\"catid\":\"c1\",\"pgid\":\"pg2\",\"price\":67.98,\"items\":2,\"_pk\":1648484143576439,\"_v\":1}"
      },
      {
         "k":1648484107856603,
         "v":"{\"vid\":\"v2\",\"prod\":\"p1\",\"catid\":\"c1\",\"pgid\":\"pg1\",\"price\":43.27,\"items\":2,\"_pk\":1648484107856603,\"_v\":1}"
      },
      {
         "k":1648484074927522,
         "v":"{\"vid\":\"v1\",\"prod\":\"p1\",\"catid\":\"c1\",\"pgid\":\"pg1\",\"price\":123.45,\"items\":3,\"_pk\":1648484074927522,\"_v\":1}"
      }
   ],
   "num_items":10,
   "more_data_to_come":0,
   "switch_done":1
}

ii. to count number of rows in the stream

curl -H "Content-Type: application/json" -d'{"sql":"select count(*) from website.visitor"}' -X POST http://192.168.1.105:18080/db/mydb/query

Response

{
   "retval": 10
}

Total count is 10

iii. select only 2 events

curl -H "Content-Type: application/json" -d'{"sql":"select * from website.visitor limit 2"}' -X POST http://192.168.1.105:18080/db/mydb/query
{
   "rows":[
      {
         "k":1648485283906300,
         "v":"{\"vid\":\"v1\",\"prod\":\"p2\",\"catid\":\"c2\",\"pgid\":\"pg2\",\"price\":51.5,\"items\":1,\"_pk\":1648485283906300,\"_v\":1}"
      },
      {
         "k":1648485230554236,
         "v":"{\"vid\":\"v1\",\"prod\":\"p1\",\"catid\":\"c1\",\"pgid\":\"pg1\",\"price\":54.75,\"items\":2,\"_pk\":1648485230554236,\"_v\":1}"
      }
   ],
   "levk":1648485230554236,
   "num_items":2,
   "more_data_to_come":1,
   "switch_done":1
}

Note: if you see, “more_data_to_come” is set to 1, since there are more events in the system

iv. select events where visitor is “v2” and page id is “pg1”

curl -H "Content-Type: application/json" -d'{"sql":"select * from website.visitor where vid = \"v2\" and pgid = \"pg1\""}' -X POST http://192.168.1.105:18080/db/mydb/query
{
   "rows":[
      {
         "k":1648484107856603,
         "v":"{\"vid\":\"v2\",\"prod\":\"p1\",\"catid\":\"c1\",\"pgid\":\"pg1\",\"price\":43.27,\"items\":2,\"_pk\":1648484107856603,\"_v\":1}"
      }
   ],
   "sec_buf":"EgAAAF9iYW5nZGJfZHJpdmVyX3NpXwAwBAAAAHBnaWQAAAAAAAAAAAA=",
   "num_items":1,
   "more_data_to_come":0,
   "switch_done":1
}

v. select events where items are 3 or more

curl -H "Content-Type: application/json" -d'{"sql":"select * from website.visitor where items >= 3"}' -X POST http://192.168.1.105:18080/db/mydb/query

Response

{
   "rows":[
      {
         "k":1648484074927522,
         "v":"{\"vid\":\"v1\",\"prod\":\"p1\",\"catid\":\"c1\",\"pgid\":\"pg1\",\"price\":123.45,\"items\":3,\"_pk\":1648484074927522,\"_v\":1}"
      }
   ],
   "num_items":1,
   "more_data_to_come":0,
   "switch_done":1
}

And so on…

vi. To select data from entity stream

curl -H "Content-Type: application/json" -d' {"sql":"select * from website.prod_details"}' -X POST http://192.168.1.105:18080/db/mydb/query

Response

{
   "rows":[
      {
         "k":"p1",
         "v":"{\"nview\":{\"cnt\":4},\"total_items\":{\"cnt\":4,\"sum\":8,\"min\":1,\"max\":3,\"avg\":2,\"stdd\":0.816496580927726,\"skew\":0,\"kurt\":1441.5},\"uvisit\":{\"cnt\":3},\"sales\":{\"cnt\":4,\"sum\":249.45,\"min\":27.98,\"max\":123.45,\"avg\":62.3625,\"stdd\":42.17547737331098,\"skew\":1.598788206225121,\"kurt\":2.786407659857769}}"
      },
      {
         "k":"p2",
         "v":"{\"nview\":{\"cnt\":3},\"total_items\":{\"cnt\":3,\"sum\":4,\"min\":1,\"max\":2,\"avg\":1.333333333333333,\"stdd\":0.5773502691896258,\"skew\":1.73205080756887,\"kurt\":0},\"uvisit\":{\"cnt\":2},\"sales\":{\"cnt\":3,\"sum\":166.53,\"min\":47.05,\"max\":67.98,\"avg\":55.51,\"stdd\":11.02616433761082,\"skew\":1.420104552713263,\"kurt\":0}}"
      },
      {
         "k":"p3",
         "v":"{\"nview\":{\"cnt\":3},\"total_items\":{\"cnt\":3,\"sum\":4,\"min\":1,\"max\":2,\"avg\":1.333333333333333,\"stdd\":0.5773502691896258,\"skew\":1.73205080756887,\"kurt\":0},\"uvisit\":{\"cnt\":3},\"sales\":{\"cnt\":3,\"sum\":155.95,\"min\":41.65,\"max\":71.65000000000001,\"avg\":51.98333333333334,\"stdd\":17.03917055884273,\"skew\":1.725341767699333,\"kurt\":0}}"
      }
   ],
   "num_items":3,
   "more_data_to_come":0,
   "switch_done":0
}

As you see, we get for each product (prod), various entities’ values since beginning (count or running stats).

"total_items" = 3 //for p3,
"uvisit" (unique visit) = 3 //for p3,
"sales" (running stats for sales) = {\"cnt\":3,\"sum\":155.95,\"min\":41.65,\"max\":71.65000000000001,\"avg\":51.98333333333334,\"stdd\":17.03917055884273,\"skew\":1.725341767699333,\"kurt\":0}

vii. to count total items / rows in the entity prod_details

curl -H "Content-Type: application/json" -d' {"sql":"select count(*) from website.prod_details"}' -X POST http://192.168.1.105:18080/db/mydb/query

Response

{
   "retval": 3
}

The count is 3

Now, let’s select some aggregated data. We have running statistics set on various attributes like “vid”, “catid”, “price” “items” etc… (wherever “stat” is set)

b. Query Streams – to get few rows from running statistics/aggregated streams for a particular attribute

curl -H "Content-Type: application/json" -d'{"sql":"select aggr(vid) from website.visitor where st >= 1 and et <= 2648490388199000"}' -X POST http://192.168.1.105:18080/db/mydb/query

The “st” and “et” are start time and end time in microsec

{
   "rows":[
      {
         "k":1648490220000000,
         "v":0
      },
      {
         "k":1648490160000000,
         "v":0
      },
      {
         "k":1648490100000000,
         "v":0
      },
      {
         "k":1648490040000000,
         "v":0
      },
      {
         "k":1648489980000000,
         "v":2
      },
      {
         "k":1648489920000000,
         "v":3
      }
   ],
   "num_items":6,
   "more_data_to_come":0,
   "switch_done":0
}

As we see, there is a row for every single minute, since the running statistics happen with 60 sec granularity. But we can roll it up for as many minutes as required. For example, let’s roll up completely, since the beginning

curl -H "Content-Type: application/json" -d'{"sql":"select aggr(vid) from website.visitor where st >= 1 and et <= 2648490388199000 rollup 1"}' -X POST http://192.168.1.105:18080/db/mydb/query

Response

{
   "rows":[
      {
         "k":1648490875815366,
         "v":"{\"fromts\":1,\"tots\":2648490388199000,\"aggr_val\":{\"cnt\":3}}"
      }
   ],
   "num_items":1,
   "more_data_to_come":0,
   "switch_done":0
}

This tells that there are 3 unique vid (visitors), since we are doing UCOUNT on vid therefore it’s correct

We can now rollup every 5 min by using “rollup 5” (since the lowest granularity is a single minute, 5 times that gives us 5 minutes)

curl -H "Content-Type: application/json" -d'{"sql":"select aggr(vid) from website.visitor where st >= 1 and et <= 2648490388199000 rollup 5"}' -X POST http://192.168.1.105:18080/db/mydb/query

Response

{
   "rows":[
      {
         "k":1648490460000000,
         "v":0
      },
      {
         "k":1648490160000000,
         "v":0
      },
      {
         "k":1648491060000000,
         "v":3
      }
   ],
   "num_items":3,
   "more_data_to_come":0,
   "switch_done":0
}

As you can see, we have all 3 unique visitors in the first 5 min, as we inserted all the data at one time.

c. Query Streams – to get few rows from running statistics/aggregated for groupby attributes

Let’s now see the aggregated values from groupby

curl -H "Content-Type: application/json" -d' {"sql":"select aggr(vid) from website.visitor groupby catid:pgid"}' -X POST http://192.168.1.105:18080/db/mydb/query

Response

{
   "rows":[
      {
         "k":"c1:pg2",
         "v":2
      },
      {
         "k":"c1:pg1",
         "v":4
      },
      {
         "k":"c2:pg3",
         "v":3
      },
      {
         "k":"c2:pg2",
         "v":1
      }
   ],
   "num_items":4,
   "more_data_to_come":0,
   "switch_done":0
}

Here you get COUNT for each catid and pgid group.

We can add filter here, for example querying only for group catid c1 and pgid pg1

curl -H "Content-Type: application/json" -d' {"sql":"select aggr(vid) from website.visitor where skey = \"c1:pg1\" groupby catid:pgid"}' -X POST http://192.168.1.105:18080/db/mydb/query

Response

{
   "rows":[
      {
         "k":"c1:pg1",
         "v":4
      }
   ],
   "num_items":1,
   "more_data_to_come":0,
   "switch_done":0
}

we can also use rollup here, similar to what we did in previous section. If you see the schema, granularity for groupby aggregation is 300 sec (5 min), hence each row individually will come for 5 min. If we rollup 1 then we will get aggregated values for each group since beginning (one row for each group). If we aggregate with rollup 5 then we will get one row for every 5*5 = 25 min [ since granularity is 5 min ]

Was this article helpful to you? Yes No