Currently, we only support joining an in-memory snapshot of a stream, called a table. That is only suitable for small amounts of data. When doing enrichment, it is desirable to join against a permanent data source such as a database. The goal of this feature is to support joining data from a permanent store, performing the lookup on demand without loading all the data into memory.
By default, a sink writes out events as `insert`/`append`. For change data capture (CDC) scenarios, the sink can write out bounded or unbounded streams with insert, update, and delete rows.

Updatable table for MQTT: do we support such a source, or let it read from a memory sink? Or create a permanent memory table which can be accessed by all rules?
Another problem: for a scan source, how do we deal with a table and a stream created from the same topic? We can hardly guarantee that the events arrive at the same time.
- Bind a `table` to an external data source and generate the lookup command (e.g., a SQL query on a DB) on demand.
- Support `insert`, `delete` and `update` for capable sinks such as the SQL sink and the memory sink.
- Provide a way to bind the `table` to a physical data source like a DB table. This is useful for enriching stream data.
Apparently, only a few sources are suitable as lookup tables, since this requires the source itself to be queryable. The potential supported sources are:
Suitable shipped source plugins:
We can reuse the table statements to create tables, but now we have two table types: permanent and temporary. The usage is almost the same as the previous table syntax.
```sql
CREATE STREAM demoStream() WITH (DATASOURCE="demo", FORMAT="json", TYPE="mqtt")
CREATE TABLE myTable() WITH (DATASOURCE="myTable", TYPE="sql", KIND="lookup")
SELECT * FROM demoStream INNER JOIN myTable on demoStream.id = myTable.id
```
Discussion: do we provide an option to use scan for a table even if the source supports lookup?
Use the `KIND` property to specify if the table is permanent or not.

```go
// Lookup receives lookup values to construct the query and returns query results
Lookup(ctx StreamContext, lookupValues []interface{}) ([]interface{}, error)
```
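The interface above could be satisfied as in the following sketch. The `memLookup` type and the simplified `StreamContext` are assumptions for illustration only, not the actual implementation:

```go
package main

import "fmt"

// StreamContext is simplified here; the real interface carries rule/runtime info.
type StreamContext interface{}

// LookupSource is the proposed interface; only sources implementing it
// can back a lookup (permanent) table.
type LookupSource interface {
	// Lookup receives lookup values to construct the query and returns query results
	Lookup(ctx StreamContext, lookupValues []interface{}) ([]interface{}, error)
}

// memLookup is a toy lookup source backed by an in-process map keyed by id.
type memLookup struct {
	rows map[int]map[string]interface{}
}

// Lookup returns the rows matching each lookup value, skipping misses.
func (m *memLookup) Lookup(ctx StreamContext, lookupValues []interface{}) ([]interface{}, error) {
	var results []interface{}
	for _, v := range lookupValues {
		id, ok := v.(int)
		if !ok {
			return nil, fmt.Errorf("unsupported key type %T", v)
		}
		if row, found := m.rows[id]; found {
			results = append(results, row)
		}
	}
	return results, nil
}

func main() {
	src := &memLookup{rows: map[int]map[string]interface{}{
		5: {"id": 5, "name": "aaa"},
	}}
	res, err := src.Lookup(nil, []interface{}{5})
	fmt.Println(res, err)
}
```

A real SQL-backed source would translate `lookupValues` into a parameterized query instead of a map access, but the contract seen by the table runtime is the same.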
Validations to add: only a `LookupSource` instance can be used as a permanent table.

Unlike a temporary table, a permanent table keeps running until it is explicitly dropped. A temporary table only runs while its rule is running.
Discussion: Lookup cache TTL setting?
The table runtime generates a query like `SELECT * FROM table WHERE id=?`, where the placeholder is filled with the `id` field of the stream event. First, the lookup cache is queried by this value. The lookup cache is a map of the DB lookup results indexed by the id. If hit and the TTL has not passed, the cached result is returned directly. The lookup cache is implemented in the table runtime.

Some source types do not support lookup themselves, but it is still valuable to have a temporary snapshot of data for joining, as we already provide. The current approach has two limitations:
We propose to use a permanent memory table as conversion middleware for stream/table conversion. Once the memory sink supports updates, these two limitations are addressed.
Discussion: do we pursue direct permanent support for scan sources?
Just add the mechanism to identify a `LookupSource`.
Discussion: Make this as low priority?
We provide an API to query the table, which converts into a call to the lookup method.
For a temporary table, there is no handle to access the table, so it is hard to implement yet.
Provide a general mechanism to update the sink "table". Similar to lookup sources, only a few sinks are naturally "updatable". The sink must support insert, update and delete.
Flink implementation: each event has a RowType of INSERT, UPDATE, or DELETE.
Add a change type as a verb to the sink result. It also serves as the switch indicating whether the sink is updatable.
```json
{
  "actions": [
    {
      "sql": {
        "database": "mydb",
        "table": "mytable",
        "changeTypeCol": "verb"
      }
    }
  ]
}
```
In this example, the data sent to the sink should have a field named `verb` whose value is one of ["INSERT","UPDATE","DELETE"]. Example data:
```json
{
  "verb": "UPDATE",
  "id": 5,
  "name": "aaa"
}
```
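As a sketch of how a SQL sink might translate the verb into statements, the following builds plain SQL text for illustration; the function and parameter names are assumptions, and a real sink should use prepared statements with proper quoting:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildStatement turns one sink event into a SQL statement based on the
// change-type column ("verb" in the config above). The table name and key
// column are passed in; values are interpolated naively for illustration.
func buildStatement(table, changeTypeCol, keyCol string, data map[string]interface{}) (string, error) {
	verb, _ := data[changeTypeCol].(string)
	// Collect the payload columns, excluding the verb itself.
	var cols []string
	for k := range data {
		if k != changeTypeCol {
			cols = append(cols, k)
		}
	}
	sort.Strings(cols) // deterministic column order
	switch verb {
	case "INSERT":
		vals := make([]string, len(cols))
		for i, c := range cols {
			vals[i] = fmt.Sprintf("%v", data[c])
		}
		return fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
			table, strings.Join(cols, ","), strings.Join(vals, ",")), nil
	case "UPDATE":
		var sets []string
		for _, c := range cols {
			if c != keyCol {
				sets = append(sets, fmt.Sprintf("%s=%v", c, data[c]))
			}
		}
		return fmt.Sprintf("UPDATE %s SET %s WHERE %s=%v",
			table, strings.Join(sets, ","), keyCol, data[keyCol]), nil
	case "DELETE":
		return fmt.Sprintf("DELETE FROM %s WHERE %s=%v",
			table, keyCol, data[keyCol]), nil
	default:
		return "", fmt.Errorf("unknown change type %q", verb)
	}
}

func main() {
	stmt, err := buildStatement("mytable", "verb", "id",
		map[string]interface{}{"verb": "UPDATE", "id": 5, "name": "aaa"})
	fmt.Println(stmt, err) // → UPDATE mytable SET name=aaa WHERE id=5 <nil>
}
```

An unknown verb is rejected rather than silently treated as insert, which keeps the updatable switch explicit.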
Discussion: should the verb be a field or metadata? During data transformation, the metadata may not exist.