Feature 1289: Data Cache Mechanism

Motivation

When running at the edge, network connection failures are common. For rules that sink data to external systems, especially remote ones, it is important to cache the data during failures such as network disconnection and to resend it once the connection is restored.

Previously, eKuiper had some degree of support for sink caching: a global configuration to switch the cache on, plus system/rule level configuration for the memory cache serialization interval. However, the cache lived only in memory, with a copy in the DB (a mirror of the memory), and no clear resend strategy was defined; it was really just part of the failure recovery (QoS) mechanism. In this proposal, the cache is saved in both memory and disk so that the cache capacity becomes much larger, and the sink continuously detects failure recovery and resends without restarting the rule.

Flow

Caching happens only in sinks because a sink is the only place that sends data out of eKuiper. Each sink can have its own cache mechanism configured, and the flow is the same in every sink. If the cache is enabled, all sink events go through a two-phase process: first save everything to the cache, then delete the cached entry once an ack is received.

  1. Error detection: after a send failure, the sink should identify recoverable failures (network problems and the like) by returning a specific error type, which produces a failure ack so that the cache is preserved. For a successful send or an unrecoverable error, a success ack is sent to delete the cache.
  2. Cache mechanism: the cache is saved in memory first. If it exceeds the memory threshold, later entries are cached on disk. Once the disk cache passes the disk storage threshold, the cache starts to rotate: the earliest cache in memory is dropped and replaced by loading the earliest cache from disk.
  3. Resend strategy: if an ack is pending, wait for a success ack before sending the cached data one by one. Otherwise, when new data comes in, send the first item in the cache to probe the network condition. If that succeeds, send all cached data in order (memory + disk) in a chain with a defined interval, meaning the next item is sent only after a successful ack is received. A minimal sketch of this flow follows the list.
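
The Go sketch below illustrates the two-phase flow and the chained resend. Everything here is hypothetical scaffolding for the proposal, not eKuiper code: CacheNode, its fields, and the recoverable helper are made-up names, and the disk tier and rotation are omitted for brevity.

    package cache

    import (
        "errors"
        "net"
        "time"
    )

    // recoverable reports whether a send error should produce a failure ack
    // (cache preserved) instead of a success ack (cache entry deleted).
    // Here only network errors are treated as recoverable.
    func recoverable(err error) bool {
        var ne net.Error
        return errors.As(err, &ne)
    }

    // CacheNode is a hypothetical sink wrapper: every event is saved to the
    // cache first, and an entry is deleted only after a success ack.
    type CacheNode struct {
        buf            [][]byte      // in-memory tier; disk tier omitted here
        resendInterval time.Duration // the resendInterval configuration item
        send           func([]byte) error
    }

    // Ingest is phase one: save to the cache, then try to flush.
    func (c *CacheNode) Ingest(data []byte) {
        c.buf = append(c.buf, data)
        c.flush()
    }

    // flush resends cached entries in order, one per successful ack, and
    // stops at the first recoverable failure so the cache is preserved.
    func (c *CacheNode) flush() {
        for len(c.buf) > 0 {
            if err := c.send(c.buf[0]); err != nil && recoverable(err) {
                return // failure ack: keep the cache and retry later
            }
            // success ack or unrecoverable error: delete the cache entry
            c.buf = c.buf[1:]
            time.Sleep(c.resendInterval) // chained resend to avoid a storm
        }
    }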

Configuration

There will be two levels of sink cache configuration: a global configuration in etc/kuiper.yaml that defines the default behavior for all rules, and a rule-level sink definition that overrides the default behavior.

The configuration items:

  • enableCache: whether to enable the sink cache. The cache storage configuration follows the metadata storage defined in etc/kuiper.yaml.
  • memoryCacheThreshold: the number of messages to cache in memory. For performance reasons, the earliest cached messages are kept in memory so that they can be resent immediately once the failure is recovered. Data here can be lost in failures such as a power outage.
  • maxDiskCache: the maximum number of messages to cache on disk. The disk cache is FIFO. If the disk cache is full, the earliest page of messages is loaded into the memory cache to replace the old one.
  • bufferPageSize: the buffer page is the unit for batched reads/writes to disk, used to avoid frequent I/O. If a page is not yet full and eKuiper crashes due to a hardware or software error, the last page that has not been written to disk is lost.
  • resendInterval: the interval for resending messages after the failure is recovered, to prevent a message storm.
  • cleanCacheAtStop: whether to clean all caches when the rule stops, to prevent a burst of resends of outdated messages when the rule restarts. If not set to true, the memory cache is flushed to disk when the rule stops; otherwise both the memory and disk caches are cleaned up. A combined example is shown below.
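
For illustration, the combined configuration could look like the YAML below. The keys are exactly the items listed above, but the nesting and the sample values are assumptions, and resendInterval is assumed to be in milliseconds:

    # etc/kuiper.yaml (global default) or the sink clause of a rule (override)
    sink:
      enableCache: true
      memoryCacheThreshold: 1024   # messages kept in memory for fast resend
      maxDiskCache: 1024000        # FIFO disk capacity, in messages
      bufferPageSize: 256          # messages per page, written to disk in batch
      resendInterval: 10           # assumed milliseconds between chained resends
      cleanCacheAtStop: false      # flush the memory cache to disk on stop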

Internal Configuration for SQLite

The default storage will be SQLite. The cache storage scenario has these characteristics:

  1. Sequential writes
  2. Must fit platforms with limited CPU and memory
  3. Asynchronous, await-style mechanism (non-transactional)
  4. Append-only, no edits

Based on these characteristics, we will use the following SQLite configuration by default:

  1. Set the page size to the OS page size (getconf PAGESIZE)
    PRAGMA page_size = 4096;

  2. Enable WAL journal mode
    PRAGMA journal_mode=WAL;

  3. Set the synchronous mode to FULL so that a power loss will not corrupt the database file.
    PRAGMA synchronous=FULL;

  4. Set the checkpoint to automatic, or disable it and manage it on a self-defined interval.
    Enable: PRAGMA wal_autocheckpoint=N; or sqlite3_wal_autocheckpoint(sqlite3 *db, int N);
    Disable: PRAGMA wal_autocheckpoint=0;
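
As a sketch, these pragmas could be applied when the cache database is opened. The snippet below assumes the mattn/go-sqlite3 driver purely for illustration; it is not a statement about which driver the implementation will use:

    package store

    import (
        "database/sql"

        _ "github.com/mattn/go-sqlite3" // assumed driver; registers "sqlite3"
    )

    // openCacheDB opens the cache database and applies the proposed defaults.
    func openCacheDB(path string) (*sql.DB, error) {
        db, err := sql.Open("sqlite3", path)
        if err != nil {
            return nil, err
        }
        pragmas := []string{
            "PRAGMA page_size = 4096;",       // match the OS page size
            "PRAGMA journal_mode = WAL;",     // sequential, append-friendly writes
            "PRAGMA synchronous = FULL;",     // survive power loss without corruption
            "PRAGMA wal_autocheckpoint = 0;", // disabled: checkpoint on our own interval
        }
        for _, p := range pragmas {
            if _, err := db.Exec(p); err != nil {
                db.Close()
                return nil, err
            }
        }
        return db, nil
    }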

Implementation considerations

  • If the disk storage is SQLite, all caches will be saved to the `data/cache.db` file. Each sink will have a unique SQLite table to save its cache (see the sketch below).
  • Add a cached message count to the sink metrics.
  • Integrate with the checkpoint mechanism for failure recovery.
  • Limitation: the first version implements sync mode only; if a sink is configured in async mode, the cache does not take effect.
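
Continuing the sketch above, one cache table per rule/sink pair could be as simple as an append-only rowid table, which matches the sequential, append-only, FIFO access pattern. The sink_cache_<ruleID>_<sinkID> naming scheme is hypothetical:

    // createSinkTable creates the per-sink cache table; the naming scheme
    // is illustrative only, not the actual implementation.
    func createSinkTable(db *sql.DB, ruleID, sinkID string) error {
        _, err := db.Exec(
            "CREATE TABLE IF NOT EXISTS 'sink_cache_" + ruleID + "_" + sinkID +
                "' (id INTEGER PRIMARY KEY AUTOINCREMENT, data BLOB)")
        return err
    }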
