Requirement

eKuiper sinks send data out to external systems. By default, a sink produces output for each event, but this can be a problem when the data throughput is large.

To save bandwidth with a higher compression ratio and to boost performance, we would like to introduce batch send and compression in sinks.


Design

Batch send can be achieved in two ways:

  1. Use window to batch data (supported now)

    1. Pros: applies to all sinks

    2. Cons:

      1. Semantically not suitable for continuous queries, so the SQL may become more complex even when no window is needed

      2. Cannot be controlled at the sink level; for example, it cannot save locally in real time while publishing to the cloud in batches

  2. Set batch property for sink (to be implemented)

    1. Pros: flexible; batching can be controlled per sink

Usage

Add new properties to the [sink common properties](https://ekuiper.org/docs/en/latest/guide/sinks/overview.html#common-properties).
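
Based on the use cases below, the new properties could take the following shape (the types and defaults here are assumptions, not a settled design):

  1. batchSize: integer, default 0 (no batching). Send the accumulated events once this many events have been buffered.

  2. lingerInterval: integer in milliseconds, default 0 (disabled). Send whatever has accumulated once this interval has elapsed, even if batchSize has not been reached.

  3. compressionType: string, default empty (no compression). The compression algorithm applied to the payload, e.g. gzip.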

Use cases

  1. Publish to MQTT every 10 seconds as defined by the window, in protobuf format, compressed with gzip

{
  "id": "rule1",
  "sql": "SELECT * FROM demo GROUP BY TumblingWindow(ss, 10)",
  "actions": [{
    "mqtt": {
      "server": "tcp://yourserver:1883",
      "topic": "mytopic",
      "format": "protobuf",
      "schema": "myschema.message",
      "compressionType": "gzip"
    }
  }]
}

  2. Publish to the cloud MQTT broker for every 100 events or every 100 seconds (whichever comes first), compressed with gzip; but publish to the local MQTT broker for each event

{
  "id": "rule1",
  "sql": "SELECT * FROM demo",
  "actions": [{
    "mqtt": {
      "server": "tcp://cloud:1883",
      "topic": "remote",
      "sendSingle": true,
      "batchSize": 100,
      "lingerInterval": 100000,
      "compressionType": "gzip"
    }
  }, {
    "mqtt": {
      "server": "tcp://local:1883",
      "topic": "local",
      "sendSingle": true
    }
  }]
}

? Adapt to the file sink format, which is currently split by newlines (\n)
? MQTT packet size check

Implementation

  1. compression: Just like format, feed the batch and compression properties into the GenTransform function (internal/topo/transform/template.go). Then each sink can call ctx.TransformOutput(data) to do the transformation, which will apply compression when the property is set. A sketch of a pluggable compressor is shown after this list.

  2. batch: In SinkNode (internal/topo/node/sink_node.go), accumulate events according to the batch strategy before passing them on to the sink implementation. State saving needs to be considered so that a buffered batch is not lost on restart. A sketch of the accumulation loop is shown after this list.

  3. In each sink implementation, return an error from the validate function if batch/compression is not supported. If supported, check whether the collect logic needs to change.
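
A minimal sketch of the compression side in Go, assuming a pluggable compressor is selected by the compressionType property while building the transform in GenTransform; the Compressor interface and GetCompressor factory below are illustrative names, not existing eKuiper APIs:

package compressor

import (
    "bytes"
    "compress/gzip"
    "fmt"
)

// Compressor is a hypothetical interface the transform step could call
// on the payload produced by the format step (e.g. protobuf bytes).
type Compressor interface {
    Compress(data []byte) ([]byte, error)
}

type gzipCompressor struct{}

// Compress gzips the given payload.
func (g *gzipCompressor) Compress(data []byte) ([]byte, error) {
    var buf bytes.Buffer
    w := gzip.NewWriter(&buf)
    if _, err := w.Write(data); err != nil {
        return nil, err
    }
    // Close flushes the gzip footer; without it the payload is truncated.
    if err := w.Close(); err != nil {
        return nil, err
    }
    return buf.Bytes(), nil
}

// GetCompressor maps the compressionType property to an implementation
// and would be called while building the transform.
func GetCompressor(name string) (Compressor, error) {
    switch name {
    case "gzip":
        return &gzipCompressor{}, nil
    default:
        return nil, fmt.Errorf("unsupported compression type: %s", name)
    }
}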
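
A minimal sketch of the batch accumulation loop, assuming a batchSize property and a lingerInterval in milliseconds; the function name and channel plumbing are illustrative, not the actual SinkNode code:

package node

import "time"

// runBatch buffers incoming events and emits them as a slice either when
// batchSize events have accumulated or when lingerInterval has elapsed,
// whichever happens first. Assumes lingerInterval > 0.
func runBatch(in <-chan interface{}, out chan<- []interface{}, batchSize int, lingerInterval time.Duration) {
    buffer := make([]interface{}, 0, batchSize)
    ticker := time.NewTicker(lingerInterval)
    defer ticker.Stop()

    flush := func() {
        if len(buffer) > 0 {
            out <- buffer
            buffer = make([]interface{}, 0, batchSize)
        }
    }

    for {
        select {
        case item, ok := <-in:
            if !ok {
                // Input closed: emit whatever is left before returning.
                flush()
                return
            }
            buffer = append(buffer, item)
            if len(buffer) >= batchSize {
                flush()
            }
        case <-ticker.C:
            // lingerInterval elapsed: emit the partial batch.
            flush()
        }
    }
}

In the actual implementation, the buffered events would also need to be included in the rule's checkpoint state so that a batch in flight is not lost on restart.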

Source consideration (Future)

“Drinking Our Own Champagne”: the compressed data produced by a sink should be able to be consumed by our sources. This will add new properties like:
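
For example (the property name below is an assumption mirroring the sink side, not a settled design):

  1. decompression: the algorithm used to decompress incoming payloads before decoding, matching the sink's compressionType (e.g. gzip).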