...

Running on the edge side, it is common to encounter network connection failures. For rules that sink to external systems, especially remote ones, it is important to cache the data during failures such as network disconnection and resend it once reconnected.

...

Caching happens only in the sink because that is the only place where data is sent outside eKuiper. Each sink can have its own cache mechanism configured. The flow in each sink is alike. If the cache is enabled, all sink events go through a two-phase process: first, save the event to the cache; then, delete it from the cache once an ack is received.

  1. Error detection: after a send fails, the sink should identify recoverable failures (network and so on) by returning a specific error type, which produces a failure ack so that the cache is preserved. For a successful send or an unrecoverable error, a success ack is sent to delete the cache.
  2. Cache mechanism: the cache is saved in memory first. If it exceeds the memory threshold, the later cache is saved to disk. Once the disk cache passes the disk storage threshold, the cache starts to rotate: the earliest cache in memory is dropped and replaced by loading the earliest page of cache from disk.
  3. Resend strategy: if an ack is pending, wait for a success ack and send the cached data one by one. Otherwise, when new data arrives, send the first data in the cache to detect network conditions. If that succeeds, send all caches in order (memory, then disk) as a chain with a defined interval, which means the next data is sent only after a success ack is received.
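
The cache mechanism and the ack-driven delete described in the steps above can be sketched in Go as follows. This is only an illustrative model under stated assumptions, not eKuiper's implementation: the `sinkCache` type, its fields, and the `add` and `ackSuccess` methods are hypothetical names, the disk tier is modelled as an in-memory slice, and the page size is assumed to be no larger than either threshold.

```go
package main

import "fmt"

// sinkCache is a hypothetical in-process model of the two-tier cache.
// The disk tier is a plain slice here; a real implementation would persist it.
type sinkCache struct {
	mem          []string // earliest cached items, resent first
	disk         []string // later items in FIFO order
	memThreshold int      // max items held in memory
	diskMax      int      // max items held on disk
	pageSize     int      // items moved per disk read; assumed <= both thresholds
}

// add caches one item and returns any items dropped by rotation.
func (c *sinkCache) add(item string) []string {
	// Fill memory first, but only while the disk is empty so order is kept.
	if len(c.disk) == 0 && len(c.mem) < c.memThreshold {
		c.mem = append(c.mem, item)
		return nil
	}
	c.disk = append(c.disk, item)
	if len(c.disk) <= c.diskMax {
		return nil
	}
	// Disk threshold passed: rotate. Drop the earliest items in memory and
	// replace them with the earliest page from disk.
	page := c.disk[:c.pageSize]
	n := len(page)
	if len(c.mem) < n {
		n = len(c.mem)
	}
	dropped := append([]string(nil), c.mem[:n]...)
	c.mem = append(c.mem[n:], page...)
	c.disk = c.disk[c.pageSize:]
	return dropped
}

// ackSuccess deletes the head item after a success ack and returns the next
// item to send, refilling memory from disk when memory runs empty.
func (c *sinkCache) ackSuccess() (next string, ok bool) {
	if len(c.mem) > 0 {
		c.mem = c.mem[1:]
	}
	if len(c.mem) == 0 && len(c.disk) > 0 {
		n := c.pageSize
		if len(c.disk) < n {
			n = len(c.disk)
		}
		c.mem = append(c.mem, c.disk[:n]...)
		c.disk = c.disk[n:]
	}
	if len(c.mem) == 0 {
		return "", false
	}
	return c.mem[0], true
}

func main() {
	c := &sinkCache{memThreshold: 2, diskMax: 2, pageSize: 2}
	for _, m := range []string{"a", "b", "c", "d", "e"} {
		if dropped := c.add(m); dropped != nil {
			fmt.Println("dropped:", dropped)
		}
	}
	fmt.Println("mem:", c.mem, "disk:", c.disk)
}
```

In this sketch a failure ack simply leaves the head item in place, so the same item is retried on the next attempt; the resend interval between chained sends is left out for brevity.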

Configuration

There will be two levels of sink cache configuration: a global configuration in etc/kuiper.yaml that defines the default behavior for all rules, and a rule sink level definition that overrides the defaults.

...

  • enableCache: whether to enable sink cache.
  • The cache storage configuration follows the metadata storage defined in etc/kuiper.yaml
  • memoryCacheThreshold: the number of messages to be cached in memory. For performance reasons, the earliest cached messages are stored in memory so that they can be resent immediately once the failure is resolved. The data here will be lost on failures such as power off.
  • maxDiskCache: the maximum number of messages to be cached on disk. The disk cache is FIFO. If the disk cache is full, the earliest page of messages will be loaded into the memory cache to replace the old one.
  • bufferPageSize: the buffer page is the unit for reading from and writing to disk in batches to prevent frequent IO. If the page is not full and eKuiper crashes due to a hardware or software error, the last page, which has not yet been written to disk, will be lost.
  • resendInterval: the interval between resent messages after recovering from a failure, to prevent a message storm.
  • cleanCacheAtStop: whether to clean all caches when the rule stops, to prevent a burst of resends of outdated messages when the rule restarts. If not set to true, the memory cache will be stored to disk when the rule stops. Otherwise, the memory and disk caches are cleaned up.
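
To illustrate how the two configuration levels could combine, here is a minimal Go sketch in which a rule sink level definition overrides only the options it sets and inherits the rest from the global defaults. The `SinkCacheConf` struct, the `ptr` helper, and the `merge` function are hypothetical, as are all numeric values; only the option names come from the list above, and the unit of `resendInterval` is not specified in this text.

```go
package main

import "fmt"

// SinkCacheConf mirrors the options listed above (hypothetical struct).
// Pointer fields let a rule-level config leave an option unset so that
// the global default applies.
type SinkCacheConf struct {
	EnableCache          *bool
	MemoryCacheThreshold *int
	MaxDiskCache         *int
	BufferPageSize       *int
	ResendInterval       *int
	CleanCacheAtStop     *bool
}

// ptr returns a pointer to v, for building configs inline.
func ptr[T any](v T) *T { return &v }

// merge returns the global config with every option set at the rule
// level replacing the global value.
func merge(global, rule SinkCacheConf) SinkCacheConf {
	out := global
	if rule.EnableCache != nil {
		out.EnableCache = rule.EnableCache
	}
	if rule.MemoryCacheThreshold != nil {
		out.MemoryCacheThreshold = rule.MemoryCacheThreshold
	}
	if rule.MaxDiskCache != nil {
		out.MaxDiskCache = rule.MaxDiskCache
	}
	if rule.BufferPageSize != nil {
		out.BufferPageSize = rule.BufferPageSize
	}
	if rule.ResendInterval != nil {
		out.ResendInterval = rule.ResendInterval
	}
	if rule.CleanCacheAtStop != nil {
		out.CleanCacheAtStop = rule.CleanCacheAtStop
	}
	return out
}

func main() {
	// Illustrative global defaults, standing in for etc/kuiper.yaml.
	global := SinkCacheConf{
		EnableCache:          ptr(false),
		MemoryCacheThreshold: ptr(1024),
		MaxDiskCache:         ptr(1024000),
		BufferPageSize:       ptr(256),
		ResendInterval:       ptr(0),
		CleanCacheAtStop:     ptr(false),
	}
	// A rule sink that enables the cache and throttles resends.
	rule := SinkCacheConf{EnableCache: ptr(true), ResendInterval: ptr(100)}

	eff := merge(global, rule)
	fmt.Println(*eff.EnableCache, *eff.MemoryCacheThreshold, *eff.ResendInterval)
}
```

Using pointers rather than plain values is one way to tell "not set at the rule level" apart from an explicit `false` or `0`.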

Internal Configuration for Sqlite

The default storage will be sqlite.

Implementation consideration

  • If the disk storage is sqlite, all caches will be saved to the `data/cache.db` file. Each sink will have a unique sqlite table to save its cache.
  • Add cached count to the sink metrics

...