当前 eKuiper 支持在 Create Stream 的时候指定数据结构类型等。然而该方式有几个问题:

  • 额外性能消耗。当前的 schema 没有与数据原本的格式 schema 关联,因此在数据解码之后,需要再额外进行一次 validation/转换;而且该过程基于反射动态完成,性能较差。例如,使用 protobuf 等强schema时,经 protobuf 解码之后的数据应当已经符合格式,不应再进行转换。

  • Schema 定义繁琐。同样无法利用数据本身格式的 schema,而是需要额外配置。

  • Schemaless 时,没有 schema 信息提供给 Flow editor.

本 PIP 旨在梳理 Schema 定义方式,在兼容原有配置的基础上,支持高效而且便捷的 Schema 配置方式,并力求提供 Schema API,方便 UI 使用。

总体设计

Schema 包含两个部分,一个是数据编解码的 Schema,强类型的编解码方式都有自带 Schema,例如 Protobuf,Avro 等;另一种是 eKuiper source 层的 Schema。

整体上,我们将支持3种递进的 Schema 方式:

  1. Schemaless,用户无需定义任何形式的 schema,主要用于弱结构化数据流,数据结构经常变化的情况。目前运行时已支持。可添加 test schema API,完成运行时 Schema 推断,自动生成 schema 定义。使用该方式,Flow Editor 将无法做 schema 推断。

  2. Schema hint,用户在 eKuiper source 层定义 schema。多用于弱类型的编码方式,例如最常用的 JSON。用户的数据有固定或大致固定的格式,通过 source 定义出来。与当前实现的区别主要是运行时不再做数据的验证转换。该 schema 定义可用于 SQL validation;Flow Editor 字段选择框。

  3. Dynamic schema,用户使用强类型编解码方式,并通过 schemaId 提供其 Schema。例如 Protobuf 方式下,动态解析 pb 文件。该方法较为灵活但无法完全发挥强类型的性能优势。需要支持自动 merge schema,供 Flow Editor 使用。

  4. Static schema,用户将Schema 生成静态代码,作为插件添加到系统中。该方法性能最好但较为繁琐,适合性能要求高的场景。需要用户自行编写实现了编解码接口和 schema descriptor 接口的代码。针对特定的格式,我们可提供自动代码生成和编译打包工具/API。

Source 如何定义数据结构

当前,创建流或者表时,有3个部分与 Schema 相关:

