Skip to content
This repository has been archived by the owner on Dec 22, 2023. It is now read-only.

Commit

Permalink
Send event when JIT schema changed
Browse files Browse the repository at this point in the history
Ref. #199
  • Loading branch information
Ben Lei committed Nov 7, 2016
1 parent be7f1e5 commit 63e5cb1
Show file tree
Hide file tree
Showing 14 changed files with 355 additions and 227 deletions.
28 changes: 18 additions & 10 deletions pkg/server/handler/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/mitchellh/mapstructure"

"github.com/skygeario/skygear-server/pkg/server/asset"
pluginEvent "github.com/skygeario/skygear-server/pkg/server/plugin/event"
"github.com/skygeario/skygear-server/pkg/server/plugin/hook"
"github.com/skygeario/skygear-server/pkg/server/router"
"github.com/skygeario/skygear-server/pkg/server/skydb"
Expand Down Expand Up @@ -237,15 +238,16 @@ curl -X POST -H "Content-Type: application/json" \
EOF
*/
type RecordSaveHandler struct {
HookRegistry *hook.Registry `inject:"HookRegistry"`
AssetStore asset.Store `inject:"AssetStore"`
AccessModel skydb.AccessModel `inject:"AccessModel"`
Authenticator router.Processor `preprocessor:"authenticator"`
DBConn router.Processor `preprocessor:"dbconn"`
InjectUser router.Processor `preprocessor:"inject_user"`
InjectDB router.Processor `preprocessor:"inject_db"`
RequireUser router.Processor `preprocessor:"require_user"`
PluginReady router.Processor `preprocessor:"plugin"`
HookRegistry *hook.Registry `inject:"HookRegistry"`
AssetStore asset.Store `inject:"AssetStore"`
AccessModel skydb.AccessModel `inject:"AccessModel"`
EventSender pluginEvent.Sender `inject:"PluginEventSender"`
Authenticator router.Processor `preprocessor:"authenticator"`
DBConn router.Processor `preprocessor:"dbconn"`
InjectUser router.Processor `preprocessor:"inject_user"`
InjectDB router.Processor `preprocessor:"inject_db"`
RequireUser router.Processor `preprocessor:"require_user"`
PluginReady router.Processor `preprocessor:"plugin"`
preprocessors []router.Processor
}

Expand Down Expand Up @@ -344,8 +346,14 @@ func (h *RecordSaveHandler) Handle(payload *router.Payload, response *router.Res

results = append(results, result)
}

response.Result = results

if resp.SchemaUpdated && h.EventSender != nil {
err := sendSchemaChangedEvent(h.EventSender, payload.Database)
if err != nil {
log.WithField("err", err).Warn("Fail to send schema changed event")
}
}
}

