Requirement

eKuiper sinks send data out to external systems. By default, a sink produces one output for each event. This can be a problem when the data throughput is large:

...

{
  "id": "rule1",
  "sql": "SELECT * FROM demo GROUP BY TumblingWindow(ss, 10)",
  "actions": [{
    "mqtt": {
      "server": "tcp://cloud:1883",
      "topic": "remote",
      "sendSingle": "true",
      "batchSize": 100,
      "lingerInterval": 100000,
      "compressionType": "gzip"
    }
  }, {
    "mqtt": {
      "server": "tcp://local:1883",
      "topic": "local",
      "sendSingle": "true"
    }
  }]
}
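
As a rough illustration of how the three new properties in the first action might be read and checked, here is a minimal Go sketch. The struct batchProps and the function parseBatchProps are hypothetical names introduced for this example, not eKuiper types. The sketch also assumes that gzip is the only accepted compressionType and that an empty value means no compression.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// batchProps holds only the three properties discussed here.
// Hypothetical type for illustration; not eKuiper's actual configuration struct.
type batchProps struct {
	BatchSize       int    `json:"batchSize"`
	LingerInterval  int    `json:"lingerInterval"`
	CompressionType string `json:"compressionType"`
}

// parseBatchProps decodes one action's property object and checks the values.
// Properties it does not know (server, topic, sendSingle) are ignored.
func parseBatchProps(raw []byte) (batchProps, error) {
	var p batchProps
	if err := json.Unmarshal(raw, &p); err != nil {
		return p, fmt.Errorf("invalid sink properties: %w", err)
	}
	if p.BatchSize < 0 {
		return p, fmt.Errorf("batchSize must not be negative, got %d", p.BatchSize)
	}
	if p.LingerInterval < 0 {
		return p, fmt.Errorf("lingerInterval must not be negative, got %d", p.LingerInterval)
	}
	// Assumption: only gzip is accepted in this sketch; empty means no compression.
	if p.CompressionType != "" && p.CompressionType != "gzip" {
		return p, fmt.Errorf("unsupported compressionType %q", p.CompressionType)
	}
	return p, nil
}

func main() {
	raw := []byte(`{
		"server": "tcp://cloud:1883",
		"topic": "remote",
		"sendSingle": "true",
		"batchSize": 100,
		"lingerInterval": 100000,
		"compressionType": "gzip"
	}`)
	p, err := parseBatchProps(raw)
	fmt.Println(p, err)
}
```

The second action in the rule omits all three properties, so in this sketch it would decode to zero values and keep the default per-event behavior.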

? Adapt to the file format, which is currently split by lines (\n)
? MQTT packet size check

Implementation

  1. compression: Just like format, feed the batch and compression properties into the transform function GenTransform (internal/topo/transform/template.go). Each sink can then call ctx.TransformOutput(data) to do the transformation, which applies compression if the property is set.

  2. batch: In SinkNode (internal/topo/node/sink_node.go), accumulate events according to the configured strategy before passing them on to the sink implementation. State saving needs to be considered.

  3. In each sink implementation, if batch/compression is not supported, return an error from the validate function. If it is supported, check whether the collect logic needs to change.
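
The batch and compression steps above can be sketched in Go roughly as follows. This is an illustration under stated assumptions, not the SinkNode or GenTransform code: the batcher type and the helpers joinLines, gzipBytes, and gunzipBytes are hypothetical names. The sketch assumes a batch is flushed when batchSize items are buffered, that a linger timer would call flush when the interval elapses, and that batched items are joined with \n before compression.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// batcher accumulates encoded events and releases them as one batch.
// Hypothetical type for illustration; not eKuiper's SinkNode.
type batcher struct {
	size  int      // flush when this many items are buffered; 0 disables the count trigger
	items [][]byte // buffered items; this is the state that would need saving
}

// add buffers one item and returns a full batch once size is reached, else nil.
func (b *batcher) add(item []byte) [][]byte {
	b.items = append(b.items, item)
	if b.size > 0 && len(b.items) >= b.size {
		return b.flush()
	}
	return nil
}

// flush returns whatever is buffered and resets the buffer.
// A linger timer would call this when the interval elapses.
func (b *batcher) flush() [][]byte {
	out := b.items
	b.items = nil
	return out
}

// joinLines concatenates a batch into one payload, one item per line.
func joinLines(batch [][]byte) []byte {
	return bytes.Join(batch, []byte("\n"))
}

// gzipBytes compresses data with the standard library gzip writer.
func gzipBytes(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(data); err != nil {
		return nil, err
	}
	// Close flushes the remaining compressed data and the gzip footer.
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// gunzipBytes reverses gzipBytes; a receiver would do the equivalent.
func gunzipBytes(data []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	b := &batcher{size: 3}
	for _, ev := range []string{`{"t":1}`, `{"t":2}`, `{"t":3}`} {
		if batch := b.add([]byte(ev)); batch != nil {
			payload, err := gzipBytes(joinLines(batch))
			fmt.Println(len(batch), len(payload), err)
		}
	}
}
```

Keeping the accumulation separate from the compression mirrors the split described above: the batcher stands in for the node-level buffering, while the gzip helper stands in for work done inside the transform.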

...