Skip to content

Commit

Permalink
Merge pull request #2098 from keboola/fix-stream-acl-tokens
Browse files Browse the repository at this point in the history
fix: Add ACL for CUD stream API endpoints
  • Loading branch information
Matovidlo authored Oct 23, 2024
2 parents cded779 + 2d0ad4f commit 73b1cc0
Show file tree
Hide file tree
Showing 10 changed files with 306 additions and 17 deletions.
16 changes: 16 additions & 0 deletions build/ci/projects.json
Original file line number Diff line number Diff line change
Expand Up @@ -430,5 +430,21 @@
"backend": "snowflake",
"token": "$TEST_KBC_PROJECT_20474_TOKEN",
"legacyTransformation": true
},
{
"host": "connection.keboola.com",
"project": 10166,
"stagingStorage": "s3",
"backend": "snowflake",
"token": "$TEST_KBC_PROJECT_10166_TOKEN",
"isGuest": true
},
{
"host": "connection.keboola.com",
"project": 10169,
"stagingStorage": "s3",
"backend": "snowflake",
"token": "$TEST_KBC_PROJECT_10169_TOKEN",
"isGuest": true
}
]
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/jpillora/longestcommon v0.0.0-20161227235612-adb9d91ee629
github.com/json-iterator/go v1.1.12
github.com/keboola/go-client v1.27.0
github.com/keboola/go-utils v1.1.0
github.com/keboola/go-utils v1.2.0
github.com/klauspost/compress v1.17.10
github.com/klauspost/pgzip v1.2.6
github.com/kylelemons/godebug v1.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,8 @@ github.com/keboola/go-mockoidc v0.0.0-20240405064136-5229d2b53db6 h1:HcvX1VQkiav
github.com/keboola/go-mockoidc v0.0.0-20240405064136-5229d2b53db6/go.mod h1:eDjgYHYDJbPLBLsyZ6qRaugP0mX8vePOhZ5id1fdzJw=
github.com/keboola/go-oauth2-proxy/v7 v7.6.1-0.20240418143152-9d00aaa29562 h1:EiwSnkbGt2i6XxvjDMrWx6/bGlQjVs+yq1mDJ5b3U1U=
github.com/keboola/go-oauth2-proxy/v7 v7.6.1-0.20240418143152-9d00aaa29562/go.mod h1:uPrZkzwsuFyIPP04hIt6TG2KvWujglvkOnUUnQJyIdw=
github.com/keboola/go-utils v1.1.0 h1:2tJikVr1kESR88qeGy/Vtmendw7TXB1UrJKLfPKceSk=
github.com/keboola/go-utils v1.1.0/go.mod h1:4YVC2/V0QwgHqxtch8JAVDNVI1aINF2arJ7sh6TO1GY=
github.com/keboola/go-utils v1.2.0 h1:mz12Eo+/XW+V0qcEGN4mkXQjrFOdxy+muWe0hLj2n/c=
github.com/keboola/go-utils v1.2.0/go.mod h1:4YVC2/V0QwgHqxtch8JAVDNVI1aINF2arJ7sh6TO1GY=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0=
Expand Down
7 changes: 7 additions & 0 deletions internal/pkg/service/stream/api/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/config"
definitionRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/repository"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)

const (
adminRole = "admin"
)

