
BangDB Stream Manager API


BangDBStreamManager Type

Client API for building apps

BangDBStreamManager provides ways to create the constructs necessary to deal with streams in a time-series manner. It allows users to create streams, ingest data, and define processing logic so that continuous ingestion and analysis can proceed in an automated manner.

C++

To get an instance of BangDBStreamManager, call the constructor. It takes a BangDBEnv object reference; see BangDBEnv for more details.

BangDBStreamManager(BangDBEnv *env);
BangDB Stream works on a schema which the user must define and register with the Stream manager in order to receive data in the stream and process the events as defined in the schema.

See BangDB Stream for more information

To register a schema
char *registerSchema(const char *schema_json); The schema/app is in JSON format and contains the details of the stream operations. It returns NULL for a serious error, or a JSON doc with errcode less than 0 and information on why the registration failed. If successful, errcode is set to 0 in the returned doc. Users should call delete[] to free the memory.
To de-register / delete an existing schema
char *deregisterSchema(const char *schema_name, bool cleanclose = true); schema_name is the name given to the schema by the user. If successful, errcode is set to 0 in the returned doc; for error it could return NULL or a doc with errcode set to -1. Users should call delete[] to free the memory.
To add streams to an existing schema
char *addStreams(long schemaid, const char *streams); The streams input here is a JSON string that contains an array of streams to be added. It takes as input the schemaid, a unique id associated with a particular schema, for which the set of streams is to be added. If successful, errcode is set to 0 in the returned doc; for error it could return NULL or a doc with errcode set to -1. Users should call delete[] to free the memory.
To delete streams from an existing schema
char *deleteStreams(long schemaid, const char *streams); If successful, errcode is set to 0 in the returned doc; for error it could return NULL or a doc with errcode set to -1. Users should call delete[] to free the memory.
To set stream state
char *setStreamState(const char *schema, const char *stream, short st); If successful, errcode is set to 0 in the returned doc; for error it could return NULL or a doc with errcode set to -1. Users should call delete[] to free the memory.
To get stream state
int getStreamState(const char *schema, const char *stream); The state of a stream can be ON or OFF, for which it returns 1 or 0 respectively. For error it returns -1.
To add user defined functions (UDFs) for computing in the schemas
char *addUdfs(long schema_id, const char *udfs); If successful, errcode is set to 0 in the returned doc; for error it could return NULL or a doc with errcode set to -1. Users should call delete[] to free the memory.
To delete udfs from a given schema using the udf names and schema id
char *delUdfs(long schema_id, const char *udfs); If successful, errcode is set to 0 in the returned doc; for error it could return NULL or a doc with errcode set to -1. Users should call delete[] to free the memory.
This lists all the user defined functions present in the database
char *getUdfList(); If successful it returns the list, else NULL. Users should call delete[] to free the memory.
To get the list of all registered notifications. These are notification templates used to send event notifications, not the actual notifications. Please see Notification to know more about notification templates and how to deal with them.

Note that this is very similar to scanDoc present in the table; in fact, it is exactly the same. It takes the query filter as one of the arguments (idx_filter_json).

To see more information on how to scan the table, see the DataQuery section
ResultSet *scanRegisteredNotif(ResultSet *prev_rs, FDT *pk_skey = NULL, FDT *pk_ekey = NULL, const char *idx_filter_json = NULL, ScanFilter *sf = NULL); If successful, it returns a ResultSet reference which can be iterated to read keys and values. It returns NULL for error.
To get the list of generated notifications, users may scan in the usual way. Note that the query filter can still be used for the scan. See DataQuery to know more about scan
ResultSet *scanNotification(ResultSet *prev_rs, FDT *pk_skey = NULL, FDT *pk_ekey = NULL, const char *idx_filter_json = NULL, ScanFilter *sf = NULL); If successful, it returns a ResultSet reference which can be iterated to read keys and values. It returns NULL for error.
To insert events into the stream. The event is a doc (JSON document)
char *put(long schemaid, long streamid, const char *doc); streamid is a unique numerical id associated with a particular stream. It returns a JSON doc with errcode set to -1 for error, else 0 for success. Users should check for NULL as well, and should delete the memory of the returned data by calling delete[].
To put events into any given stream of a given schema. Note that no stream or event processing takes place for this API; it is similar to the table put API
char *put(long schemaid, long streamid, long k, const char *v); It returns a JSON doc with errcode set to -1 for error, else 0 for success. Users should check for NULL as well, and should delete the memory of the returned data by calling delete[].
To scan the stream for a given filter condition. Users may scan the stream in the usual way. Note that the query filter (idx_filter_json) can still be used for the scan. See DataQuery to know more about scan
ResultSet *scanDoc(long schemaid, long streamid, ResultSet *prev_rs, FDT *pk_skey = NULL, FDT *pk_ekey = NULL, const char *idx_filter_json = NULL, ScanFilter *sf = NULL);
To scan aggregate, groupby and entity streams, users should call this API. It takes a special argument, attr_names_json, which defines what kind of data is being scanned. This again is a recursive scan and is used similarly to the other (db) scans. See DataQuery to know more about how to use scan effectively
ResultSet *scanProcDoc(long schemaid, long streamid, const char *attr_names_json, ResultSet *prev_rs, ScanFilter *sf = NULL); The attr_names_json defines what to do and for whom this is being called. The structure of the JSON is:
for aggr: query_json = {"proc-type":6, "attrs":["a", "b", ...], "option":1, "skey":"sk", "ekey":"ek", "rollup":1}
for entity: query_json = {"proc-type":6, "attrs":["a", "b", ...], "option":1, "skey":"sk", "ekey":"ek", "enty-stream-id":1234}
for gpby: query_json = {"attrs":["a", "b", ...], "option":1, "skey":"sk", "ekey":"ek", "gpby-val":"x", "gpby-name":"a1_b2_c3", "gpby-attrid":123}
It returns a ResultSet, which can be iterated to go over the events, else NULL for error.
To count the number of events for a given filter condition
long countProc(long schemaid, long streamid, const char *attr_names_json, ScanFilter *sf = NULL); It returns -1 for error
To get a groupby operation name. Check out more about it in Stream
long getGpbyName(long schemaid, long streamid, const char *gpby_attr_list, char **out_json); The groupby name is the mangled name given by the stream manager to a particular groupby operation. The gpby_attr_list provides the necessary information for the computation of the name, e.g. gpby_attr_list = {"attrs":["a1", "b2", "c3"], "gpby-val":"x", "gpby-name":"a1_b2_c3", "gpby-attrid":123}. It returns -1 for error, else 0 for success. The out_json contains the name of the groupby.
To count total number of events present in the given stream
long count(long schemaid, long streamid); It returns -1 for error else the count
To get number of events present for a given condition or filter query (idx_filter_json)
long count(long schemaid, long streamid, FDT *pk_skey, FDT *pk_ekey, const char *idx_filter_json = NULL, ScanFilter *sf = NULL); It returns -1 for error else the count
To get the count of events pushed into the raw streams
ResultSet *scanUsage(ResultSet *prev_rs, long fromts, long tots, int rollup, ScanFilter *sf = NULL); It returns a ResultSet for success, or NULL for error.
To get the schema id for an existing schema
long getSchemaid(const char *schema_name, bool check_valid = true); It returns -1 for error else the schemaid
To get the stream id for a stream in an existing schema
long getStreamid(const char *schema_name, const char *stream_name, bool check_valid = true); It returns -1 for error or streamid
To get the entire schema (JSON structure). This API returns from the Stream memory and not from the stored metadata.
char *getSchemaStr(const char *schema_name); It returns NULL for error, or JSON with errcode non-zero for other errors. For success it returns the schema. Users should delete the memory of the returned data by calling delete[].
To get the entire schema (json structure) from metadata. Usually both this and previous schema would be the same, but in some cases they could be different.
char *getSchemaFromMetadata(const char *schema_name); User should delete the memory of returned data by calling delete[]
To get a dependency graph for a given schema, users may call this API. This returns a JSON doc defining the entire dependency graph for the schema
char *getSchemaDepGraph(long schema_id, bool bfs = true); The schema is structured as a graph within the stream manager. This API will return the graph for the given schema. It returns NULL for error. Users should delete the memory of the returned data by calling delete[].
To get dependency graph for a given stream
char *getStreamDepGraph(long schema_id, long stream_id, bool only_dep = false); This API will return the graph for the given stream of a schema. Please see the stream section to know more about the graph. It returns NULL for error. Users should delete the memory of the returned data by calling delete[].
To get list of all schemas present in the database
char *getSchemaList(); This returns a JSON doc with the list of all the schemas, or NULL for error. It may also set errcode to -1 for some errors. Users should delete the memory of the returned data by calling delete[].
To close the stream manager at the end
void closeBangdbStreamManager(CloseType ctype = DEFAULT_AT_CLIENT); CloseType is an enum with the following values: DEFAULT_AT_CLIENT, CONSERVATIVE_AT_SERVER, OPTIMISTIC_AT_SERVER, CLEANCLOSE_AT_SERVER, SIMPLECLOSE_AT_SERVER, DEFAULT_AT_SERVER. Please see more on this at bangdb common.

Java
To get an instance of BangDBStreamManager, call the constructor. It takes a BangDBEnv object reference; see BangDBEnv for more details.

public BangDBStreamManager(BangDBEnv bdbenv)
BangDB Stream works on a schema which the user must define and register with the Stream manager in order to receive data in the stream and process the events as defined in the schema.

See BangDB Stream for more information

To register a schema/app
public String registerSchema(String schema_json) The schema/app is in JSON format and contains the details of the stream operations. It returns null for a serious error, or a JSON doc with errcode less than 0 and information on why the registration failed. If successful, errcode is set to 0 in the returned doc.
To drop or delete an existing schema
public String deregisterSchema(String schema_name) To de-register an app or schema, simply pass the name of the schema. If successful, errcode is set to 0 in the returned doc; for error it could return null or a doc with errcode set to -1.
To drop or delete an existing schema
public String deregisterSchema(String schema_name, boolean cleanclose)
To add streams to a schema
public String addStreams(long schemaid, String streams) The DB assigns a unique id to every registered schema, represented by schemaid, and the streams input here is a JSON string that contains an array of streams to be added. It takes as input the schemaid for which the set of streams is to be added. If successful, errcode is set to 0 in the returned doc; for error it could return null or a doc with errcode set to -1.
To delete streams
public String deleteStreams(long schemaid, String streams) This deletes the set of streams defined in the streams JSON containing the names of the streams. It takes as input the schemaid for which the set of streams is to be deleted. If successful, errcode is set to 0 in the returned doc; for error it could return null or a doc with errcode set to -1.
To check if the schema is currently ready for taking events
public int getSchemaDDLState(long schemaid) This returns whether the schema is currently ready for taking events. It returns 1 if it is not ready and 0 if it is ready.
To get schemaid for the given schema_name
public long getSchemaid(String schema_name, boolean check_valid) This returns the schemaid for the given schema_name. If check_valid is false then it will return a potential id, else the actual existing one. For success it returns the id, else -1 for error.
To get stream id for a given schema_name and stream_name
public long getStreamid(String schema_name, String stream_name, boolean check_valid) This returns the streamid for the given schema_name and stream_name. If check_valid is false then it will return a potential id, else the actual existing one. For success it returns the id, else -1 for error.
To get status of a stream
public int getStreamState(String schema_name, String stream_name) The state of a stream can be ON or OFF, for which it returns 1 or 0 respectively. For error it returns -1.
To put events into the stream
public String put(long schemaid, long streamid, String doc) The event is the doc (JSON document), and schemaid and streamid define the stream into which the event should be put. It returns a JSON string with errcode as 0 for success or -1 for error. It also contains the reasons for failure when it fails.
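The Java APIs follow the same caller contract as the C++ ones: check the returned String for null, then inspect errcode in the JSON. A minimal sketch of that pattern, with the BangDB call stubbed out (putStub below is a placeholder, not a BangDB API), might look like:

```java
// Placeholder standing in for BangDBStreamManager.put(schemaid, streamid, doc);
// the real method returns a JSON String with errcode 0 on success, -1 on error,
// or null for a serious error.
public class PutExample {
    static String putStub(long schemaid, long streamid, String doc) {
        if (doc == null) return "{\"errcode\":-1,\"error\":\"null doc\"}";
        return "{\"errcode\":0}";
    }

    // Caller-side convention from the docs: check for null, then inspect errcode.
    static boolean putOk(long schemaid, long streamid, String doc) {
        String res = putStub(schemaid, streamid, doc);
        return res != null && res.contains("\"errcode\":0");
    }

    public static void main(String[] args) {
        if (!putOk(1L, 2L, "{\"temp\":21.5,\"ts\":1699999999}"))
            throw new AssertionError("expected success");
        if (putOk(1L, 2L, null))
            throw new AssertionError("expected failure for null doc");
        System.out.println("ok");
    }
}
```

Unlike the C++ API, there is no delete[] to call; the returned String is garbage-collected.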
To put events into any given stream for a given schema
public String put(long schemaid, long streamid, long k, String v) This simply puts the document into any given stream for a given schema. Please note that no stream or event processing takes place for this API; it is similar to the table API. It returns a JSON string with errcode as 0 for success or -1 for error. It also contains the reason for failure when it fails.
To scan a stream for a given filter
public ResultSet scanDoc(long schemaid, long streamid, ResultSet prev_rs, long k1, long k2, String idx_filter_json, ScanFilter sf)
To scan aggregate, groupby and entity streams
public ResultSet scanProcDoc(long schemaid, long streamid, String attr_names_json, ResultSet prev_rs, ScanFilter sf) The attr_names_json defines what to do and for whom this API is being called. The structure of the JSON is as follows:
for aggr: query_json = {"proc-type":6, "attrs":["a", "b", ...], "option":1, "skey":"sk", "ekey":"ek", "rollup":1}
for entity: query_json = {"proc-type":6, "attrs":["a", "b", ...], "option":1, "skey":"sk", "ekey":"ek", "enty-stream-id":1234}
for gpby: query_json = {"attrs":["a", "b", ...], "option":1, "skey":"sk", "ekey":"ek", "gpby-val":"x", "gpby-name":"a1_b2_c3", "gpby-attrid":123}
In gpby, if gpby-attrid is provided then "attrs", "gpby-val" and "gpby-name" are not required; basically the last three are only needed to compute "gpby-attrid".
To get the list of all registered notifications. These are notification templates used to send event notifications, not the actual notifications. Please see Notification to know more about notification templates and how to deal with them.

Note that this is very similar to scanDoc present in the table; in fact, it is exactly the same. It takes the query filter as one of the arguments (idx_filter_json).

To see more information on how to scan the table, see the DataQuery section
public ResultSet scanRegisteredNotif(ResultSet prev_rs, long k1, long k2, String idx_filter_json, ScanFilter sf) If successful, it returns a ResultSet reference which can be iterated to read keys and values. It returns null for error.
To get count of events in the raw streams
public ResultSet scanUsage(ResultSet prev_rs, long fromts, long tots, int rollup, ScanFilter sf) It returns a ResultSet for success, or null for error.
To get the count of events in a given stream
public long countProc(long schemaid, long streamid, String attr_names_json, ScanFilter sf) This returns the count of events for the given schema and stream ids, else -1 for error.
To get the name of a groupby operation. Check out more about it in Stream
public String getGpbyName(long schemaid, long streamid, String gpby_attr_list)
public long getGpbyId(long schemaid, long streamid, String gpby_attr_list)
The groupby name is mangled by the stream manager; therefore, to get the actual name, we can call this API. The gpby_attr_list provides the necessary information for the computation of the name. It looks something like this: gpby_attr_list = {"attrs":["a1", "b2", "c3"], "gpby-val":"x", "gpby-name":"a1_b2_c3", "gpby-attrid":123}. getGpbyName returns the name of the groupby, while getGpbyId returns its id, or -1 for error.
To get count of events for a given schema and stream
public long count(long schemaid, long streamid) It returns -1 for error else the count
To get count of events based on filter condition for a given schema and stream
public long count(long schemaid, long streamid, long psk, long pek, String filter_json, ScanFilter sf) It returns -1 for error else the count
Adding UDFs to perform computations on streams
public String addUdfs(long schema_id, String udfs) The stream manager may leverage user defined functions for many of the computations defined in the schema. Using this API, one can add udfs for a given schemaid. It returns a JSON string with errcode as 0 for success or -1 for error. It also contains the reason for failure when it fails.
To delete a UDF
public String delUdfs(long schema_id, String udfs) This API deletes udfs for a given schemaid. It returns a JSON string with errcode as 0 for success or -1 for error. It also contains the reason for failure when it fails.
To compile a UDF
public String compileUdf(String code)
To get the list of all UDFs present
public String getUDFList() If successful, errcode is set to 0 in the returned doc; the result is a JSON containing the list of UDFs.
To get the list of all registered schemas
public String getSchemaList() This returns a JSON doc with the list of all the schemas, or null for error. It may also set errcode to -1 for some errors.
To get a dependency graph for a given schema, users may call this API. This returns json doc defining the entire dependency graph for the schema
public String getSchemaDepGraph(long schema_id, boolean bfs) The schema is structured as a graph within the stream manager. This API will return the graph for the given schema. If bfs is set to true the graph is traversed breadth-first, else depth-first (dfs). It returns null for error.
To get the entire schema (JSON structure), either from memory or from metadata. Usually both would be the same, but in some cases they could differ
public String getSchemaStr(String schemaName, short from_meta) Here, from_meta = 0 means get from memory, else 1 means get from the meta store.
To get graphical structure for a given stream
public String getStreamDepGraph(long schema_id, long stream_id, boolean only_dep) This API will return the graph for the given stream of a schema. Please see the stream section to know more about the graph. It returns null for error.
To reset the ML helper at run time
public void resetMlHelper(BangDBMLHelper bmlh)
To close the stream manager
public synchronized void closeBangdbStreamManager(CloseType closetype) Here, CloseType is an enum with values DEFAULT_AT_CLIENT, CONSERVATIVE_AT_SERVER, OPTIMISTIC_AT_SERVER, CLEANCLOSE_AT_SERVER, SIMPLECLOSE_AT_SERVER, DEFAULT_AT_SERVER