Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

724 bi collection improvments #770

Merged
merged 15 commits into from
Oct 6, 2020
72 changes: 36 additions & 36 deletions api/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,32 @@ const (
)

type Dependencies struct {
ctx context.Context
Cataloger catalog.Cataloger
Auth auth.Service
BlockAdapter block.Adapter
Stats stats.Collector
Retention retention.Service
Dedup *dedup.Cleaner
Meta auth.MetadataManager
Migrator db.Migrator
Collector stats.Collector
logger logging.Logger
ctx context.Context
Cataloger catalog.Cataloger
Auth auth.Service
BlockAdapter block.Adapter
Stats stats.Collector
Retention retention.Service
Dedup *dedup.Cleaner
MetadataManager auth.MetadataManager
Migrator db.Migrator
Collector stats.Collector
logger logging.Logger
}

func (d *Dependencies) WithContext(ctx context.Context) *Dependencies {
return &Dependencies{
ctx: ctx,
Cataloger: d.Cataloger,
Auth: d.Auth,
BlockAdapter: d.BlockAdapter.WithContext(ctx),
Stats: d.Stats,
Retention: d.Retention,
Dedup: d.Dedup,
Meta: d.Meta,
Migrator: d.Migrator,
Collector: d.Collector,
logger: d.logger.WithContext(ctx),
ctx: ctx,
Cataloger: d.Cataloger,
Auth: d.Auth,
BlockAdapter: d.BlockAdapter.WithContext(ctx),
Stats: d.Stats,
Retention: d.Retention,
Dedup: d.Dedup,
MetadataManager: d.MetadataManager,
Migrator: d.Migrator,
Collector: d.Collector,
logger: d.logger.WithContext(ctx),
}
}

Expand All @@ -95,20 +95,20 @@ type Controller struct {
}