type service struct {
Expand All @@ -24,6 +29,7 @@ type service struct {
locks *distlock.Provider
definition *definitionRepo.Repository
mapper *mapper.Mapper
adminError error
}

func New(d dependencies.APIScope, cfg config.Config) api.Service {
Expand All @@ -35,6 +41,7 @@ func New(d dependencies.APIScope, cfg config.Config) api.Service {
locks: d.DistributedLockProvider(),
definition: d.DefinitionRepository(),
mapper: mapper.New(d, cfg),
adminError: errors.New("only admin token can do write operations on streams"),
}
}

Expand Down
42 changes: 42 additions & 0 deletions internal/pkg/service/stream/api/service/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ import (

//nolint:dupl // CreateSource method is similar
func (s *service) CreateSink(ctx context.Context, d dependencies.SourceRequestScope, payload *api.CreateSinkPayload) (*api.Task, error) {
// If user is not admin deny access for write
token := d.StorageAPIToken()
if token.Admin == nil || token.Admin.Role != adminRole {
return nil, svcerrors.NewForbiddenError(s.adminError)
}

// Create entity
sink, err := s.mapper.NewSinkEntity(d.SourceKey(), payload)
if err != nil {
Expand Down Expand Up @@ -114,6 +120,12 @@ func (s *service) ListDeletedSinks(ctx context.Context, scope dependencies.Sourc
}

func (s *service) UpdateSink(ctx context.Context, d dependencies.SinkRequestScope, payload *api.UpdateSinkPayload) (*api.Task, error) {
// If user is not admin deny access for write
token := d.StorageAPIToken()
if token.Admin == nil || token.Admin.Role != adminRole {
return nil, svcerrors.NewForbiddenError(s.adminError)
}

// Get the change description
var changeDesc string
if payload.ChangeDescription == nil {
Expand Down Expand Up @@ -172,6 +184,12 @@ func (s *service) UpdateSink(ctx context.Context, d dependencies.SinkRequestScop
}

func (s *service) DeleteSink(ctx context.Context, d dependencies.SinkRequestScope, _ *api.DeleteSinkPayload) (*api.Task, error) {
// If user is not admin deny access for write
token := d.StorageAPIToken()
if token.Admin == nil || token.Admin.Role != adminRole {
return nil, svcerrors.NewForbiddenError(s.adminError)
}

// Quick check before the task
if err := s.sinkMustExist(ctx, d.SinkKey()); err != nil {
return nil, err
Expand Down Expand Up @@ -209,6 +227,12 @@ func (s *service) GetSinkSettings(ctx context.Context, d dependencies.SinkReques
}

func (s *service) UpdateSinkSettings(ctx context.Context, d dependencies.SinkRequestScope, payload *api.UpdateSinkSettingsPayload) (*api.Task, error) {
// If user is not admin deny access for write
token := d.StorageAPIToken()
if token.Admin == nil || token.Admin.Role != adminRole {
return nil, svcerrors.NewForbiddenError(s.adminError)
}

// Quick check before the task
if err := s.sinkMustExist(ctx, d.SinkKey()); err != nil {
return nil, err
Expand Down Expand Up @@ -328,6 +352,12 @@ func (s *service) SinkStatisticsFiles(ctx context.Context, d dependencies.SinkRe
}

func (s *service) SinkStatisticsClear(ctx context.Context, d dependencies.SinkRequestScope, payload *api.SinkStatisticsClearPayload) (err error) {
// If user is not admin deny access for write
token := d.StorageAPIToken()
if token.Admin == nil || token.Admin.Role != adminRole {
return svcerrors.NewForbiddenError(s.adminError)
}

if err := s.sinkMustExist(ctx, d.SinkKey()); err != nil {
return err
}
Expand All @@ -336,6 +366,12 @@ func (s *service) SinkStatisticsClear(ctx context.Context, d dependencies.SinkRe
}

func (s *service) DisableSink(ctx context.Context, d dependencies.SinkRequestScope, payload *api.DisableSinkPayload) (res *api.Task, err error) {
// If user is not admin deny access for write
token := d.StorageAPIToken()
if token.Admin == nil || token.Admin.Role != adminRole {
return nil, svcerrors.NewForbiddenError(s.adminError)
}

if err := s.sinkMustExist(ctx, d.SinkKey()); err != nil {
return nil, err
}
Expand Down Expand Up @@ -364,6 +400,12 @@ func (s *service) DisableSink(ctx context.Context, d dependencies.SinkRequestSco
}

func (s *service) EnableSink(ctx context.Context, d dependencies.SinkRequestScope, payload *api.EnableSinkPayload) (res *api.Task, err error) {
// If user is not admin deny access for write
token := d.StorageAPIToken()
if token.Admin == nil || token.Admin.Role != adminRole {
return nil, svcerrors.NewForbiddenError(s.adminError)
}

if err := s.sinkMustExist(ctx, d.SinkKey()); err != nil {
return nil, err
}
Expand Down
37 changes: 37 additions & 0 deletions internal/pkg/service/stream/api/service/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/keboola/keboola-as-code/internal/pkg/log"
svcerrors "github.com/keboola/keboola-as-code/internal/pkg/service/common/errors"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/iterator"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/task"
api "github.com/keboola/keboola-as-code/internal/pkg/service/stream/api/gen/stream"
Expand All @@ -19,6 +20,12 @@ import (

//nolint:dupl // CreateSink method is similar
func (s *service) CreateSource(ctx context.Context, d dependencies.BranchRequestScope, payload *api.CreateSourcePayload) (*api.Task, error) {
// If user is not admin deny access for write
token := d.StorageAPIToken()
if token.Admin == nil || token.Admin.Role != adminRole {
return nil, svcerrors.NewForbiddenError(s.adminError)
}

// Create entity
source, err := s.mapper.NewSourceEntity(d.BranchKey(), payload)
if err != nil {
Expand Down Expand Up @@ -83,6 +90,12 @@ func (s *service) CreateSource(ctx context.Context, d dependencies.BranchRequest
}

func (s *service) UpdateSource(ctx context.Context, d dependencies.SourceRequestScope, payload *api.UpdateSourcePayload) (*api.Task, error) {
// If user is not admin deny access for write
token := d.StorageAPIToken()
if token.Admin == nil || token.Admin.Role != adminRole {
return nil, svcerrors.NewForbiddenError(s.adminError)
}

// Get the change description
var changeDesc string
if payload.ChangeDescription == nil {
Expand Down Expand Up @@ -157,6 +170,12 @@ func (s *service) GetSource(ctx context.Context, d dependencies.SourceRequestSco
}

func (s *service) DeleteSource(ctx context.Context, d dependencies.SourceRequestScope, _ *api.DeleteSourcePayload) (*api.Task, error) {
// If user is not admin deny access for write
token := d.StorageAPIToken()
if token.Admin == nil || token.Admin.Role != adminRole {
return nil, svcerrors.NewForbiddenError(s.adminError)
}

// Quick check before the task
if err := s.sourceMustExist(ctx, d.SourceKey()); err != nil {
return nil, err
Expand Down Expand Up @@ -224,6 +243,12 @@ func (s *service) GetSourceSettings(ctx context.Context, d dependencies.SourceRe
}

func (s *service) UpdateSourceSettings(ctx context.Context, d dependencies.SourceRequestScope, payload *api.UpdateSourceSettingsPayload) (*api.Task, error) {
// If user is not admin deny access for write
token := d.StorageAPIToken()
if token.Admin == nil || token.Admin.Role != adminRole {
return nil, svcerrors.NewForbiddenError(s.adminError)
}

// Quick check before the task
if err := s.sourceMustExist(ctx, d.SourceKey()); err != nil {
return nil, err
Expand Down Expand Up @@ -311,6 +336,12 @@ func (s *service) SourceStatisticsClear(ctx context.Context, d dependencies.Sour
}

func (s *service) DisableSource(ctx context.Context, d dependencies.SourceRequestScope, payload *api.DisableSourcePayload) (res *api.Task, err error) {
// If user is not admin deny access for write
token := d.StorageAPIToken()
if token.Admin == nil || token.Admin.Role != adminRole {
return nil, svcerrors.NewForbiddenError(s.adminError)
}

if err := s.sourceMustExist(ctx, d.SourceKey()); err != nil {
return nil, err
}
Expand Down Expand Up @@ -339,6 +370,12 @@ func (s *service) DisableSource(ctx context.Context, d dependencies.SourceReques
}

func (s *service) EnableSource(ctx context.Context, d dependencies.SourceRequestScope, payload *api.EnableSourcePayload) (res *api.Task, err error) {
// If user is not admin deny access for write
token := d.StorageAPIToken()
if token.Admin == nil || token.Admin.Role != adminRole {
return nil, svcerrors.NewForbiddenError(s.adminError)
}

if err := s.sourceMustExist(ctx, d.SourceKey()); err != nil {
return nil, err
}
Expand Down
20 changes: 12 additions & 8 deletions internal/pkg/utils/testproject/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func GetTestProject(path string, envs *env.Map, options ...testproject.Option) (
p.logf(`■ ️Initialization done.`)

// Remove all objects
if err := p.Clean(); err != nil {
if err := p.Clean(); !p.IsGuest() && err != nil {
cleanupFn()
return nil, nil, err
}
Expand Down Expand Up @@ -217,9 +217,11 @@ func (p *Project) SetState(stateFilePath string) error {
}

// Create branches
err = p.createBranches(stateFile.Branches)
if err != nil {
return err
if !p.IsGuest() {
err = p.createBranches(stateFile.Branches)
if err != nil {
return err
}
}

// Create configs in branches
Expand All @@ -240,10 +242,12 @@ func (p *Project) SetState(stateFilePath string) error {
return err
}

// Create sandboxes in default branch
err = p.createSandboxes(p.defaultBranch.ID, stateFile.Sandboxes)
if err != nil {
return err
if !p.IsGuest() {
// Create sandboxes in default branch
err = p.createSandboxes(p.defaultBranch.ID, stateFile.Sandboxes)
if err != nil {
return err
}
}

p.logf("■ Project state set.")
Expand Down
72 changes: 72 additions & 0 deletions test/stream/bridge/keboola/guest_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package keboola_test

import (
"context"
"net/http"
"net/url"
"strconv"
"testing"
"time"

utilsproject "github.com/keboola/go-utils/pkg/testproject"
"github.com/stretchr/testify/require"

commonDeps "github.com/keboola/keboola-as-code/internal/pkg/service/common/dependencies"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/api"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/config"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
"github.com/keboola/keboola-as-code/internal/pkg/utils/netutils"
)

// To see details run: TEST_VERBOSE=true go test ./test/stream/bridge/... -v

func TestGuestUserWorkflow(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
defer cancel()
modifyConfig := func(cfg *config.Config) {
apiPort := netutils.FreePortForTest(t)
cfg.API.Listen = "0.0.0.0:" + strconv.FormatInt(int64(apiPort), 10)
u, err := url.Parse("http://localhost:" + strconv.FormatInt(int64(apiPort), 10))
require.NoError(t, err)
cfg.API.PublicURL = u
}
ts := setup(
t,
ctx,
modifyConfig,
utilsproject.WithIsGuest(),
)
ts.setupSourceThroughAPI(t, ctx, http.StatusForbidden)
defer ts.teardown(t, ctx)
recreateStreamAPI(t, &ts, ctx, modifyConfig)
ts.setupSourceThroughAPI(t, ctx, http.StatusOK)

recreateStreamAPI(t, &ts, ctx, modifyConfig, utilsproject.WithIsGuest())
ts.setupSinkThroughAPI(t, ctx, http.StatusForbidden)
}

func recreateStreamAPI(t *testing.T, ts *testState, ctx context.Context, modifyConfig func(cfg *config.Config), options ...utilsproject.Option) {
t.Helper()

// Kill existing API as we are changing project
ts.apiScp.Process().Shutdown(ctx, errors.New("bye bye"))
ts.apiScp.Process().WaitForShutdown()

// Setup new project without guest user to setup source
ts.setupProject(t, options...)
ts.apiScp, ts.apiMock = dependencies.NewMockedAPIScopeWithConfig(
t,
ctx,
func(c *config.Config) {
c.NodeID = "api"
modifyConfig(c)
},
commonDeps.WithEtcdConfig(ts.etcdConfig),
commonDeps.WithDebugLogger(ts.logger),
commonDeps.WithTestProject(ts.project),
)
require.NoError(t, api.Start(ctx, ts.apiScp, ts.apiMock.TestConfig()))
}
1 change: 1 addition & 0 deletions test/stream/bridge/keboola/keboola_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func TestKeboolaBridgeWorkflow(t *testing.T) {
}

ts := setup(t, ctx, configFn)
ts.setupSink(t, ctx)
defer ts.teardown(t, ctx)

// Check initial state
Expand Down
Loading

0 comments on commit 73b1cc0

Please sign in to comment.