
Working with the Stream


Stream API

Most important APIs for stream

Please refer to Stream Manager for the list of all APIs in the stream. Here we will discuss the most important ones when it comes to ingesting and scanning/retrieving the data.

Overall, the following few APIs are the most critical, and most of the time we will be dealing with these at run time for data processing:

public String put(long schemaid, long streamid, String doc)
public ResultSet scanDoc(long schemaid, long streamid, ResultSet prev_rs, long k1, long k2, String idx_filter_json, ScanFilter sf)
public ResultSet scanProcDoc(long schemaid, long streamid, String query_json, ResultSet prev_rs, ScanFilter sf)
public long count(long schemaid, long streamid)
public long getGpbyId(long schemaid, long streamid, String gpby_attr_list)
And a few important pieces of information are as follows:
enum BANGDB_STREAM_TYPE {
    BANGDB_STREAM_TYPE_NORMAL = 1,   // defined in the schema
    BANGDB_STREAM_TYPE_FILTER,       // defined in the schema
    BANGDB_STREAM_TYPE_JOINED,       // defined in the schema
    BANGDB_STREAM_TYPE_ENTITY,       // defined in the schema
    BANGDB_STREAM_TYPE_GPBY,         // not explicitly defined in the schema
    BANGDB_STREAM_TYPE_AGGR,         // not explicitly defined in the schema
    BANGDB_STREAM_TYPE_INVALID
};

Ingesting data

To ingest data into BangDB, we call the put() function. It is very simple, and all the processing defined in the schema is abstracted behind this API. Ingestion always happens into the normal or main stream, as shown above in the enum.
Therefore, when we call this API, we ensure that every single computation takes place and only then does it return. Typically, for a single event put, on average several dozen processing steps may happen before the call returns, starting from computing new attributes, to referring to other existing attributes, groupbys, filters, joins, entity computation, complex event processing, notifications and a few other necessary ones. Hence it is highly critical that the API is super efficient and highly performant.
Here is how the API is called:

public String put(long schemaid, long streamid, String doc)

schemaid : id of the schema to which the stream belongs
streamid : id of the stream to which this event (doc) will be sent
doc : the event, a JSON string
returns : a JSON string with errcode and a message as required; errcode 0 is success and a negative value is an error
It is a blocking call and returns once done.
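Below is a minimal Java sketch of ingesting a single event. The class name BangDBStreamManager, the way the manager object is obtained, and the attribute names in the event are illustrative assumptions; only the put() signature itself comes from the API above.

// Minimal sketch: ingest one event into the main (normal) stream.
// 'sm' is assumed to be an already-initialized stream manager object that
// exposes the put() API shown above; how it is constructed is out of scope here.
public class PutExample {
    public static String ingestOne(BangDBStreamManager sm, long schemaid, long streamid) {
        // the event, a JSON string whose attributes match the stream schema (attribute names are made up)
        String doc = "{\"a1\":\"v1\", \"b2\":10, \"c3\":2.5}";

        // blocking call; all schema-defined processing (filters, joins, groupbys,
        // CEP, notifications, ...) happens before it returns
        String resp = sm.put(schemaid, streamid, doc);

        // resp is a JSON string carrying errcode (0 = success, negative = error) and a message
        return resp;
    }
}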

Scanning data from main stream

Scanning data means retrieving data from streams. The stream could be the normal one or any of the derived ones.
The BANGDB_STREAM_TYPE_NORMAL enum value defines the normal stream, and all events/data are ingested into this stream only. However, we can scan data from the normal stream and from all other derived streams as well.
Also, filter, joined and referral streams can be scanned using this method.

Therefore, this scan method works for the following stream types: BANGDB_STREAM_TYPE_NORMAL, BANGDB_STREAM_TYPE_FILTER and BANGDB_STREAM_TYPE_JOINED. This scan is for getting data from the normal or raw stream.
public ResultSet scanDoc(long schemaid, long streamid, ResultSet prev_rs, long k1, long k2, String idx_filter_json, ScanFilter sf)

schemaid : schema id of the schema to which the stream belongs
streamid : stream id of the normal stream from which we wish to take data
prev_rs : all scans in BangDB return a resultset, which is a list of key-vals. prev_rs is the resultset returned by the previous scan call. A scan returns 2MB (or whatever is configured) of data, or a fixed number of rows, at a time; since there could be much more data than the configured size, we need to call scan multiple times to get all the data. Hence every scan is recursive in nature and takes the previously returned resultset.
k1 : start primary key; in the case of a stream it's always a timestamp in microsec. Please note: microsec
k2 : end timestamp in microsec
idx_filter_json : filter defined using dataQuery; this is basically filtering using secondary keys and text indexes
sf : ScanFilter. The user may define comparison ops and limit the data in number of rows and size
returns : resultset, a list of key-vals
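Here is a minimal Java sketch of the recursive/paginated scan pattern described above: pass the previously returned resultset back in until no more data comes. The class name BangDBStreamManager and the ResultSet iteration methods (hasNext(), getNextKeyStr(), getNextValStr(), moveNext(), moreDataToCome()) are assumptions for illustration; only the scanDoc() signature itself is taken from the API above.

// Minimal sketch: paginated scan of the main stream over a time window (microsec timestamps).
public class ScanExample {
    public static void scanWindow(BangDBStreamManager sm, long schemaid, long streamid,
                                  long fromTsMicro, long toTsMicro) {
        ScanFilter sf = new ScanFilter();   // default ops/limits (default constructor assumed)
        ResultSet rs = null;                // no previous resultset for the first call
        do {
            // null idx_filter_json means no secondary-index/text filter is applied
            rs = sm.scanDoc(schemaid, streamid, rs, fromTsMicro, toTsMicro, null, sf);
            if (rs == null)
                break;                      // nothing (more) to read
            while (rs.hasNext()) {          // iterate the key-vals in this chunk
                System.out.println(rs.getNextKeyStr() + " -> " + rs.getNextValStr());
                rs.moveNext();
            }
        } while (rs.moreDataToCome());      // keep calling scan until all data is returned
    }
}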

Scanning data from derived streams

We only have to deal with 3 stream types here, namely:

entity streams - BANGDB_STREAM_TYPE_ENTITY (4)
group by streams - BANGDB_STREAM_TYPE_GPBY (5)
aggregate streams (count, unique count and running stats) - BANGDB_STREAM_TYPE_AGGR (6)
To scan data from these streams, we use the following API:
public ResultSet scanProcDoc(long schemaid, long streamid, String query_json, ResultSet prev_rs, ScanFilter sf)

schemaid : schema id of the schema to which the stream belongs
streamid : the id of the main / normal stream. Since the aggregate, entity and groupby streams are abstracted, we use the stream id of the stream to which they belong
query_json : query_json has the following basic structure, but it's different for different stream types;

for gpby;
---------
{"proc-type": 5, "gpby-attrid": 123, "from_ts":123456, "to_ts": 234567, "skey": "*:a1:b1", "ekey": "*:a1:b7", "rollup":0}

proc-type -> indicates what kind of derived stream it is; 5 means gpby
gpby-attrid -> id of the groupby. The user may call the following function to get it:
public String getGpbyName(long schemaid, long streamid, String gpby_attr_list)
Here gpby_attr_list has the following structure;
{"attrs":["a1", "b2", "c3"], "gpby-val":"x", "gpby-name":"a1_b2_c3", "gpby-attrid":123}
Here, the user may just provide attrs and gpby-val and it will return gpby-attrid, OR simply provide the gpby-name if that's available. Mostly the user may call this function with the following gpby_attr_list to get the gpby-attrid;
{"attrs":["a1", "b2", "c3"], "gpby-val":"x"}
from_ts -> it could be 0, or the start time as a timestamp in microsec
to_ts -> it could be 0, or the end time as a timestamp in microsec
skey -> the pattern of the key, as required or defined in gpat etc. It can be NULL as well
ekey -> same as skey
rollup -> whether we wish to roll up the data eventually

for aggregate;
--------------
{"proc-type": 6, "attrs":["a"], "from_ts":sk_ts, "to_ts":ek_ts, "rollup":1}

Very similar to gpby, except that attrs[] contains the attribute name for which the scan is being done

for entity;
-----------
{"proc-type": 4, "skey":"sk", "ekey":"ek", "enty-stream-id":1234}

Here there is no from_ts and to_ts, as the entity stream does not use timestamp as its primary key. It is also not a sliding-window based table or stream. The enty-stream-id is nothing but the stream id of the entity stream.
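As an illustration, here is a minimal Java sketch of scanning a groupby (derived) stream with scanProcDoc(), resolving the gpby-attrid first via getGpbyId() listed earlier. As before, BangDBStreamManager and the ResultSet iteration methods are assumptions made for this sketch, and the attribute names are made up.

// Minimal sketch: scan a groupby (derived) stream via scanProcDoc().
public class ScanGpbyExample {
    public static void scanGpby(BangDBStreamManager sm, long schemaid, long streamid,
                                long fromTsMicro, long toTsMicro) {
        // resolve the id of the groupby defined over attrs a1, b2 with gpby-val "x"
        String gpbyAttrList = "{\"attrs\":[\"a1\",\"b2\"], \"gpby-val\":\"x\"}";
        long gpbyAttrId = sm.getGpbyId(schemaid, streamid, gpbyAttrList);

        // proc-type 5 = BANGDB_STREAM_TYPE_GPBY; skey/ekey patterns kept open here
        String queryJson = "{\"proc-type\":5, \"gpby-attrid\":" + gpbyAttrId
                + ", \"from_ts\":" + fromTsMicro + ", \"to_ts\":" + toTsMicro
                + ", \"skey\":\"*\", \"ekey\":\"*\", \"rollup\":0}";

        ScanFilter sf = new ScanFilter();
        ResultSet rs = null;
        do {
            // note: streamid is the id of the main stream to which the groupby belongs
            rs = sm.scanProcDoc(schemaid, streamid, queryJson, rs, sf);
            if (rs == null)
                break;
            while (rs.hasNext()) {
                System.out.println(rs.getNextKeyStr() + " -> " + rs.getNextValStr());
                rs.moveNext();
            }
        } while (rs.moreDataToCome());
    }
}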