func NewController(cataloger catalog.Cataloger, auth auth.Service, blockAdapter block.Adapter, stats stats.Collector, retention retention.Service,
dedupCleaner *dedup.Cleaner, meta auth.MetadataManager, migrator db.Migrator, collector stats.Collector, logger logging.Logger) *Controller {
dedupCleaner *dedup.Cleaner, metadataManager auth.MetadataManager, migrator db.Migrator, collector stats.Collector, logger logging.Logger) *Controller {
c := &Controller{
deps: &Dependencies{
ctx: context.Background(),
Cataloger: cataloger,
Auth: auth,
BlockAdapter: blockAdapter,
Stats: stats,
Retention: retention,
Dedup: dedupCleaner,
Meta: meta,
Migrator: migrator,
Collector: collector,
logger: logger,
ctx: context.Background(),
Cataloger: cataloger,
Auth: auth,
BlockAdapter: blockAdapter,
Stats: stats,
Retention: retention,
Dedup: dedupCleaner,
MetadataManager: metadataManager,
Migrator: migrator,
Collector: collector,
logger: logger,
},
}
return c
Expand Down Expand Up @@ -235,7 +235,7 @@ func (c *Controller) SetupLakeFSHandler() setupop.SetupLakeFSHandler {
}

// check if previous setup completed
if ts, _ := c.deps.Meta.SetupTimestamp(); !ts.IsZero() {
if ts, _ := c.deps.MetadataManager.SetupTimestamp(); !ts.IsZero() {
return setupop.NewSetupLakeFSConflict().
WithPayload(&models.Error{
Message: "lakeFS already initialized",
Expand Down Expand Up @@ -269,7 +269,7 @@ func (c *Controller) SetupLakeFSHandler() setupop.SetupLakeFSHandler {
}

// update setup completed timestamp
if err := c.deps.Meta.UpdateSetupTimestamp(time.Now()); err != nil {
if err := c.deps.MetadataManager.UpdateSetupTimestamp(time.Now()); err != nil {
c.deps.logger.WithError(err).Error("Failed the update setup timestamp")
}

Expand Down
44 changes: 22 additions & 22 deletions api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,24 @@ var (
)

type Handler struct {
meta auth.MetadataManager
cataloger catalog.Cataloger
blockStore block.Adapter
authService auth.Service
stats stats.Collector
retention retention.Service
migrator db.Migrator
apiServer *restapi.Server
handler *http.ServeMux
dedupCleaner *dedup.Cleaner
logger logging.Logger
metadataManager auth.MetadataManager
cataloger catalog.Cataloger
blockStore block.Adapter
authService auth.Service
stats stats.Collector
retention retention.Service
migrator db.Migrator
apiServer *restapi.Server
handler *http.ServeMux
dedupCleaner *dedup.Cleaner
logger logging.Logger
}

func NewHandler(
cataloger catalog.Cataloger,
blockStore block.Adapter,
authService auth.Service,
meta auth.MetadataManager,
metadataManager auth.MetadataManager,
stats stats.Collector,
retention retention.Service,
migrator db.Migrator,
Expand All @@ -67,15 +67,15 @@ func NewHandler(
) http.Handler {
logger.Info("initialized OpenAPI server")
s := &Handler{
cataloger: cataloger,
blockStore: blockStore,
authService: authService,
meta: meta,
stats: stats,
retention: retention,
migrator: migrator,
dedupCleaner: dedupCleaner,
logger: logger,
cataloger: cataloger,
blockStore: blockStore,
authService: authService,
metadataManager: metadataManager,
stats: stats,
retention: retention,
migrator: migrator,
dedupCleaner: dedupCleaner,
logger: logger,
}
s.buildAPI()
return s.handler
Expand Down Expand Up @@ -167,7 +167,7 @@ func (s *Handler) buildAPI() {
api.BasicAuthAuth = s.BasicAuth()
api.JwtTokenAuth = s.JwtTokenAuth()
// bind our handlers to the server
NewController(s.cataloger, s.authService, s.blockStore, s.stats, s.retention, s.dedupCleaner, s.meta, s.migrator, s.stats, s.logger).Configure(api)
NewController(s.cataloger, s.authService, s.blockStore, s.stats, s.retention, s.dedupCleaner, s.metadataManager, s.migrator, s.stats, s.logger).Configure(api)

// setup host/port
s.apiServer = restapi.NewServer(api)
Expand Down
3 changes: 2 additions & 1 deletion api/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

dbparams "github.com/treeverse/lakefs/db/params"
"github.com/treeverse/lakefs/dedup"
"github.com/treeverse/lakefs/stats"

"github.com/go-openapi/runtime"
httptransport "github.com/go-openapi/runtime/client"
Expand Down Expand Up @@ -71,7 +72,7 @@ func createDefaultAdminUser(authService auth.Service, t *testing.T) *authmodel.C

type mockCollector struct{}

func (m *mockCollector) CollectMetadata(_ map[string]string) {}
func (m *mockCollector) CollectMetadata(_ *stats.Metadata) {}

func (m *mockCollector) CollectEvent(_, _ string) {}

Expand Down
2 changes: 1 addition & 1 deletion block/s3/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/treeverse/lakefs/block"
inventorys3 "github.com/treeverse/lakefs/inventory/s3"
inventorys3 "github.com/treeverse/lakefs/cloud/aws/s3_inventory"
"github.com/treeverse/lakefs/logging"
)

Expand Down
2 changes: 1 addition & 1 deletion block/s3/inventory_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"time"

"github.com/treeverse/lakefs/block"
inventorys3 "github.com/treeverse/lakefs/cloud/aws/s3_inventory"
"github.com/treeverse/lakefs/cmdutils"
inventorys3 "github.com/treeverse/lakefs/inventory/s3"
)

var ErrInventoryNotSorted = errors.New("got unsorted s3 inventory")
Expand Down
2 changes: 1 addition & 1 deletion block/s3/inventory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/go-openapi/swag"
"github.com/treeverse/lakefs/block"
"github.com/treeverse/lakefs/block/s3"
inventorys3 "github.com/treeverse/lakefs/inventory/s3"
inventorys3 "github.com/treeverse/lakefs/cloud/aws/s3_inventory"
"github.com/treeverse/lakefs/logging"
)

Expand Down
34 changes: 34 additions & 0 deletions cloud/aws/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package aws

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/sts"
"github.com/treeverse/lakefs/logging"
)

// MetadataProvider collects AWS-specific metadata using the AWS
// configuration it was constructed with.
type MetadataProvider struct {
// logger receives error reports when metadata collection fails.
logger logging.Logger
// awsConfig is the configuration used to open the AWS session in GetMetadata.
awsConfig *aws.Config
}

// NewMetadataProvider builds an AWS MetadataProvider that reports problems to
// logger and opens its AWS sessions with awsConfig.
func NewMetadataProvider(logger logging.Logger, awsConfig *aws.Config) *MetadataProvider {
	provider := new(MetadataProvider)
	provider.logger = logger
	provider.awsConfig = awsConfig
	return provider
}

// GetMetadata returns a single-entry map holding the AWS account ID of the
// caller under the "aws_account_id" key, as reported by STS GetCallerIdentity.
//
// It returns nil (after logging the reason) when the AWS session cannot be
// created, when the STS call fails, or when STS returns no account ID.
func (m *MetadataProvider) GetMetadata() map[string]string {
	sess, err := session.NewSession(m.awsConfig)
	if err != nil {
		m.logger.Errorf("%v: failed to create AWS session for BI", err)
		return nil
	}
	// NOTE(review): the value returned by ClientConfig is discarded, so this
	// call looks like a no-op. It is kept because it is the only use of the s3
	// import in this file - confirm and remove together with the import.
	sess.ClientConfig(s3.ServiceName)
	stsClient := sts.New(sess)
	identity, err := stsClient.GetCallerIdentity(&sts.GetCallerIdentityInput{})
	if err != nil {
		m.logger.Errorf("%v: failed to get AWS account ID for BI", err)
		return nil
	}
	// Account is a *string; guard against a nil response or field so that a
	// best-effort metadata lookup can never panic the caller.
	if identity == nil || identity.Account == nil {
		m.logger.Errorf("empty caller identity: failed to get AWS account ID for BI")
		return nil
	}
	return map[string]string{"aws_account_id": *identity.Account}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package s3
package s3_inventory

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package s3
package s3_inventory

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package s3
package s3_inventory

import "github.com/xitongsys/parquet-go/reader"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package s3
package s3_inventory

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package s3
package s3_inventory

import (
"context"
Expand Down
23 changes: 23 additions & 0 deletions cloud/gcp/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package gcp

import (
"cloud.google.com/go/compute/metadata"
"github.com/treeverse/lakefs/logging"
)

// MetadataProvider collects Google Cloud specific metadata.
type MetadataProvider struct {
// logger receives error reports when metadata collection fails.
logger logging.Logger
}

// NewMetadataProvider builds a GCP MetadataProvider that reports problems to
// logger.
func NewMetadataProvider(logger logging.Logger) *MetadataProvider {
	provider := new(MetadataProvider)
	provider.logger = logger
	return provider
}

func (m *MetadataProvider) GetMetadata() map[string]string {
projectID, err := metadata.NumericProjectID()
if err != nil {
m.logger.Errorf("%v: failed to get Google numeric project ID from instance metadata", err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be a warning - it doesn't affect the health or correctness of lakeFS in any way (nor do we ever request explicit permissions to do this operation (also true for the AWS metadata provider))

return nil
}
return map[string]string{"google_numeric_project_id": projectID}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another thing that's missing (sorry for not catching this earlier) is one-way hashing: we never want to send the actual account ID, just to know that different installations come from the same one. I suggest hashing the received ID here and in the AWS provider as well.

In terms of metadata fields, I would separate it into 2 fields: account_type ("aws", "gcp", etc.) and "account_id", which is the hash. That would make it easier to consume downstream.

}
25 changes: 25 additions & 0 deletions cloud/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package cloud

import (
"github.com/treeverse/lakefs/block/gs"
s3a "github.com/treeverse/lakefs/block/s3"
"github.com/treeverse/lakefs/cloud/aws"
"github.com/treeverse/lakefs/cloud/gcp"
"github.com/treeverse/lakefs/config"
"github.com/treeverse/lakefs/logging"
)

// MetadataProvider is the common interface of the cloud-specific metadata
// providers returned by BuildMetadataProvider.
type MetadataProvider interface {
// GetMetadata returns the provider's metadata as string key/value pairs.
GetMetadata() map[string]string
}

// BuildMetadataProvider selects the cloud metadata provider that matches the
// blockstore type configured in c: the GCP provider for the gs blockstore and
// the AWS provider (built with c's AWS configuration) for the s3 blockstore.
// For any other blockstore type it returns nil.
func BuildMetadataProvider(logger logging.Logger, c *config.Config) MetadataProvider {
	blockstoreType := c.GetBlockstoreType()
	if blockstoreType == gs.BlockstoreType {
		return gcp.NewMetadataProvider(logger)
	}
	if blockstoreType == s3a.BlockstoreType {
		return aws.NewMetadataProvider(logger, c.GetAwsConfig())
	}
	// Return a literal nil so callers comparing the interface against nil
	// see "no provider".
	return nil
}
16 changes: 6 additions & 10 deletions cmd/lakefs/cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"github.com/treeverse/lakefs/auth"
"github.com/treeverse/lakefs/auth/crypt"
"github.com/treeverse/lakefs/auth/model"
"github.com/treeverse/lakefs/cloud"
"github.com/treeverse/lakefs/config"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/logging"
"github.com/treeverse/lakefs/stats"
)

Expand All @@ -38,14 +40,9 @@ var initCmd = &cobra.Command{
dbPool,
crypt.NewSecretStore(cfg.GetAuthEncryptionSecret()),
cfg.GetAuthCacheConfig())

metaManager := auth.NewDBMetadataManager(config.Version, dbPool)
metadata, err := metaManager.Write()
if err != nil {
fmt.Printf("failed to write initial setup metadata: %s\n", err)
os.Exit(1)
}

authMetadataManager := auth.NewDBMetadataManager(config.Version, dbPool)
cloudMetadataProvider := cloud.BuildMetadataProvider(logging.Default(), cfg)
metadata := stats.NewMetadata(logging.Default(), cfg, authMetadataManager, cloudMetadataProvider)
credentials, err := auth.SetupAdminUser(authService, &model.User{
CreatedAt: time.Now(),
Username: userName,
Expand All @@ -56,8 +53,7 @@ var initCmd = &cobra.Command{
}

ctx, cancelFn := context.WithCancel(context.Background())
processID, bufferedCollectorArgs := cfg.GetStatsBufferedCollectorArgs()
stats := stats.NewBufferedCollector(metadata[auth.InstallationIDKeyName], processID, bufferedCollectorArgs...)
stats := stats.NewBufferedCollector(metadata.InstallationID, cfg)
go stats.Run(ctx)
stats.CollectMetadata(metadata)
stats.CollectEvent("global", "init")
Expand Down
Loading