Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The default storage will be sqlite. The scenario for cache storage have these characterstics:

  1. Sequential writing

  2. Adapt to limited CPU+Memory platform

  3. Async, await mechanism (non-transaction)

  4. Append-only, No Edit

We will use these sqlite configurations by default:

  1. Set Page Size as same as OS’s page size (getconf PAGESIZE)
    PRAGMA page_size = 4096;

  2. Set as WAL mode 
    PRAGMA journal_mode=WAL;

  3. Set synchronous mode as full, so that it won’t corrupt the database file when experiencing power down.
    PRAGMA synchronous=FULL 

  4. Set checkpoint to auto or disable it and mange it by self-define interval. 
    Enable: PRAGMA wal_autocheckpoint; or sqlite3_wal_autocheckpoint(sqlite3 *db, int N);
    Disable: PRAGMA wal_autocheckpoint=N;

Implementation consideration

  • If the disk storage is sqlite, all the caches will be save to `data/cache.db` file. Each sink will have an unique sqlite table to save the cache.
  • Add cached count to the sink metrics

...

功能 1289 数据缓存机制

缘起


运行在边缘端,经常会遇到网络连接故障。对于汇入外部系统的规则,尤其是远程外部系统,在网络断开等故障期间,必须对数据进行缓存,并在重新连接后重新发送。

此前,eKuiper在一定程度上支持 sink 缓存。它提供了一个全局配置来切换缓存开启;系统/规则级配置用于内存缓存的序列化时间间隔。然而,缓存只是在内存中和复制到DB(内存的镜像)中,并没有定义明确的重发策略。它实际上是故障恢复机制(QOS)的一部分。

在新功能中,缓存将同时保存在内存和磁盘中,这样缓存的容量就变得更大了;它还将持续检测故障恢复状态,并在不重新启动规则的情况下重新发送。

流程

缓存只发生在sink中,因为那是eKuiper之外唯一可以发送数据的地方。每个sink都可以配置自己的缓存机制。每个sink的缓存流程是相同的。如果启用了缓存,所有sink的事件都会经过两个阶段:首先是将所有内容保存到缓存中;然后在收到ack后删除缓存。

  • 错误检测:发送失败后,sink应该通过返回特定的错误类型来识别可恢复的失败(网络等),这将返回一个失败的ack,这样缓存就可以被保留下来。对于成功的发送或不可恢复的错误,将发送一个成功的ack来删除缓存。
  • 缓存机制。缓存将首先被保存在内存中。如果超过了内存的阈值,后面的缓存将被保存到磁盘中。一旦磁盘缓存超过磁盘存储阈值,缓存将开始rotate:内存中最早的缓存将被丢弃,并加载磁盘中最早的缓存来代替。
  • 重发策略。如果有一个Ack正在发送中,则等待一个成功的Ack以继续发送下个缓存数据。否则,当有新的数据到来时,发送缓存中的第一个数据以检测网络状况。如果 Ack 成功,按顺序链式发送所有的缓存(mem + disk)。链式发送可定义一个发送间隔,防止形成消息风暴。

配置


Sink 缓存的配置有两个层次。`etc/kuiper.yaml` 中的全局配置,定义所有规则的默认行为。还有一个规则 sink 层的定义,用来覆盖默认行为。

  • enableCache:是否启用sink cache。缓存存储配置遵循 `etc/kuiper.yaml` 中定义的元数据存储的配置。
  • memoryCacheThreshold:要缓存在内存中的消息数量。出于性能方面的考虑,最早的缓存信息被存储在内存中,以便在故障恢复时立即重新发送。这里的数据会因为断电等故障而丢失。
  • maxDiskCache: 缓存在磁盘中的信息的最大数量。磁盘缓存是FIFO。如果磁盘缓存满了,最早的一页信息将被加载到内存缓存中,取代旧的内存缓存。
  • bufferPageSize。缓冲页是批量读/写到磁盘的单位,以防止频繁的IO。如果页面未满,eKuiper因硬件或软件错误而崩溃,最后未写入磁盘的页面将被丢失。
  • resendInterval: 故障恢复后重新发送信息的时间间隔,防止信息风暴。
  • cleanCacheAtStop:是否在规则停止时清理所有缓存,以防止规则重新启动时对过期消息进行大量重发。如果不设置为true,一旦规则停止,内存缓存将被存储到磁盘中。否则,内存和磁盘规则会被清理掉。

Sqlite的内部配置


默认的存储将是sqlite。缓存存储的方案有这些特征。

  1. 顺序写入
  2. 适应有限资源的CPU+Memory平台
  3. 异步,等待机制(非 transaction)
  4. 只追加,不编辑

基于这些特性,我们将默认使用这些sqlite的配置:

  • 将页面大小设置为与操作系统的页面大小相同(getconf PAGESIZE)。
    PRAGMA page_size = 4096。
  • 设置为WAL模式
    PRAGMA journal_mode=WAL。
  • 将同步模式设置为full,这样在经历断电时不会损坏数据库文件。
    PRAGMA synchronous=FULL
  • 将检查点设置为自动或禁用,并通过自定的时间间隔进行管理。
    启用。PRAGMA wal_autocheckpoint; 或者 sqlite3_wal_autocheckpoint(sqlite3 *db, int N)。
    禁用。PRAGMA wal_autocheckpoint=N。

其他实现考虑

  • 如果磁盘存储是sqlite,所有的缓存将被保存到`data/cache.db`文件。每个sink将有一个唯一的sqlite表来保存缓存。
  • 将缓存的计数添加到sink 的 metric 中
  • 融入 checkpoint 系统做故障恢复