create stream protoDemo (
  id int, 
  name string
  ) WITH (FORMAT="protobuf", DATASOURCE="protoDemo",SCHEMAID="helloworld.HelloReply"
  1. 流的数据结构定义,即()部分。该部分为可选,若放空则为 schemaless

  2. FORMAT: 即 serilization format,定义数据的解码方式。当前支持 JSON,BINARY,PROTOBUF。这个格式实际上跟 schema 是相对独立,不一定要强绑定。但是,有些格式例如 protobuf,本身是需要有个 schema的,即数据结构描述。

  3. SCHEMAID: 即数据本身的结构描述,区别于 1 流的结构描述。理想情况下,若用户使用强类型带 schema 的解码方式如 protobuf,则数据的 schema 应当能自动应用到流中,而无需用户再定义。

新的版本中将继续沿用这些定义方式,只是添加更多类型支持。

Stream schema

定义方法保持不变。实现上,schema 主要作为元数据使用,可辅助实现 SQL 语法验证,Flow editor 中的节点数据结构推断从而可在节点属性中做字段选择。数据校验为可选且建议关闭,因为校验性能损耗大。

仍然支持不定义 schema;若后续定义了 schemaId,则stream schema 可自动推断。

Format

仍然支持原有的类型。新的版本主要更新有:

  1. 添加更多解码类型,如 custom (自定义),delemited,avro

  2. 每种解码类型均支持静态方式。用户需实现编解码接口,并编译为 go 插件 so,提前通过 schema registry 注册。

Serilization Format

Schema Keys

Parse Schema Descriptor

json

optional

json schema (TBD)

binary

n/a

n/a

protobuf

required

proto

custom

required, provided by the extended code

optional, provided by the extended code

delimited (TBD)

n/a, must have stream schema def or the first line should have def

n/a

avro (TBD)

required

avro

Schema Registry

添加功能,允许注册静态 schema。即添加可选的 soFile 属性,用于指定静态解析代码。若存在 soFile,则使用插件系统载入 so 文件,进行解析;否则动态解析 proto 文件。

###
POST http://{{host}}/schemas/protobuf
Content-Type: application/json

{
  "name": "schema2",
  "file": "file:///C:/repos/go/src/github.com/lfedge/ekuiper/internal/schema/test/test2.proto",
  "soFile": "file:///C:/repos/go/src/github.com/lfedge/ekuiper/internal/schema/test/test2.so"
}

示例

Use static protobuf

已有文件 helloworld.proto

syntax = "proto3";

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

采用静态编解码的方式需要如下步骤:

  1. Generate go code for the pb file, check <https://developers.google.com/protocol-buffers/docs/reference/go-generated> for detail.

    protoc --go_opt=Mhelloworld.proto=com.main --go_out=. helloworld.proto
  2. Move the generated file helloworld.pb.go into the go project test. Rename the package to main.

  3. Create the wrapper file. For each message in the proto, implement 3 functions: Encode, Decode, GetXXX. Example:

    package main
    
    func (x *HelloRequest) Encode(d interface{}) ([]byte, error) {
      switch r := d.(type) {
      case map[string]interface{}:
          t, ok := r["name"]
          if ok {
              if v, ok := t.(string); ok {
                  x.Name = v
              } else {
                  return nil, fmt.Errorf("name is not string")
              }
          } else {
              // if required, return error
              fmt.Println("message is not found")
          }
          return proto.Marshal(x)
      default:
          return nil, fmt.Errorf("unsupported type %v, must be a map", d)
      }
    }
    
    func (x *HelloRequest) Decode(b []byte) (interface{}, error) {
      err := proto.Unmarshal(b, x)
      if err != nil {
      return nil, err
      }
      result := make(map[string]interface{}, 1)
      result["name"] = x.Name
      return result, nil
    }
    
    func GetHelloRequest() interface{} {
      return &HelloRequest{}
    }
  4. Build the project into a plugin so file. go build --buildmode=plugin -o helloworld.so ..

  5. Create schema with the .so file.

    ###
    POST http://{{host}}/schemas/protobuf
    Content-Type: application/json
    
    {
      "name": "helloworld",
      "file": "file:///tmp/helloworld.proto",
      "soFile": "file:///tmp/helloworld.so"
    }


  6. Create the stream to use the static schema.

    create stream protoDemo () WITH (FORMAT="protobuf", DATASOURCE="protoDemo",SCHEMAID="helloworld.HelloReply"


Use custom format

若数据的编码类型不在已支持类型之中,用户可实现编解码接口,并注册为 schema,调用时使用 custom 格式即可。

  1. 实现编解码。实现接口方法 Encode,Decode,GetXXX

  2. 实现数据结构描述接口(可选)。

  3. 按照插件编译为 so

  4. 注册到 schema registry

    ###
    POST http://{{host}}/schemas/custom
    Content-Type: application/json
    
    {
      "name": "custom1",
      "soFile": "file:///tmp/custom1.so"
    }
  5. 在source中使用。

    create stream customDemo () WITH (FORMAT="custom", DATASOURCE="protoDemo",SCHEMAID="custom1.Message"

Sink static schema

与 source 类似,需要支持高性能场景下 sink 的 static schema 插件方式。与 source 不同的是,在 sink 中,我们主要关心编码方法,不关心结构描述。使用方法同样是配置 formatschemaId

###
POST http://{{host}}/rules
Content-Type: application/json

{
  "id": "rule1",
  "sql": "SELECT * FROM demo",
  "actions": [{
    "mqtt": {
      "server": "tcp://syno.home:1883",
      "topic": "result/protobuf",
      "format": "custom",
      "schemaId": "custom1.Message",
      "sendSingle": true
    }
  }]
}


  • No labels