Skip to content

Commit

Permalink
feat(schema): implement schema registry (#1271)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying authored May 17, 2022
1 parent 7dc70d7 commit 58bcba7
Show file tree
Hide file tree
Showing 8 changed files with 549 additions and 0 deletions.
25 changes: 25 additions & 0 deletions internal/pkg/def/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2022 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package def

// SchemaType is the string identifier of a kind of schema, for example
// "protobuf".
type SchemaType string

// Known schema type identifiers.
const (
// PROTOBUF is the SchemaType value "protobuf".
PROTOBUF SchemaType = "protobuf"
)

// SchemaTypes lists the schema types declared in this package.
var SchemaTypes = []SchemaType{
PROTOBUF,
}
166 changes: 166 additions & 0 deletions internal/schema/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2022 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schema

import (
"fmt"
"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/pkg/def"
"github.com/lf-edge/ekuiper/internal/pkg/httpx"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
)

// registry is the package-level Registry instance. It is nil until
// InitRegistry assigns it, which is expected to happen at server startup.
var registry *Registry

// Registry is a global registry for schemas.
// It keeps, in memory, the schema ids and the path of the file backing each
// of them. The schema definition itself stays on the file system and is read
// from disk when it is requested (see GetSchemaContent).
type Registry struct {
// Embedded lock guarding schemas: readers take RLock, writers take Lock.
sync.RWMutex
// schemas maps schema type -> schema id -> path of the schema file.
schemas map[def.SchemaType]map[string]string
}

// The functions below provide the methods to add, update, get, parse and delete schemas in the Registry.

// InitRegistry builds the package-level registry from the files found on
// disk. It is meant to be called once by the server.
//
// For every type in def.SchemaTypes it lists the directory named after the
// type under the configuration folder and records one entry per directory
// entry: the schema id is the file name without its extension and the value
// is the full path of the file. A directory that cannot be read is logged as
// a warning and yields an empty set for that type.
//
// An error is returned only when the configuration folder cannot be located.
func InitRegistry() error {
	registry = &Registry{
		schemas: make(map[def.SchemaType]map[string]string, len(def.SchemaTypes)),
	}
	confDir, err := conf.GetConfLoc()
	if err != nil {
		return fmt.Errorf("cannot find etc folder: %s", err)
	}
	for _, st := range def.SchemaTypes {
		dir := filepath.Join(confDir, string(st))
		entries, readErr := ioutil.ReadDir(dir)
		if readErr != nil {
			// A missing or unreadable directory is not fatal: the type simply
			// starts without any registered schema.
			conf.Log.Warnf("cannot read schema directory: %s", readErr)
			registry.schemas[st] = make(map[string]string)
			continue
		}
		found := make(map[string]string, len(entries))
		for _, entry := range entries {
			base := filepath.Base(entry.Name())
			id := strings.TrimSuffix(base, filepath.Ext(base))
			found[id] = filepath.Join(dir, entry.Name())
		}
		registry.schemas[st] = found
	}
	return nil
}

// GetAllForType returns the ids of all schemas registered for schemaType.
// The order of the returned ids is unspecified because they come from map
// iteration. An error is returned when schemaType is not known to the
// registry.
func GetAllForType(schemaType def.SchemaType) ([]string, error) {
	registry.RLock()
	defer registry.RUnlock()

	byID, known := registry.schemas[schemaType]
	if !known {
		return nil, fmt.Errorf("schema type %s not found", schemaType)
	}
	ids := make([]string, 0, len(byID))
	for id := range byID {
		ids = append(ids, id)
	}
	return ids, nil
}

// Register stores a new schema described by info.
//
// It fails when info.Type is not a known schema type or when a schema with
// the same type and name is already registered. The existence check and the
// creation run under the registry write lock so two concurrent Register
// calls for the same name cannot both succeed.
func Register(info *Info) error {
	registry.Lock()
	defer registry.Unlock()

	schemas, ok := registry.schemas[info.Type]
	if !ok {
		return fmt.Errorf("schema type %s not found", info.Type)
	}
	if _, exists := schemas[info.Name]; exists {
		return fmt.Errorf("schema %s.%s already registered", info.Type, info.Name)
	}
	return saveSchemaLocked(info)
}

// CreateOrUpdateSchema writes the schema described by info to its file and
// records it in the registry, overwriting any previous definition with the
// same type and name.
//
// When info.Content is non-empty it is written as the file content; otherwise
// the file is fetched from info.FilePath through httpx.DownloadFile.
// The registry write lock is held for the whole operation, consistent with
// DeleteSchema, so readers never observe the map while it is being modified.
func CreateOrUpdateSchema(info *Info) error {
	registry.Lock()
	defer registry.Unlock()
	return saveSchemaLocked(info)
}

// saveSchemaLocked persists info to disk and updates the in-memory map.
// The caller must hold the registry write lock.
func saveSchemaLocked(info *Info) error {
	schemas, ok := registry.schemas[info.Type]
	if !ok {
		return fmt.Errorf("schema type %s not found", info.Type)
	}
	etcDir, err := conf.GetConfLoc()
	if err != nil {
		// Previously ignored; without the folder the file would be written
		// relative to the working directory.
		return fmt.Errorf("cannot find etc folder: %w", err)
	}
	schemaDir := filepath.Join(etcDir, string(info.Type))
	if err := os.MkdirAll(schemaDir, os.ModePerm); err != nil {
		return err
	}
	schemaFile := filepath.Join(schemaDir, info.Name+schemaExt[info.Type])

	// Make sure the target file exists before it is filled. The handle is
	// closed right away so it is not kept open while the file is rewritten.
	created := false
	if _, statErr := os.Stat(schemaFile); os.IsNotExist(statErr) {
		file, createErr := os.Create(schemaFile)
		if createErr != nil {
			return createErr
		}
		if closeErr := file.Close(); closeErr != nil {
			return closeErr
		}
		created = true
	}

	if info.Content != "" {
		err = os.WriteFile(schemaFile, []byte(info.Content), 0666)
	} else {
		err = httpx.DownloadFile(schemaFile, info.FilePath)
	}
	if err != nil {
		if created {
			// Best effort: do not leave an empty placeholder behind, it would
			// be picked up as a schema by the next InitRegistry.
			_ = os.Remove(schemaFile)
		}
		return err
	}

	schemas[info.Name] = schemaFile
	return nil
}

// GetSchemaContent looks up the schema identified by schemaType and name and
// returns an Info carrying its type, name, the content read from its file and
// the path of that file.
//
// It returns an error when the type is unknown, when no schema with that name
// is registered for the type, or when the schema file cannot be read. The
// registry read lock is held while the file is read.
func GetSchemaContent(schemaType def.SchemaType, name string) (*Info, error) {
	registry.RLock()
	defer registry.RUnlock()

	byID, known := registry.schemas[schemaType]
	if !known {
		return nil, fmt.Errorf("schema type %s not found", schemaType)
	}
	path, registered := byID[name]
	if !registered {
		return nil, fmt.Errorf("schema %s.%s not found", schemaType, name)
	}
	raw, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("cannot read schema file %s: %s", path, err)
	}
	result := &Info{
		Type:     schemaType,
		Name:     name,
		Content:  string(raw),
		FilePath: path,
	}
	return result, nil
}