type recordFetchPayload struct {
Expand Down
20 changes: 10 additions & 10 deletions pkg/server/handler/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,8 +534,8 @@ type bogusFieldDatabase struct {

func (db bogusFieldDatabase) IsReadOnly() bool { return false }

func (db bogusFieldDatabase) Extend(recordType string, schema skydb.RecordSchema) error {
return nil
func (db bogusFieldDatabase) Extend(recordType string, schema skydb.RecordSchema) (bool, error) {
return false, nil
}

func (db bogusFieldDatabase) Get(id skydb.RecordID, record *skydb.Record) error {
Expand Down Expand Up @@ -653,9 +653,9 @@ type noExtendDatabase struct {

func (db *noExtendDatabase) IsReadOnly() bool { return false }

func (db *noExtendDatabase) Extend(recordType string, schema skydb.RecordSchema) error {
func (db *noExtendDatabase) Extend(recordType string, schema skydb.RecordSchema) (bool, error) {
db.calledExtend = true
return errors.New("You shalt not call Extend")
return false, errors.New("You shalt not call Extend")
}

func TestRecordSaveNoExtendIfRecordMalformed(t *testing.T) {
Expand Down Expand Up @@ -1251,8 +1251,8 @@ func (db *singleRecordDatabase) Query(query *skydb.Query) (*skydb.Rows, error) {
return skydb.NewRows(skydb.NewMemoryRows([]skydb.Record{db.record})), nil
}

func (db *singleRecordDatabase) Extend(recordType string, schema skydb.RecordSchema) error {
return nil
func (db *singleRecordDatabase) Extend(recordType string, schema skydb.RecordSchema) (bool, error) {
return false, nil
}

func TestRecordOwnerIDSerialization(t *testing.T) {
Expand Down Expand Up @@ -1579,8 +1579,8 @@ func (db *referencedRecordDatabase) Query(query *skydb.Query) (*skydb.Rows, erro
return skydb.NewRows(skydb.NewMemoryRows([]skydb.Record{db.note})), nil
}

func (db *referencedRecordDatabase) Extend(recordType string, schema skydb.RecordSchema) error {
return nil
func (db *referencedRecordDatabase) Extend(recordType string, schema skydb.RecordSchema) (bool, error) {
return false, nil
}

func TestRecordQueryWithEagerLoad(t *testing.T) {
Expand Down Expand Up @@ -1800,8 +1800,8 @@ type erroneousDB struct {

func (db erroneousDB) IsReadOnly() bool { return false }

func (db erroneousDB) Extend(string, skydb.RecordSchema) error {
return nil
func (db erroneousDB) Extend(string, skydb.RecordSchema) (bool, error) {
return false, nil
}

func (db erroneousDB) Get(skydb.RecordID, *skydb.Record) error {
Expand Down
25 changes: 19 additions & 6 deletions pkg/server/handler/recordutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type recordModifyRequest struct {

type recordModifyResponse struct {
ErrMap map[skydb.RecordID]skyerr.Error
SchemaUpdated bool
SavedRecords []*skydb.Record
DeletedRecordIDs []skydb.RecordID
}
Expand Down Expand Up @@ -220,7 +221,8 @@ func recordSaveHandler(req *recordModifyRequest, resp *recordModifyResponse) sky
}

// derive and extend record schema
if err := extendRecordSchema(db, records); err != nil {
schemaExtended, err := extendRecordSchema(db, records)
if err != nil {
log.WithField("err", err).Errorln("failed to migrate record schema")
if myerr, ok := err.(skyerr.Error); ok {
return myerr
Expand Down Expand Up @@ -278,6 +280,8 @@ func recordSaveHandler(req *recordModifyRequest, resp *recordModifyResponse) sky
}

resp.SavedRecords = records
resp.SchemaUpdated = schemaExtended

return nil
}

Expand Down Expand Up @@ -352,7 +356,7 @@ func deriveDeltaRecord(dst, base, delta *skydb.Record) {
}
}

func extendRecordSchema(db skydb.Database, records []*skydb.Record) error {
func extendRecordSchema(db skydb.Database, records []*skydb.Record) (bool, error) {
recordSchemaMergerMap := map[string]schemaMerger{}
for _, record := range records {
recordType := record.ID.Type
Expand All @@ -369,18 +373,27 @@ func extendRecordSchema(db skydb.Database, records []*skydb.Record) error {
recordSchemaMergerMap[recordType] = merger
}

extended := false
for recordType, merger := range recordSchemaMergerMap {
schema, err := merger.Schema()
if err != nil {
return err
return false, err
}

if err = db.Extend(recordType, schema); err != nil {
return err
schemaExtended, err := db.Extend(recordType, schema)
if err != nil {
return false, err
}
if schemaExtended {
log.
WithField("type", recordType).
WithField("schema", schema).
Info("Schema Extended")
extended = true
}
}

return nil
return extended, nil
}

func recordDeleteHandler(req *recordModifyRequest, resp *recordModifyResponse) skyerr.Error {
Expand Down
105 changes: 23 additions & 82 deletions pkg/server/handler/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package handler

import (
"encoding/json"
"sort"
"strings"

"github.com/mitchellh/mapstructure"
Expand All @@ -30,46 +28,6 @@ type schemaResponse struct {
Schemas map[string]schemaFieldList `json:"record_types"`
}

type schemaFieldList struct {
Fields []schemaField `mapstructure:"fields" json:"fields"`
}

func (s schemaFieldList) Len() int {
return len(s.Fields)
}

func (s schemaFieldList) Swap(i, j int) {
s.Fields[i], s.Fields[j] = s.Fields[j], s.Fields[i]
}

func (s schemaFieldList) Less(i, j int) bool {
return strings.Compare(s.Fields[i].Name, s.Fields[j].Name) < 0
}

type schemaField struct {
Name string `mapstructure:"name" json:"name"`
TypeName string `mapstructure:"type" json:"type"`
}

func (resp *schemaResponse) Encode(data map[string]skydb.RecordSchema) {
resp.Schemas = make(map[string]schemaFieldList)
for recordType, schema := range data {
fieldList := schemaFieldList{}
for fieldName, val := range schema {
if strings.HasPrefix(fieldName, "_") {
continue
}

fieldList.Fields = append(fieldList.Fields, schemaField{
Name: fieldName,
TypeName: val.ToSimpleName(),
})
}
sort.Sort(fieldList)
resp.Schemas[recordType] = fieldList
}
}

/*
SchemaRenameHandler handles the action of renaming column
curl -X POST -H "Content-Type: application/json" \
Expand Down Expand Up @@ -152,31 +110,26 @@ func (h *SchemaRenameHandler) Handle(rpayload *router.Payload, response *router.
}

db := rpayload.Database

if err := db.RenameSchema(payload.RecordType, payload.OldName, payload.NewName); err != nil {
response.Err = skyerr.NewError(skyerr.ResourceNotFound, err.Error())
return
}

results, err := db.GetRecordSchemas()
schemas, err := db.GetRecordSchemas()
if err != nil {
response.Err = skyerr.NewError(skyerr.UnexpectedError, err.Error())
return
}

resp := &schemaResponse{}
resp.Encode(results)
response.Result = &schemaResponse{
Schemas: encodeRecordSchemas(schemas),
}

if h.EventSender != nil {
encodedSchema, err := json.Marshal(resp)
err := sendSchemaChangedEvent(h.EventSender, db)
if err != nil {
log.WithField("err", err).Warn("Unable to encode schema")
} else {
h.EventSender.Send("schema-changed", encodedSchema, true)
log.WithField("err", err).Warn("Fail to send schema changed event")
}
}

response.Result = resp
}

/*
Expand Down Expand Up @@ -253,31 +206,26 @@ func (h *SchemaDeleteHandler) Handle(rpayload *router.Payload, response *router.
}

db := rpayload.Database

if err := db.DeleteSchema(payload.RecordType, payload.ColumnName); err != nil {
response.Err = skyerr.NewError(skyerr.ResourceNotFound, err.Error())
return
}

results, err := db.GetRecordSchemas()
schemas, err := db.GetRecordSchemas()
if err != nil {
response.Err = skyerr.NewError(skyerr.UnexpectedError, err.Error())
return
}

resp := &schemaResponse{}
resp.Encode(results)
response.Result = &schemaResponse{
Schemas: encodeRecordSchemas(schemas),
}

if h.EventSender != nil {
encodedSchema, err := json.Marshal(resp)
err := sendSchemaChangedEvent(h.EventSender, db)
if err != nil {
log.WithField("err", err).Warn("Unable to encode schema")
} else {
h.EventSender.Send("schema-changed", encodedSchema, true)
log.WithField("err", err).Warn("Fail to send schema changed event")
}
}

response.Result = resp
}

/*
Expand Down Expand Up @@ -369,34 +317,29 @@ func (h *SchemaCreateHandler) Handle(rpayload *router.Payload, response *router.
}

db := rpayload.Database

for recordType, recordSchema := range payload.Schemas {
err := db.Extend(recordType, recordSchema)
_, err := db.Extend(recordType, recordSchema)
if err != nil {
response.Err = skyerr.NewError(skyerr.IncompatibleSchema, err.Error())
return
}
}

results, err := db.GetRecordSchemas()
schemas, err := db.GetRecordSchemas()
if err != nil {
response.Err = skyerr.NewError(skyerr.UnexpectedError, err.Error())
return
}

resp := &schemaResponse{}
resp.Encode(results)
response.Result = &schemaResponse{
Schemas: encodeRecordSchemas(schemas),
}

if h.EventSender != nil {
encodedSchema, err := json.Marshal(resp)
err := sendSchemaChangedEvent(h.EventSender, db)
if err != nil {
log.WithField("err", err).Warn("Unable to encode schema")
} else {
h.EventSender.Send("schema-changed", encodedSchema, true)
log.WithField("err", err).Warn("Fail to send schema changed event")
}
}

response.Result = resp
}

/*
Expand Down Expand Up @@ -430,17 +373,15 @@ func (h *SchemaFetchHandler) GetPreprocessors() []router.Processor {

func (h *SchemaFetchHandler) Handle(rpayload *router.Payload, response *router.Response) {
db := rpayload.Database

results, err := db.GetRecordSchemas()
schemas, err := db.GetRecordSchemas()
if err != nil {
response.Err = skyerr.NewError(skyerr.UnexpectedError, err.Error())
return
}

resp := &schemaResponse{}
resp.Encode(results)

response.Result = resp
response.Result = &schemaResponse{
Schemas: encodeRecordSchemas(schemas),
}
}

/*
Expand Down
Loading

0 comments on commit 63e5cb1

Please sign in to comment.