Motivation

Currently, we only support joining a stream against an in-memory snapshot called a table. That is only suitable for small amounts of data. When doing enrichment, it is desirable to join against a permanent data source such as a database. The goal of this feature is to support joining data from a permanent store and performing the lookup on demand, without loading all the data into memory.

Concepts

Updatable table for MQTT: do we support such a source, or let it read from a memory sink? Or do we create a permanent memory table which can be accessed by all rules?

Another problem: for a scan source, how do we deal with a table and a stream created from the same topic? We can hardly guarantee that the events arrive at the same time.

Proposals

  1. Permanent lookup source for table: support binding a physical table as a lookup table and generating the lookup command (e.g. a SQL query on the DB) on demand.
  2. Updatable sink: support insert, delete and update for suitable sinks such as the SQL sink and the memory sink.

Permanent Lookup Table

Provide a way to bind a table to a physical data source such as a DB table. This is useful for enriching stream data.

Apparently, only a few sources are suitable as lookup tables, since this requires the source itself to be queryable. The potentially supported sources are:

Suitable shipped source plugins:

Usage

We can reuse the table statements to create tables, but now we have two table types: permanent and temporary. The usage is almost the same as for the previous tables.

  1. Create a stream.
    CREATE STREAM demoStream() WITH (DATASOURCE="demo", FORMAT="json", TYPE="mqtt")
    
  2. Create a permanent table. The database connection must be configured in the source configuration yaml. The table maps to a real table in the DB.
    CREATE TABLE myTable() WITH (DATASOURCE="myTable", TYPE="sql", KIND="lookup")
    
  3. Join the table with a stream.
    SELECT * FROM demoStream INNER JOIN myTable ON demoStream.id = myTable.id
    
New API

Discussion: Do we provide an option to use scan mode for a table if the source supports lookup?

  1. In the table definition, support a property KIND to specify whether the table is permanent (lookup) or temporary.
  2. Add a lookup source interface (a fuller sketch follows).
    // Lookup receives lookup values to construct the query and returns the query results
    Lookup(ctx StreamContext, lookupValues []interface{}) ([]interface{}, error)
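A fuller sketch of how the interface could look, assuming it keeps the Configure/Open/Close lifecycle of the existing source interfaces (the LookupSource name and the lifecycle methods are assumptions, not the final API):

// LookupSource is a sketch of the proposed interface. Only Lookup is
// specified by this proposal; the lifecycle methods are assumed to mirror
// the existing source interfaces.
type LookupSource interface {
    // Configure is called once with the DATASOURCE (e.g. the DB table name)
    // and the properties from the source configuration yaml.
    Configure(datasource string, props map[string]interface{}) error
    // Open prepares the connection to the external store.
    Open(ctx StreamContext) error
    // Lookup receives lookup values to construct the query and returns the query results.
    Lookup(ctx StreamContext, lookupValues []interface{}) ([]interface{}, error)
    // Close releases the connection when the table is dropped.
    Close(ctx StreamContext) error
}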
    

Validations to add:

Unlike a temporary table, a permanent table will keep running until it is dropped explicitly. A temporary table only runs while its rule is running.

Flow

Discussion: Lookup cache TTL setting?

  1. When creating a rule which refers to a permanent table, parse the rule logic to generate the lookup query template. In this example, with the SQL source, the query template is SELECT * FROM table WHERE id=?
  2. At runtime, the process is driven by the stream events. Each event carries a lookup value; in this example, the lookup value is the id field of the stream. First, query the lookup cache by this value. The lookup cache is a map of the DB lookup results indexed by the id. On a hit within the TTL, just return the cached result. The lookup cache is implemented in the table runtime (see the sketch after this list).
  3. On a miss, query the DB with the SQL template filled with the new event value and return the result. This is implemented on the source side.
  4. Save the newly returned value to the cache.
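A minimal sketch of steps 2-4 in Go, reusing the StreamContext and LookupSource types from the interface sketch above; the lookupTable struct, cacheEntry type and per-key TTL handling are illustrative assumptions, not the final cache design:

import "time"

// cacheEntry holds one cached lookup result together with its expiry time.
type cacheEntry struct {
    results  []interface{}
    expireAt time.Time
}

// lookupTable sketches the table runtime state: the underlying lookup
// source, the cache indexed by the lookup value, and the configured TTL.
type lookupTable struct {
    source LookupSource
    cache  map[interface{}]cacheEntry
    ttl    time.Duration
}

// lookup implements the flow above: check the cache first (step 2), fall
// back to the source on a miss or an expired entry (step 3), then refresh
// the cache with the new result (step 4).
func (t *lookupTable) lookup(ctx StreamContext, key interface{}) ([]interface{}, error) {
    if e, ok := t.cache[key]; ok && time.Now().Before(e.expireAt) {
        return e.results, nil // cache hit within TTL
    }
    res, err := t.source.Lookup(ctx, []interface{}{key})
    if err != nil {
        return nil, err
    }
    t.cache[key] = cacheEntry{results: res, expireAt: time.Now().Add(t.ttl)}
    return res, nil
}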

Stream to permanent table

Some source types do not support lookup themselves, but it is still valuable to have a temporary snapshot of the data for joining, as we already provide. The current approach has two limitations:

We propose to use a permanent memory table as conversion middleware for stream/table conversion. Once the memory sink supports updates, these two limitations are addressed.
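As a sketch, the wiring could look like this: one rule writes the stream into an updatable memory sink, and a permanent lookup table is created on the same memory topic (the topic name, table name and changeTypeCol value below are illustrative):

{
   "actions": [
      {
         "memory": {
            "topic": "devices/state",
            "changeTypeCol": "verb"
         }
      }
   ]
}

CREATE TABLE deviceTable() WITH (DATASOURCE="devices/state", TYPE="memory", KIND="lookup")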

Discussion: Do we pursue direct permanent support for scan sources?

Plugin support

Just add a mechanism to identify a LookupSource, as sketched below.
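Since plugin instances are handled as plain interface{} values, a type assertion against the new interface should be enough (sketch; the helper name is hypothetical):

// asLookupSource reports whether a loaded plugin instance implements the
// proposed LookupSource interface, so the planner can choose between scan
// and lookup mode.
func asLookupSource(plugin interface{}) (LookupSource, bool) {
    ls, ok := plugin.(LookupSource)
    return ls, ok
}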

Query the table

Discussion: Make this low priority?

We provide an API to query the table, which converts into a call to the lookup method.

For a temporary table, there is no handle to access the table, so it is hard to implement for now.

Updatable Sink

Provide a general mechanism to update the sink "table". Similar to the lookup source, only a few sinks are naturally "updatable". The sink must support insert, update and delete.

Usage

For reference, in the Flink implementation each event has a row type: INSERT, UPDATE or DELETE.

Add a change type as a "verb" to the sink result. It also serves as the switch indicating whether the sink is updatable.

{
   "actions": [
      {
         "sql": {
            "database": "mydb",
            "table": "mytable",
            "changeTypeCol": "verb"
         }
      }
   ]
}

In this example, the data sent to the sink should have a field named verb whose value is one of ["INSERT","UPDATE","DELETE"]. An example of the data is shown below:

{
   "verb": "UPDATE",
   "id": 5,
   "name": "aaa"
}
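A sketch of how an updatable SQL sink could dispatch on the configured change-type column; the key column id and the naive string formatting are simplifications for illustration, and a real implementation would use parameterized statements:

import (
    "fmt"
    "strings"
)

// buildStatement generates one SQL statement per event based on the value
// of the configured change-type column (sketch only).
func buildStatement(table, changeTypeCol string, data map[string]interface{}) (string, error) {
    verb, _ := data[changeTypeCol].(string)
    switch strings.ToUpper(verb) {
    case "INSERT":
        var cols, vals []string
        for k, v := range data {
            if k == changeTypeCol {
                continue // the verb itself is not written to the table
            }
            cols = append(cols, k)
            vals = append(vals, fmt.Sprintf("'%v'", v))
        }
        return fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
            table, strings.Join(cols, ","), strings.Join(vals, ",")), nil
    case "UPDATE":
        var sets []string
        for k, v := range data {
            if k == changeTypeCol || k == "id" {
                continue // assume id is the key column
            }
            sets = append(sets, fmt.Sprintf("%s='%v'", k, v))
        }
        return fmt.Sprintf("UPDATE %s SET %s WHERE id='%v'",
            table, strings.Join(sets, ","), data["id"]), nil
    case "DELETE":
        return fmt.Sprintf("DELETE FROM %s WHERE id='%v'", table, data["id"]), nil
    default:
        return "", fmt.Errorf("unknown change type %q", verb)
    }
}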

Discussion: Should the verb be a field or a meta? During the data transformation, the meta may not exist.