Skip to content

Commit

Permalink
review changes;
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan Kamieth committed Sep 27, 2024
1 parent ac30277 commit c3f5a2a
Show file tree
Hide file tree
Showing 10 changed files with 239 additions and 71 deletions.
2 changes: 2 additions & 0 deletions pkg/encoding/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ func Valid(data []byte) bool {
return json.Valid(data)
}

type RawMessage = json.RawMessage

type Marshaler interface {
json.Marshaler
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mdlsub/config_postprocessor_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func SubscriberConfigPostProcessor(config cfg.GosoConf) (bool, error) {
var inputPostProcessor SubscriberInputConfigPostProcessor
var outputPostProcessor SubscriberOutputConfigPostProcessor

settings := unmarshallSettings(config)
settings := unmarshalSettings(config)

for name, subscriberSettings := range settings.Subscribers {
subscriberKey := GetSubscriberConfigKey(name)
Expand Down
52 changes: 26 additions & 26 deletions pkg/mdlsub/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ import (
)

type FixtureSettings struct {
DataSetDbName string `cfg:"data_set_db_name"`
Host string `cfg:"host"`
Path string `cfg:"path"`
DatasetName string `cfg:"dataset_name"`
Host string `cfg:"host"`
Path string `cfg:"path"`
}

type FetchResult struct {
type fetchResult struct {
spec *ModelSpecification
contentType string
data []byte
data *FetchData
}

type FetchData struct {
Data json.RawMessage `json:"data"`
}

var unmarhaller = map[string]func(data []byte, v interface{}) error{
Expand All @@ -38,7 +42,7 @@ func FixtureSetFactory(transformerFactoryMap TransformerMapTypeVersionFactories)
var core SubscriberCore
var sets []fixtures.FixtureSet

settings := unmarshallSettings(config)
settings := unmarshalSettings(config)

fixtureSettings := &FixtureSettings{}
config.UnmarshalKey("fixtures.loaders.api", fixtureSettings)
Expand Down Expand Up @@ -80,14 +84,14 @@ func NewFixtureSetWithInterfaces(logger log.Logger, source SubscriberModel, core
func (f FixtureSet) Write(ctx context.Context) error {
var ok bool
var err error
var res *FetchResult
var res *fetchResult
var transformer ModelTransformer
var unmarshal func(data []byte, v interface{}) error
var output Output
var items []interface{}
var model Model

if res, err = f.Fetch(ctx); err != nil {
if res, err = f.fetch(ctx); err != nil {
return fmt.Errorf("failed to fetch model %s: %w", f.source.String(), err)
}

Expand All @@ -102,7 +106,7 @@ func (f FixtureSet) Write(ctx context.Context) error {
return fmt.Errorf("unknown content type: %s", res.contentType)
}

if err = unmarshal(res.data, slice); err != nil {
if err = unmarshal(res.data.Data, slice); err != nil {
return fmt.Errorf("failed to unmarshal fetched data: %w", err)
}

Expand Down Expand Up @@ -131,12 +135,12 @@ func (f FixtureSet) Write(ctx context.Context) error {
return nil
}

func (f FixtureSet) Fetch(ctx context.Context) (*FetchResult, error) {
func (f FixtureSet) fetch(ctx context.Context) (*fetchResult, error) {
var err error
var hostPath string
var u *url.URL
var resp *resty.Response
var versionInt int
var version int

if hostPath, err = url.JoinPath(f.settings.Host, f.settings.Path); err != nil {
return nil, fmt.Errorf("failed to join host path: %w", err)
Expand All @@ -146,12 +150,18 @@ func (f FixtureSet) Fetch(ctx context.Context) (*FetchResult, error) {
return nil, fmt.Errorf("failed to parse host url %s: %w", hostPath, err)
}

if version, err = f.core.GetLatestModelIdVersion(f.source.ModelId); err != nil {
return nil, fmt.Errorf("failed to get latest model version: %w", err)
}

query := u.Query()
query.Add("data_set", f.settings.DataSetDbName)
query.Add("dataset_name", f.settings.DatasetName)
query.Add("model_id", f.source.String())
query.Add("version", strconv.Itoa(version))
u.RawQuery = query.Encode()

req := f.httpClient.R().SetContext(ctx)
data := &FetchData{}
req := f.httpClient.R().SetContext(ctx).SetResult(data)
f.logger.Info("%s: fetching fixture data from %s", f.source.String(), u.String())

if resp, err = req.Get(u.String()); err != nil {
Expand All @@ -162,30 +172,20 @@ func (f FixtureSet) Fetch(ctx context.Context) (*FetchResult, error) {
return nil, fmt.Errorf("error on executing http request data, got status %d: %s", resp.StatusCode(), resp.String())
}

version := resp.Header().Get("Version")

if version == "" {
return nil, fmt.Errorf("version not found in response header")
}

if versionInt, err = strconv.Atoi(version); err != nil {
return nil, fmt.Errorf("can not convert version string %s to int: %w", version, err)
}

contentType := resp.Header().Get(headers.ContentType)

if contentType == "" {
return nil, fmt.Errorf("content type not found in response header")
}

res := &FetchResult{
res := &fetchResult{
spec: &ModelSpecification{
ModelId: f.source.String(),
CrudType: "create",
Version: versionInt,
Version: version,
},
contentType: contentType,
data: resp.Body(),
data: data,
}

return res, nil
Expand Down
55 changes: 16 additions & 39 deletions pkg/mdlsub/fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"github.com/go-resty/resty/v2"
"github.com/jarcoal/httpmock"
"github.com/justtrackio/gosoline/pkg/fixtures"
mocks2 "github.com/justtrackio/gosoline/pkg/log/mocks"
loggerMocks "github.com/justtrackio/gosoline/pkg/log/mocks"
"github.com/justtrackio/gosoline/pkg/mdl"
"github.com/justtrackio/gosoline/pkg/mdlsub"
"github.com/justtrackio/gosoline/pkg/mdlsub/mocks"
mdlsubMocks "github.com/justtrackio/gosoline/pkg/mdlsub/mocks"
"github.com/stretchr/testify/suite"
)

Expand All @@ -22,17 +22,18 @@ func TestFixtureSetTestSuite(t *testing.T) {
type FixtureSetTestSuite struct {
suite.Suite

source mdlsub.SubscriberModel
spec *mdlsub.ModelSpecification
core *mocks.SubscriberCore
output *mocks.Output
core *mdlsubMocks.SubscriberCore
output *mdlsubMocks.Output
fixtureSet fixtures.FixtureSet
}

func (s *FixtureSetTestSuite) SetupTest() {
s.core = new(mocks.SubscriberCore)
s.output = new(mocks.Output)
s.core = new(mdlsubMocks.SubscriberCore)
s.output = new(mdlsubMocks.Output)

source := mdlsub.SubscriberModel{
s.source = mdlsub.SubscriberModel{
ModelId: mdl.ModelId{
Project: "justtrack",
Family: "gosoline",
Expand All @@ -48,16 +49,16 @@ func (s *FixtureSetTestSuite) SetupTest() {
}

settings := &mdlsub.FixtureSettings{
DataSetDbName: "my_test_set",
Host: "http://localhost:8080",
Path: "path/for/mdlsub",
DatasetName: "my_test_set",
Host: "http://localhost:8080",
Path: "path/for/mdlsub",
}

client := resty.New()
httpmock.ActivateNonDefault(client.GetClient())

logger := mocks2.NewLoggerMockedAll()
s.fixtureSet = mdlsub.NewFixtureSetWithInterfaces(logger, source, s.core, settings, client)
logger := loggerMocks.NewLoggerMockedAll()
s.fixtureSet = mdlsub.NewFixtureSetWithInterfaces(logger, s.source, s.core, settings, client)
}

func (s *FixtureSetTestSuite) TearDownTest() {
Expand All @@ -73,14 +74,14 @@ func (s *FixtureSetTestSuite) TestSuccess() {
s.output.EXPECT().Persist(ctx, TestModel{Id: 1}, "create").Return(nil)
s.output.EXPECT().Persist(ctx, TestModel{Id: 2}, "create").Return(nil)

s.core.EXPECT().GetLatestModelIdVersion(s.source.ModelId).Return(1, nil)
s.core.EXPECT().GetTransformer(s.spec).Return(TestTransformer{}, nil)
s.core.EXPECT().GetOutput(s.spec).Return(s.output, nil)

httpmock.RegisterResponder("GET", "http://localhost:8080/path/for/mdlsub?data_set=my_test_set&model_id=justtrack.gosoline.mdlsub.testModel",
httpmock.RegisterResponder("GET", "http://localhost:8080/path/for/mdlsub?dataset_name=my_test_set&model_id=justtrack.gosoline.mdlsub.testModel&version=1",
func(request *http.Request) (*http.Response, error) {
resp := httpmock.NewStringResponse(200, `[{"id": 1}, {"id": 2}]`)
resp := httpmock.NewStringResponse(200, `{"data":[{"id":1},{"id":2}]}`)
resp.Header.Add("Content-Type", "application/json")
resp.Header.Add("Version", "1")

return resp, nil
},
Expand All @@ -89,27 +90,3 @@ func (s *FixtureSetTestSuite) TestSuccess() {
err := s.fixtureSet.Write(ctx)
s.NoError(err)
}

type TestInput struct {
Id int `json:"id"`
}

type TestModel struct {
Id int `json:"id"`
}

type TestTransformer struct{}

func (t TestTransformer) GetInput() interface{} {
return &TestInput{}
}

func (t TestTransformer) Transform(ctx context.Context, inp interface{}) (out mdlsub.Model, err error) {
mdl := inp.(*TestInput)

return TestModel{mdl.Id}, nil
}

func (m TestModel) GetId() interface{} {
return m.Id
}
58 changes: 58 additions & 0 deletions pkg/mdlsub/mocks/SubscriberCore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/mdlsub/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type SubscriberSettings struct {
TargetModel SubscriberModel `cfg:"target"`
}

func unmarshallSettings(config cfg.Config) *Settings {
func unmarshalSettings(config cfg.Config) *Settings {
settings := Settings{
Subscribers: make(map[string]*SubscriberSettings),
}
Expand Down
Loading

0 comments on commit c3f5a2a

Please sign in to comment.