// DeleteSchema removes the schema identified by schemaType and name from the
// registry and deletes its file.
//
// It returns an error when the type is unknown or the schema is not
// registered. A failure to remove the file is only logged: the registry entry
// is dropped and nil is returned regardless.
func DeleteSchema(schemaType def.SchemaType, name string) error {
	registry.Lock()
	defer registry.Unlock()

	byID, known := registry.schemas[schemaType]
	if !known {
		return fmt.Errorf("schema type %s not found", schemaType)
	}
	path, registered := byID[name]
	if !registered {
		return fmt.Errorf("schema %s.%s not found", schemaType, name)
	}
	if rmErr := os.Remove(path); rmErr != nil {
		// Removal is best effort; the in-memory entry is dropped anyway.
		conf.Log.Errorf("cannot delete schema file %s: %s", path, rmErr)
	}
	delete(byID, name)
	return nil
}
189 changes: 189 additions & 0 deletions internal/schema/registry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright 2022 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schema

import (
"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/testx"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"reflect"
"testing"
)

// TestRegistry exercises the schema registry end to end against a temporary
// "protobuf" folder under the configuration directory:
// initial scan, registration by URL and by content, update, listing,
// retrieval and deletion. After each mutation the files on disk are compared
// with the ids reported by the registry.
func TestRegistry(t *testing.T) {
	testx.InitEnv()
	// Move test schema file to etc dir
	etcDir, err := conf.GetConfLoc()
	if err != nil {
		t.Fatal(err)
	}
	etcDir = filepath.Join(etcDir, "protobuf")
	err = os.MkdirAll(etcDir, os.ModePerm)
	if err != nil {
		t.Fatal(err)
	}
	// Copy init.proto
	bytesRead, err := ioutil.ReadFile("test/init.proto")
	if err != nil {
		t.Fatal(err)
	}
	err = ioutil.WriteFile(filepath.Join(etcDir, "init.proto"), bytesRead, 0755)
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		// Use a scoped error so cleanup does not clobber the test's err.
		if rmErr := os.RemoveAll(etcDir); rmErr != nil {
			t.Error(rmErr)
		}
	}()
	err = InitRegistry()
	if err != nil {
		t.Errorf("InitRegistry error: %v", err)
		return
	}
	// Serve the test folder over HTTP so schemas can be registered by URL.
	s := httptest.NewServer(
		http.FileServer(http.Dir("test")),
	)
	defer s.Close()
	endpoint := s.URL
	// Create 1 by file
	schema1 := &Info{
		Name:     "test1",
		Type:     "protobuf",
		FilePath: endpoint + "/test1.proto",
	}
	err = Register(schema1)
	if err != nil {
		t.Errorf("Register schema1 error: %v", err)
		return
	}
	// Get 1
	expectedSchema := &Info{
		Type:     "protobuf",
		Name:     "test1",
		Content:  "syntax = \"proto3\";message Person {string name = 1;int32 id = 2;string email = 3;}",
		FilePath: filepath.Join(etcDir, "test1.proto"),
	}
	gottenSchema, err := GetSchemaContent("protobuf", "test1")
	if err != nil {
		// Without this check a failed lookup would dereference a nil pointer
		// in the mismatch message below.
		t.Errorf("Get test1 error: %v", err)
		return
	}
	if !reflect.DeepEqual(gottenSchema, expectedSchema) {
		t.Errorf("Get test1 unmatch: Expect\n%v\nbut got\n%v", *expectedSchema, *gottenSchema)
		return
	}
	// Create 2 by content
	schema2 := &Info{
		Name:    "test2",
		Type:    "protobuf",
		Content: "message Book{\n required string name = 1;}",
	}
	err = Register(schema2)
	if err != nil {
		t.Errorf("Register schema2 error: %v", err)
		return
	}
	// Update 2 by file
	updatedSchema2 := &Info{
		Name:     "test2",
		Type:     "protobuf",
		FilePath: endpoint + "/test2.proto",
	}
	err = CreateOrUpdateSchema(updatedSchema2)
	if err != nil {
		t.Errorf("Update Schema2 error: %v", err)
		return
	}
	// List & check file
	regSchemas, err := GetAllForType("protobuf")
	if err != nil {
		t.Errorf("List schemas error: %v", err)
		return
	}
	expectedSchemas := []string{
		"init", "test1", "test2",
	}
	// Only the count is compared here: the listing order is not defined.
	if len(regSchemas) != len(expectedSchemas) {
		t.Errorf("Expect\n%v\nbut got\n%v", expectedSchemas, regSchemas)
		return
	}
	checkFile(etcDir, expectedSchemas, t)
	// Delete 2
	err = DeleteSchema("protobuf", "test2")
	if err != nil {
		t.Errorf("Delete Schema2 error: %v", err)
		return
	}
	// Update 1 by content
	updatedSchema1 := &Info{
		Name:    "test1",
		Type:    "protobuf",
		Content: "message Person{required string name = 1;required int32 id = 2;optional string email = 3;}",
	}
	err = CreateOrUpdateSchema(updatedSchema1)
	if err != nil {
		t.Errorf("Update Schema1 error: %v", err)
		return
	}
	// List & check file
	regSchemas, err = GetAllForType("protobuf")
	if err != nil {
		t.Errorf("List schemas error: %v", err)
		return
	}
	expectedSchemas = []string{
		"init", "test1",
	}
	if len(regSchemas) != len(expectedSchemas) {
		t.Errorf("Expect\n%v\nbut got\n%v", expectedSchemas, regSchemas)
		return
	}
	checkFile(etcDir, expectedSchemas, t)
	// Delete 1
	err = DeleteSchema("protobuf", "test1")
	if err != nil {
		t.Errorf("Delete Schema1 error: %v", err)
		return
	}
	// List & check file
	regSchemas, err = GetAllForType("protobuf")
	if err != nil {
		t.Errorf("List schemas error: %v", err)
		return
	}
	expectedSchemas = []string{
		"init",
	}
	// A single element is left, so an exact comparison is order-safe.
	if !reflect.DeepEqual(regSchemas, expectedSchemas) {
		t.Errorf("Expect\n%v\nbut got\n%v", expectedSchemas, regSchemas)
		return
	}
	checkFile(etcDir, expectedSchemas, t)
}

// checkFile verifies that etcDir contains exactly one "<id>.proto" entry for
// every id in schemas and nothing else.
//
// It aborts the test if the directory cannot be listed, and reports an error
// (then stops checking) on a count mismatch or on the first unexpected file.
func checkFile(etcDir string, schemas []string, t *testing.T) {
	entries, err := ioutil.ReadDir(etcDir)
	if err != nil {
		t.Fatal(err)
	}
	if len(entries) != len(schemas) {
		t.Errorf("Expect %d files but got %d", len(schemas), len(entries))
		return
	}
	// Index the expected file names once instead of rescanning schemas for
	// every directory entry.
	wanted := make(map[string]struct{}, len(schemas))
	for _, id := range schemas {
		wanted[id+".proto"] = struct{}{}
	}
	for _, entry := range entries {
		name := filepath.Base(entry.Name())
		if _, ok := wanted[name]; !ok {
			t.Errorf("Expect %s but got %s", schemas, name)
			return
		}
	}
}
Loading

0 comments on commit 58bcba7

Please sign in to comment.