Users need to differentiate between newly incoming data and resent data to handle them differently. For instance, they may want to use the newly incoming data for instant calculations and archive the resent data. This proposal suggests several enhancements to support the distinction between newly incoming data and resent data.
We will add properties to sinks, including common properties and sink-specific properties.
- resendQueue: boolean, default false. If true, the sink will send the resent data in a separate queue from the normal data. If false, which is the default case, the sink will send the resent data in the same queue as the normal data, in time order.
- resendPriority: int, one of -1, 0 or 1, default 0. If 0, the resent data will be sent with the same priority as the normal data. If 1, the resent data will be sent with higher priority than the normal data. If -1, the resent data will be sent with lower priority than the normal data.
- resendField: string, the name of the field that indicates the data is resent. If not set, the data will be resent as is. If set, the data will be resent with the field set to true. The field will be added if it does not exist. If the field already exists, it will be updated to true.

Each sink may have its own properties to control the resend behavior. Some examples:
TODO: Many other properties also affect the send behavior, such as dataTemplate, dataEncoding, etc. Shall we use a send config key to configure them as a whole?
- resendTopic: string, default is the same as the normal topic. If set, the resent data will be sent to this topic.
- resendUrl: string, default is the same as the normal URL. If set, the resent data will be sent to this URL.
- resendPath: string, default is the same as the normal path. If set, the resent data will be sent to this path.

Add a collectResend method to the SinkNode interface:
    type Sink interface {
        // Open Should be sync function for normal case. The container will run it in go func
        Open(ctx StreamContext) error
        // Configure Called during initialization. Configure the sink with the properties from rule action definition
        Configure(props map[string]interface{}) error
        // Collect Called when each row of data has transferred to this sink
        Collect(ctx StreamContext, data interface{}) error
        // CollectResend Called when each row of resent data has transferred to this sink
        CollectResend(ctx StreamContext, data interface{}) error
        Closable
    }
Compatibility: If collectResend is not set, use collect instead.
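One way the compatibility rule could be realized in Go is a type assertion against an optional interface. The sketch below is illustrative only: `ResendCollector`, `collectResent`, `plainSink` and `resendSink` are hypothetical names, and the `StreamContext` argument is omitted to keep the example self-contained. It assumes `CollectResend` is optional rather than a required method of `Sink`, which is one possible reading of the compatibility note.

```go
package main

import "fmt"

// Sink is a trimmed-down stand-in for the proposal's interface (context omitted).
type Sink interface {
	Collect(data interface{}) error
}

// ResendCollector is a hypothetical optional interface for sinks
// that want to treat resent data differently.
type ResendCollector interface {
	CollectResend(data interface{}) error
}

// collectResent calls CollectResend when the sink provides it
// and falls back to Collect otherwise.
func collectResent(s Sink, data interface{}) error {
	if rc, ok := s.(ResendCollector); ok {
		return rc.CollectResend(data)
	}
	return s.Collect(data)
}

// plainSink implements only Collect.
type plainSink struct{ log []string }

func (p *plainSink) Collect(data interface{}) error {
	p.log = append(p.log, fmt.Sprintf("collect:%v", data))
	return nil
}

// resendSink adds CollectResend on top of plainSink.
type resendSink struct{ plainSink }

func (r *resendSink) CollectResend(data interface{}) error {
	r.log = append(r.log, fmt.Sprintf("resend:%v", data))
	return nil
}

func main() {
	p := &plainSink{}
	r := &resendSink{}
	_ = collectResent(p, 1) // no CollectResend, so Collect is used
	_ = collectResent(r, 2) // CollectResend is used
	fmt.Println(p.log, r.log)
}
```

With this shape, existing sinks would keep compiling unchanged, at the cost of the resend capability not being visible in the main interface.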
1. If resendQueue is set, add a cache queue for each sink, which means there will be two output channels. The cache queue is a FIFO queue. Otherwise, keep the default behavior of one FIFO queue for all sinks.
2. For the normal queue, call the collect method; for the cache queue, call the collectResend method.
3. Implement resendPriority by setting channel priorities.
4. Implement resendField by updating the data.

Take MQTT as an example:
1. Implement the CollectResend function to collect the resent data.
2. In CollectResend, send the data to the resend topic if resendTopic is set. Otherwise, send the data to the normal topic.
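The sink node steps and the MQTT example can be sketched together as follows. This is a rough illustration under stated assumptions, not the actual implementation: `resendConf`, `mockMQTTSink`, `mark` and `dispatch` are hypothetical names, the sink records strings instead of publishing to a broker, and the `StreamContext` argument is omitted. Channel priority is approximated with a non-blocking receive on the preferred queue followed by a blocking `select` on both queues.

```go
package main

import "fmt"

type row = map[string]interface{}

// resendConf mirrors the proposed properties; the struct itself is hypothetical.
type resendConf struct {
	Topic, ResendTopic, ResendField string
	ResendPriority                  int // -1, 0 or 1
}

// mockMQTTSink records "topic payload" strings instead of publishing to a broker.
type mockMQTTSink struct {
	conf resendConf
	out  []string
}

func (m *mockMQTTSink) Collect(data row) error {
	m.out = append(m.out, fmt.Sprintf("%s %v", m.conf.Topic, data))
	return nil
}

// CollectResend uses the resend topic when set, otherwise the normal topic.
func (m *mockMQTTSink) CollectResend(data row) error {
	topic := m.conf.Topic
	if m.conf.ResendTopic != "" {
		topic = m.conf.ResendTopic
	}
	m.out = append(m.out, fmt.Sprintf("%s %v", topic, data))
	return nil
}

// mark returns a copy of r with field set to true, or r itself if field is empty.
func mark(r row, field string) row {
	if field == "" {
		return r
	}
	out := make(row, len(r)+1)
	for k, v := range r {
		out[k] = v
	}
	out[field] = true
	return out
}

// dispatch drains both queues until they are closed, preferring one per ResendPriority.
func dispatch(s *mockMQTTSink, normal, resend <-chan row) {
	onNormal := func(r row, ok bool) {
		if !ok {
			normal = nil // closed: a nil channel is never selected again
			return
		}
		_ = s.Collect(r)
	}
	onResend := func(r row, ok bool) {
		if !ok {
			resend = nil
			return
		}
		_ = s.CollectResend(mark(r, s.conf.ResendField))
	}
	for normal != nil || resend != nil {
		// Non-blocking receive on the preferred queue; fall through when it is empty.
		switch {
		case s.conf.ResendPriority > 0 && resend != nil:
			select {
			case r, ok := <-resend:
				onResend(r, ok)
				continue
			default:
			}
		case s.conf.ResendPriority < 0 && normal != nil:
			select {
			case r, ok := <-normal:
				onNormal(r, ok)
				continue
			default:
			}
		}
		select {
		case r, ok := <-normal:
			onNormal(r, ok)
		case r, ok := <-resend:
			onResend(r, ok)
		}
	}
}

func main() {
	s := &mockMQTTSink{conf: resendConf{Topic: "demo", ResendTopic: "demo/resend", ResendField: "isResend", ResendPriority: 1}}
	normal, resend := make(chan row, 1), make(chan row, 1)
	normal <- row{"id": 1}
	resend <- row{"id": 0}
	close(normal)
	close(resend)
	dispatch(s, normal, resend)
	fmt.Println(s.out)
}
```

Copying the row in `mark` keeps the cached data untouched, which matters if the same row may be resent more than once. With priority 0 the order between the two queues is whatever `select` picks, so only the per-queue FIFO order is preserved in that case.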