- Status: Design
- Author: Jiyong Huang
- Discussion: https://github.com/lf-edge/ekuiper/issues/1637
Requirement
eKuiper sinks send data out to external systems. By default, a sink produces data for each event, which can be a problem when the data throughput is large:
...
{
  "id": "rule1",
  "sql": "SELECT * FROM demo GROUP BY TumblingWindow(ss, 10)",
  "actions": [{
    "mqtt": {
      "server": "tcp://cloud:1883",
      "topic": "remote",
      "sendSingle": "true",
      "batchSize": 100,
      "lingerInterval": 100000,
      "compressionType": "gzip"
    }
  }, {
    "mqtt": {
      "server": "tcp://local:1883",
      "topic": "local",
      "sendSingle": "true"
    }
  }]
}
? Adapt to file format, currently split by lines (\n)
? MQTT package size check
Implementation
compression: Just like format, feed the batch and compression properties into the transform function GenTransform (internal/topo/transform/template.go). Each sink can then call ctx.TransformOutput(data) to do the transformation, which applies compression if the compression property is set.
batch: In SinkNode (internal/topo/node/sink_node.go), accumulate data according to the batch strategy before passing it on to the sink implementation. State saving needs to be considered.
In each sink implementation, if batch/compression is not supported, return an error in the validate function. If it is supported, check whether the collect logic needs to be changed.
...