Users need to differentiate between newly incoming data and resent data to handle them differently. For instance, they may want to use the newly incoming data for instant calculations and archive the resent data. This proposal suggests several enhancements to support the distinction between newly incoming data and resent data.

Use Cases

API Design

We will add properties to sinks including common properties and sink specific properties.

Common Properties

Sink Specific Properties

Each sink may have its own properties to control the resend behavior. Name some as an example:

TODO: There are actually a lot of properties affect the send behavior like dataTemplate, dataEncoding, etc. Shall we use a send config key to configure them as a whole?

MQTT Sink

REST Sink

File sink

Implementation

Sink API

Add collectResend method to SinkNode interface:


type Sink interface { // Open Should be sync function for normal case. The container will run it in go func Open(ctx StreamContext) error // Configure Called during initialization. Configure the sink with the properties from rule action definition Configure(props map[string]interface{}) error // Collect Called when each row of data has transferred to this sink Collect(ctx StreamContext, data interface{}) error // Collect Called when each row of data has transferred to this sink CollectResend(ctx StreamContext, data interface{}) error Closable }


Compatibility: If collectResend is not set, use collect instead.

Sink Cache

If resendQueue is set, add a cache queue for each sink which means there will be two output channels . The cache queue is a FIFO queue. Otherwise, keep the default behavior that one FIFO queue for all sinks.

Sink Node

  1. Switch around 2 output queues. For normal queue, call collect method; for cache queue, call collectResend method.
  2. Support resendPriority by setting channel priorities.
  3. Support resendField by updating the data.

Sink Implementations

Take MQTT as an example:

  1. Add CollectResend function to collect the resent data.
  2. In CollectResend, send the data to the resend topic if resendTopic is set. Otherwise, send the data to the normal topic.