Motivation

Running at the edge, it is common to encounter network connection failures. For rules that sink to external systems, especially remote ones, it is important to cache the data during failures such as network disconnection and resend it once reconnected.

Previously, eKuiper had some degree of support for sink caching. It provided a global configuration to switch the cache on and off, and a system/rule level configuration for the memory cache serialization interval. However, the cache lived only in memory plus a copy in the db (a mirror of the memory), and no clear resend strategy was defined; it was really part of the failure recovery mechanism (qos). In this proposal, the cache is saved in both memory and disk so that the cache capacity becomes much bigger; the sink also continuously detects recovery from failure and resends without restarting the rule.

Flow

Caching happens only in the sink because that is the only place where data is sent out of eKuiper. Each sink can have its own cache mechanism configured, and the flow is alike in every sink. If the cache is enabled, all sink events go through a two-phase process: first save everything to the cache; then delete the cached entry once an ack is received.

  1. Error detection: after a send failure, the sink should identify recoverable failures (network problems and so on) by returning a specific error type, which produces a failure ack so that the cache is preserved. For a successful send or an unrecoverable error, a success ack is sent to delete the cache.
  2. Cache mechanism: the cache is saved in memory first. If it exceeds the memory threshold, later entries are saved to disk. Once the disk cache passes the disk storage threshold, the cache starts to rotate: the earliest entry in memory is dropped and replaced by loading the earliest entry from disk.
  3. Resend strategy: if an ack is pending, wait for a success ack before sending the cached data one by one. Otherwise, when new data comes in, send the first entry in the cache to probe the network condition. If it succeeds, send all cached entries in order (memory + disk) at the defined interval, in a chain: the next entry is sent only after a successful ack for the previous one. A sketch of this flow follows the list.
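The following Go sketch illustrates this two-phase flow under the assumptions above. Everything in it is hypothetical scaffolding: errRecoverable, the in-memory/on-disk slices, and the thresholds are stand-ins for the real sink error type and the sqlite-backed store described later, not a definitive implementation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errRecoverable is a hypothetical marker for failures (e.g. network errors)
// that produce a failure ack; the real sink would return its own error type.
var errRecoverable = errors.New("recoverable send failure")

// cache keeps the oldest entries in memory and spills newer ones to disk once
// the memory threshold is exceeded; past the disk threshold it rotates.
// The disk slice is a stand-in for the sqlite store detailed below.
type cache struct {
	mem, disk                   []string
	memThreshold, diskThreshold int
}

func (c *cache) add(item string) {
	if len(c.mem) < c.memThreshold {
		c.mem = append(c.mem, item)
		return
	}
	if len(c.disk) >= c.diskThreshold {
		// Rotate: drop the earliest memory entry and replace it by
		// loading the earliest disk entry, keeping one ordered queue.
		c.mem = append(c.mem[1:], c.disk[0])
		c.disk = c.disk[1:]
	}
	c.disk = append(c.disk, item)
}

// pop deletes the oldest entry after a success ack, refilling memory from
// disk so the cache keeps draining in arrival order.
func (c *cache) pop() {
	c.mem = c.mem[1:]
	if len(c.disk) > 0 {
		c.mem = append(c.mem, c.disk[0])
		c.disk = c.disk[1:]
	}
}

func (c *cache) size() int { return len(c.mem) + len(c.disk) }

// send stands in for the real sink send.
func send(item string, networkUp bool) error {
	if !networkUp {
		return errRecoverable
	}
	fmt.Println("sent:", item)
	return nil
}

// resend probes with the oldest cached entry; on success it drains the cache
// in order, sending the next entry only after the previous success ack,
// spaced by the resend interval.
func resend(c *cache, networkUp bool, interval time.Duration) {
	for c.size() > 0 {
		if err := send(c.mem[0], networkUp); errors.Is(err, errRecoverable) {
			return // failure ack: preserve the cache and wait
		}
		c.pop() // success ack (or unrecoverable error): delete from cache
		time.Sleep(interval)
	}
}

func main() {
	c := &cache{memThreshold: 2, diskThreshold: 2}
	up := false

	// Phase 1: the network is down, so entries accumulate and spill to disk.
	for _, ev := range []string{"e1", "e2", "e3", "e4", "e5"} {
		c.add(ev)
		resend(c, up, 0)
	}

	// Phase 2: the network recovers; the next event triggers a probe with the
	// oldest cached entry, then a chained drain of the rest.
	up = true
	c.add("e6")
	resend(c, up, 10*time.Millisecond)
	fmt.Println("remaining cached entries:", c.size())
}
```

Note how the rotation always drops the globally oldest entry while keeping memory and disk as a single ordered queue, so the chained resend delivers the surviving entries in arrival order.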

Configuration

There will be two levels of sink cache configuration: a global configuration in etc/kuiper.yaml that defines the default behavior for all rules, and a rule-level sink definition that overrides the default behavior.

The configuration items:
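The concrete items are not enumerated in this draft. Purely as an illustration, the global sink section of etc/kuiper.yaml could look like the sketch below; every key name and default value here is an assumption of this example, not a decided interface:

```yaml
sink:
  # Whether the cache is enabled by default; a rule's sink can override it.
  enableCache: false
  # Maximum number of messages kept in memory before spilling to disk.
  memoryCacheThreshold: 1024
  # Maximum number of messages kept on disk before rotation starts.
  maxDiskCache: 1024000
  # Interval in milliseconds between two resends after recovery.
  resendInterval: 0
  # Whether to discard the cached messages when the rule stops.
  cleanCacheAtStop: false
```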

Internal Configuration for Sqlite

The default storage will be sqlite. The cache storage scenario has these characteristics:

  1. Sequential writes

  2. Must fit platforms with limited CPU and memory

  3. Asynchronous, awaited writes (non-transactional)

  4. Append-only, no edits

We will use these sqlite configurations by default:

  1. Set the page size to the same as the OS page size (getconf PAGESIZE)
    PRAGMA page_size = 4096;

  2. Set the journal mode to WAL
    PRAGMA journal_mode=WAL;

  3. Set synchronous mode to full, so that the database file is not corrupted on power loss.
    PRAGMA synchronous=FULL;

  4. Set the checkpoint to run automatically, or disable it and manage it at a self-defined interval.
    Enable with a threshold of N pages: PRAGMA wal_autocheckpoint=N; or sqlite3_wal_autocheckpoint(sqlite3 *db, int N);
    Disable: PRAGMA wal_autocheckpoint=0;
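As an illustration only, these pragmas could be applied when the cache store is opened. This minimal sketch assumes the github.com/mattn/go-sqlite3 driver and a cache.db file name; neither is prescribed by this proposal.

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/mattn/go-sqlite3" // assumed driver; not prescribed here
)

func main() {
	// cache.db is a hypothetical file name for the sink cache store.
	db, err := sql.Open("sqlite3", "cache.db")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Apply the default pragmas listed above when the store is opened.
	// Note page_size only takes effect before the file is initialized.
	pragmas := []string{
		"PRAGMA page_size = 4096;",     // match the OS page size
		"PRAGMA journal_mode=WAL;",     // sequential, append-friendly writes
		"PRAGMA synchronous=FULL;",     // survive power loss without corruption
		"PRAGMA wal_autocheckpoint=0;", // disable auto-checkpoint; manage it ourselves
	}
	for _, p := range pragmas {
		if _, err := db.Exec(p); err != nil {
			log.Fatalf("applying %q: %v", p, err)
		}
	}

	// With auto-checkpoint disabled, checkpoint at a self-defined interval,
	// e.g. from a ticker in the cache's run loop.
	if _, err := db.Exec("PRAGMA wal_checkpoint(TRUNCATE);"); err != nil {
		log.Fatal(err)
	}
}
```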

Implementation considerations

