bangdb stream manager – Embedded – BangDB = NoSQL + AI + Stream


BangDB Stream Manager

Embedded

C++


static bangdb_stream_manager *getInstance(bangdb_database *bdb, bangdb_ml_helper *bmlh);
To get an instance of bangdb_stream_manager. It takes bangdb_database and bangdb_ml_helper as input parameters. The bangdb_ml_helper may be NULL in case we don't wish to train and predict on streams.
When successful it returns the instance of the stream manager, else NULL for error.
char *register_schema(const char *schema_json);
BangDB stream works on a schema for a set of streams, or what we call an app. We need to pass the app here, in json format.
It returns a json string with errcode 0 for success, else -1 for error. It also contains the reason for failure when it fails.
The app/ schema details are covered in the stream section
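As a minimal illustration only (the full app format, with attribute definitions and processing rules, is covered in the stream section; the names here are hypothetical placeholders), the json passed to register_schema has the schema name plus an array of streams:

```json
{
  "schema": "myschema",
  "streams": [
    { "name": "mystream" }
  ]
}
```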
char *deregister_schema(const char *schema_name);
To de-register an app or schema, simply pass the name of the schema and it will return a json string with errcode 0 for success or -1 for error. It also contains the reason for failure when it fails.
The app/ schema details are covered in the stream section
char *add_streams(long schemaid, const char *streams);
This is helpful when we wish to add streams to an existing schema or app. The streams input here is a json string that contains the array of streams to be added. It takes the schemaid of the schema to which the streams should be added. It returns a json string with errcode 0 for success or -1 for error. It also contains the reason for failure when it fails.
The app/ schema details are covered in the stream section
// streams = {"schema":"myschema", "streams":[{"name":"mystream"}]}
char *delete_streams(long schemaid, const char *streams);
This deletes the set of streams named in the streams json. It takes the schemaid of the schema from which the streams should be deleted. It returns a json string with errcode 0 for success or -1 for error. It also contains the reason for failure when it fails.
The app/ schema details are covered in the stream section
char *add_udfs(long schema_id, const char *udfs);
The stream manager may leverage user defined functions for many computations as defined in the schema. Using this API, one can add udfs for a given schemaid. It returns a json string with errcode 0 for success or -1 for error. It also contains the reason for failure when it fails.
To know more about udf, please see the udf section
char *del_udfs(long schema_id, const char *udfs);
This API deletes udfs for a given schemaid. It returns a json string with errcode 0 for success or -1 for error. It also contains the reason for failure when it fails.
To know more about udf, please see the udf section
char *put(long schemaid, long streamid, const char *doc);
This api is used to put an event into the stream manager. The event is the doc (json document), and schemaid and streamid define the stream into which the event should be put. It returns a json string with errcode 0 for success or -1 for error. It also contains the reason for failure when it fails.
Please see stream section to know more about events
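Putting the APIs above together, a minimal, non-compilable-on-its-own sketch of ingesting one event (it assumes the BangDB embedded headers, an already opened bangdb_database instance bdb, and an already registered app; "myschema" and "mystream" are hypothetical names):

```cpp
// obtain the stream manager; NULL ml helper since we don't
// train or predict on streams in this sketch
bangdb_stream_manager *bsm = bangdb_stream_manager::getInstance(bdb, NULL);

// resolve the ids of the registered schema and stream
long schemaid = bsm->get_schemaid("myschema");
long streamid = bsm->get_streamid("myschema", "mystream");

// put a single event (json document) into the stream;
// the returned json string carries errcode 0 on success
char *res = bsm->put(schemaid, streamid, "{\"a\":1,\"b\":\"x\"}");

bsm->close_bangdb_stream_manager();
```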
char *put(long schemaid, long streamid, FDT *k, FDT *v);
This is to simply put the document into any given stream for a given schema. Please note that no stream or event processing takes place for this api. This is similar to the table api.
It returns json string with errcode as 0 for success or -1 for error. It also contains reason for failure when it fails.
int get(long schemaid, long streamid, FDT *key, FDT **val);
This is to simply get the document from any given stream for a given schema. This is similar to the table api. It returns 0 for success or -1 for error; the document is returned through val.
resultset *scan_doc(long schemaid, long streamid, resultset *prev_rs, FDT *k1 = NULL, FDT *k2 = NULL, const char *idx_filter_json = NULL, scan_filter *sf = NULL);
This is to scan the stream for given filters. This is very similar to the table API. It just takes additional schemaid and streamid to identify the given stream.
resultset *scan_proc_doc(long schemaid, long streamid, const char *attr_names_json, resultset *prev_rs, scan_filter *sf = NULL);
This is to scan aggregate, groupby and entity streams.
The attr_names_json defines what to do and for which stream this api is being called. The structure of the json is as follows:

for aggr: query_json = {"proc-type":6, "attrs":["a", "b", ...], "option":1, "skey":"sk", "ekey":"ek", "rollup":1}

for entity: query_json = {"proc-type":6, "attrs":["a", "b", ...], "option":1, "skey":"sk", "ekey":"ek", "enty-stream-id":1234}

for gpby: query_json = {"attrs":["a", "b", ...], "option":1, "skey":"sk", "ekey":"ek", "gpby-val":"x", "gpby-name":"a1_b2_c3", "gpby-attrid":123}
For gpby, if gpby-attrid is provided then "attrs", "gpby-val" and "gpby-name" are not required; these three are only used to compute "gpby-attrid".

enum BANGDB_STREAM_TYPE {
    BANGDB_STREAM_TYPE_NORMAL = 1, // defined in the schema
    BANGDB_STREAM_TYPE_FILTER,     // defined in the schema
    BANGDB_STREAM_TYPE_JOINED,     // defined in the schema
    BANGDB_STREAM_TYPE_ENTITY,     // defined in the schema
    BANGDB_STREAM_TYPE_GPBY,       // not explicitly defined in the schema
    BANGDB_STREAM_TYPE_AGGR,       // not explicitly defined in the schema
    BANGDB_STREAM_TYPE_INVALID
};
Only BANGDB_STREAM_TYPE_ENTITY, BANGDB_STREAM_TYPE_GPBY and BANGDB_STREAM_TYPE_AGGR are supported here.
For aggr, we can also define rollup (1 means ON, 0 means OFF).
It returns a resultset for success or NULL for error.
Please see stream section to get more info on these.
long get_gpby_name(long schemaid, long streamid, const char *gpby_attr_list, char **out_json);
The groupby name is mangled by the stream manager, therefore to get the actual name we can call this api. The gpby_attr_list provides the information necessary to compute the name. It looks something like this:

gpby_attr_list = {"attrs":["a1", "b2", "c3"], "gpby-val":"x", "gpby-name":"a1_b2_c3", "gpby-attrid":123}

If the gpby-attrid field is already present, it returns the same long value with NULL in out_json.
It returns 0 for success and -1 for error.
Please see stream section to get more info on these.
long count(long schemaid, long streamid);
This returns count of events for given schema and stream ids, else -1 for error.
int get_schema_ddl_state(long schemaid);
This returns whether the schema is currently ready for taking events. It returns 1 if it is not ready and 0 if it is ready.
long get_schemaid(const char *schema_name, bool check_valid = true);
This returns the schemaid for the given schema_name. If check_valid is false then it returns the potential id, else the id of the actually existing schema.
For success it returns the id else -1 for error
long get_streamid(const char *schema_name, const char *stream_name, bool check_valid = true);
This returns the streamid for the given schema_name and stream_name. If check_valid is false then it returns the potential id, else the id of the actually existing stream.
For success it returns the id else -1 for error
char *get_schema_str(const char *schema_name);
This returns the entire schema or app json, else errcode -1 with the reason for failure
char *get_schema_from_metadata(const char *schema_name);
This returns the entire schema or app json, else errcode -1 with the reason for failure. Please note this is the same as the previous api, except that it reads from the table and then returns, whereas the previous api returns from the cache
char *get_schema_dep_graph(long schema_id, bool bfs = true);
The schema is structured as a graph within the stream manager. This api will return the graph for the given schema. If bfs is set as true it uses breadth first traversal, else dfs (depth first). As of now only bfs = true is supported.
Upon error it will return json string with errcode as -1 with the reasons for failure. Please see stream section to know more on the graph
char *get_stream_dep_graph(long schema_id, long stream_id, bool only_dep = false);
This api will return the graph for the given stream for a schema.
only_dep = true means return only the dependency part, else it also returns the node information. Upon error it will return a json string with errcode -1 and the reasons for failure.
Please see stream section to know more on the graph
void reset_ml_helper(bangdb_ml_helper *bmlh);
This is to reset the ml helper at run time. It could be used for various reasons, but is mostly supported for changes in the cluster config for ml
void close_bangdb_stream_manager(bool force = false);
This will close the stream manager. Please note that if force is set to false then it simply reduces the reference count, and if the count reaches 0 then it closes the stream manager. But if force is set to true then it closes the stream manager irrespective of how many references are there.

Java

public BangDBStreamManager(BangDBDatabase bdb)
To get an instance of BangDBStreamManager. It takes BangDBDatabase as parameter.
public String registerSchema(String schema_json)
BangDB stream works on a schema for a set of streams, or what we call an app. We need to pass the app here, in json format. It returns a json string with errcode 0 for success, else -1 for error. It also contains the reason for failure when it fails.
The app/ schema details are covered in the stream section
public String deregisterSchema(String schema_name)
To de-register an app or schema, simply pass the name of the schema and it will return a json string with errcode 0 for success or -1 for error. It also contains the reason for failure when it fails.
The app/ schema details are covered in the stream section
public String addStreams(long schemaid, String streams)
This is helpful when we wish to add streams to an existing schema or app. The streams input here is a json string that contains the array of streams to be added. It takes the schemaid of the schema to which the streams should be added. It returns a json string with errcode 0 for success or -1 for error. It also contains the reason for failure when it fails.
The app/ schema details are covered in the stream section
public String deleteStreams(long schemaid, String streams)
This deletes the set of streams named in the streams json. It takes the schemaid of the schema from which the streams should be deleted. It returns a json string with errcode 0 for success or -1 for error. It also contains the reason for failure when it fails.
The app/ schema details are covered in the stream section
public int getSchemaDDLState(long schemaid)
This returns whether the schema is currently ready for taking events. It returns 1 if it is not ready and 0 if it is ready.
public long getSchemaid(String schema_name, boolean check_valid)
This returns the schemaid for the given schema_name. If check_valid is false then it returns the potential id, else the id of the actually existing schema.
For success it returns the id else -1 for error
public long getStreamid(String schema_name, String stream_name, boolean check_valid)
This returns the streamid for the given schema_name and stream_name. If check_valid is false then it returns the potential id, else the id of the actually existing stream.
For success it returns the id else -1 for error
public String put(long schemaid, long streamid, String doc)
This api is used to put an event into the stream manager. The event is the doc (json document), and schemaid and streamid define the stream into which the event should be put. It returns a json string with errcode 0 for success or -1 for error. It also contains the reason for failure when it fails.
Please see stream section to know more about events
public String put(long schemaid, long streamid, long k, String v)
This is to simply put the document into any given stream for a given schema. Please note that no stream or event processing takes place for this api. This is similar to the table api.
It returns json string with errcode as 0 for success or -1 for error. It also contains reason for failure when it fails.
public String get(long schemaid, long streamid, long k)
This is to simply get the document from any given stream for a given schema. This is similar to the table api. It returns a json string with errcode 0 for success or -1 for error. It also contains the reason for failure when it fails.
public ResultSet scanDoc(long schemaid, long streamid, ResultSet prev_rs, long k1, long k2, String idx_filter_json, ScanFilter sf)
This is to scan the stream for given filters. This is very similar to the table API. It just takes additional schemaid and streamid to identify the given stream.
public ResultSet scanProcDoc(long schemaid, long streamid, String attr_names_json, ResultSet prev_rs, ScanFilter sf)
This is to scan aggregate, groupby and entity streams. The attr_names_json defines what to do and for which stream this api is being called. The structure of the json is as follows:
for aggr: query_json = {"proc-type":6, "attrs":["a", "b", ...], "option":1, "skey":"sk", "ekey":"ek", "rollup":1}
for entity: query_json = {"proc-type":6, "attrs":["a", "b", ...], "option":1, "skey":"sk", "ekey":"ek", "enty-stream-id":1234}
for gpby: query_json = {"attrs":["a", "b", ...], "option":1, "skey":"sk", "ekey":"ek", "gpby-val":"x", "gpby-name":"a1_b2_c3", "gpby-attrid":123}
For gpby, if gpby-attrid is provided then "attrs", "gpby-val" and "gpby-name" are not required; these three are only used to compute "gpby-attrid".

enum BANGDB_STREAM_TYPE {
    BANGDB_STREAM_TYPE_NORMAL = 1, // defined in the schema
    BANGDB_STREAM_TYPE_FILTER,     // defined in the schema
    BANGDB_STREAM_TYPE_JOINED,     // defined in the schema
    BANGDB_STREAM_TYPE_ENTITY,     // defined in the schema
    BANGDB_STREAM_TYPE_GPBY,       // not explicitly defined in the schema
    BANGDB_STREAM_TYPE_AGGR,       // not explicitly defined in the schema
    BANGDB_STREAM_TYPE_INVALID
};
Only BANGDB_STREAM_TYPE_ENTITY, BANGDB_STREAM_TYPE_GPBY and BANGDB_STREAM_TYPE_AGGR are supported here.
For aggr, we can also define rollup (1 means ON, 0 means OFF).
It returns a resultset for success or NULL for error.
public String getGpbyName(long schemaid, long streamid, String gpby_attr_list)
The groupby name is mangled by the stream manager, therefore to get the actual name we can call this api. The gpby_attr_list provides the information necessary to compute the name. It looks something like this:
gpby_attr_list = {"attrs":["a1", "b2", "c3"], "gpby-val":"x", "gpby-name":"a1_b2_c3", "gpby-attrid":123}
If the gpby-attrid field is already present, it returns the same value.
It returns 0 for success and -1 for error.
Please see stream section to get more info on these.
public long getGpbyId(long schemaid, long streamid, String gpby_attr_list)
This api is similar to getGpbyName except that it returns gpbyId or -1 for error
public long count(long schemaid, long streamid)
This returns count of events for given schema and stream ids, else -1 for error.
public String addUdfs(long schema_id, String udfs)
The stream manager may leverage user defined functions for many computations as defined in the schema. Using this API, one can add udfs for a given schemaid. It returns a json string with errcode 0 for success or -1 for error. It also contains the reason for failure when it fails.
To know more about udf, please see the udf section
public String delUdfs(long schema_id, String udfs)
This API deletes udfs for a given schemaid. It returns a json string with errcode 0 for success or -1 for error. It also contains the reason for failure when it fails.
To know more about udf, please see the udf section
public String getSchemaDepGraph(long schema_id, boolean bfs)
The schema is structured as a graph within the stream manager. This api will return the graph for the given schema. If bfs is set as true it uses breadth first traversal, else dfs (depth first). As of now only bfs = true is supported.
Upon error it will return json string with errcode as -1 with the reasons for failure.
Please see stream section to know more on the graph
public String getStreamDepGraph(long schema_id, long stream_id, boolean only_dep)
This api will return the graph for the given stream of a schema. only_dep = true means return only the dependency part, else it also returns the node information. Upon error it will return a json string with errcode -1 and the reasons for failure.
Please see stream section to know more on the graph
public void resetMlHelper(BangDBMLHelper bmlh)
This is to reset the ml helper at run time. It could be used for various reasons, but is mostly supported for changes in the cluster config for ml
public synchronized void closeBangdbStreamManager(boolean force)
This will close the stream manager. Please note that if force is set to false then it simply reduces the reference count, and if the count reaches 0 then it closes the stream manager. But if force is set to true then it closes the stream manager irrespective of how many references are there.