diff --git a/api/api_controller.go b/api/api_controller.go index 3498549fd3f..c458dfa3c9d 100644 --- a/api/api_controller.go +++ b/api/api_controller.go @@ -53,32 +53,32 @@ const ( ) type Dependencies struct { - ctx context.Context - Cataloger catalog.Cataloger - Auth auth.Service - BlockAdapter block.Adapter - Stats stats.Collector - Retention retention.Service - Dedup *dedup.Cleaner - Meta auth.MetadataManager - Migrator db.Migrator - Collector stats.Collector - logger logging.Logger + ctx context.Context + Cataloger catalog.Cataloger + Auth auth.Service + BlockAdapter block.Adapter + Stats stats.Collector + Retention retention.Service + Dedup *dedup.Cleaner + MetadataManager auth.MetadataManager + Migrator db.Migrator + Collector stats.Collector + logger logging.Logger } func (d *Dependencies) WithContext(ctx context.Context) *Dependencies { return &Dependencies{ - ctx: ctx, - Cataloger: d.Cataloger, - Auth: d.Auth, - BlockAdapter: d.BlockAdapter.WithContext(ctx), - Stats: d.Stats, - Retention: d.Retention, - Dedup: d.Dedup, - Meta: d.Meta, - Migrator: d.Migrator, - Collector: d.Collector, - logger: d.logger.WithContext(ctx), + ctx: ctx, + Cataloger: d.Cataloger, + Auth: d.Auth, + BlockAdapter: d.BlockAdapter.WithContext(ctx), + Stats: d.Stats, + Retention: d.Retention, + Dedup: d.Dedup, + MetadataManager: d.MetadataManager, + Migrator: d.Migrator, + Collector: d.Collector, + logger: d.logger.WithContext(ctx), } } @@ -95,20 +95,20 @@ type Controller struct { } func NewController(cataloger catalog.Cataloger, auth auth.Service, blockAdapter block.Adapter, stats stats.Collector, retention retention.Service, - dedupCleaner *dedup.Cleaner, meta auth.MetadataManager, migrator db.Migrator, collector stats.Collector, logger logging.Logger) *Controller { + dedupCleaner *dedup.Cleaner, metadataManager auth.MetadataManager, migrator db.Migrator, collector stats.Collector, logger logging.Logger) *Controller { c := &Controller{ deps: &Dependencies{ - ctx: context.Background(), - Cataloger: cataloger, - Auth: auth, - BlockAdapter: blockAdapter, - Stats: stats, - Retention: retention, - Dedup: dedupCleaner, - Meta: meta, - Migrator: migrator, - Collector: collector, - logger: logger, + ctx: context.Background(), + Cataloger: cataloger, + Auth: auth, + BlockAdapter: blockAdapter, + Stats: stats, + Retention: retention, + Dedup: dedupCleaner, + MetadataManager: metadataManager, + Migrator: migrator, + Collector: collector, + logger: logger, }, } return c @@ -235,7 +235,7 @@ func (c *Controller) SetupLakeFSHandler() setupop.SetupLakeFSHandler { } // check if previous setup completed - if ts, _ := c.deps.Meta.SetupTimestamp(); !ts.IsZero() { + if ts, _ := c.deps.MetadataManager.SetupTimestamp(); !ts.IsZero() { return setupop.NewSetupLakeFSConflict(). WithPayload(&models.Error{ Message: "lakeFS already initialized", @@ -269,7 +269,7 @@ func (c *Controller) SetupLakeFSHandler() setupop.SetupLakeFSHandler { } // update setup completed timestamp - if err := c.deps.Meta.UpdateSetupTimestamp(time.Now()); err != nil { + if err := c.deps.MetadataManager.UpdateSetupTimestamp(time.Now()); err != nil { c.deps.logger.WithError(err).Error("Failed the update setup timestamp") } diff --git a/api/handler.go b/api/handler.go index c8f823ec823..55c3848aac1 100644 --- a/api/handler.go +++ b/api/handler.go @@ -41,24 +41,24 @@ var ( ) type Handler struct { - meta auth.MetadataManager - cataloger catalog.Cataloger - blockStore block.Adapter - authService auth.Service - stats stats.Collector - retention retention.Service - migrator db.Migrator - apiServer *restapi.Server - handler *http.ServeMux - dedupCleaner *dedup.Cleaner - logger logging.Logger + metadataManager auth.MetadataManager + cataloger catalog.Cataloger + blockStore block.Adapter + authService auth.Service + stats stats.Collector + retention retention.Service + migrator db.Migrator + apiServer *restapi.Server + handler *http.ServeMux + dedupCleaner *dedup.Cleaner + logger logging.Logger } func NewHandler( cataloger catalog.Cataloger, blockStore block.Adapter, authService auth.Service, - meta auth.MetadataManager, + metadataManager auth.MetadataManager, stats stats.Collector, retention retention.Service, migrator db.Migrator, @@ -67,15 +67,15 @@ func NewHandler( ) http.Handler { logger.Info("initialized OpenAPI server") s := &Handler{ - cataloger: cataloger, - blockStore: blockStore, - authService: authService, - meta: meta, - stats: stats, - retention: retention, - migrator: migrator, - dedupCleaner: dedupCleaner, - logger: logger, + cataloger: cataloger, + blockStore: blockStore, + authService: authService, + metadataManager: metadataManager, + stats: stats, + retention: retention, + migrator: migrator, + dedupCleaner: dedupCleaner, + logger: logger, } s.buildAPI() return s.handler @@ -167,7 +167,7 @@ func (s *Handler) buildAPI() { api.BasicAuthAuth = s.BasicAuth() api.JwtTokenAuth = s.JwtTokenAuth() // bind our handlers to the server - NewController(s.cataloger, s.authService, s.blockStore, s.stats, s.retention, s.dedupCleaner, s.meta, s.migrator, s.stats, s.logger).Configure(api) + NewController(s.cataloger, s.authService, s.blockStore, s.stats, s.retention, s.dedupCleaner, s.metadataManager, s.migrator, s.stats, s.logger).Configure(api) // setup host/port s.apiServer = restapi.NewServer(api) diff --git a/api/handler_test.go b/api/handler_test.go index 03599f7d88a..f6e04b45721 100644 --- a/api/handler_test.go +++ b/api/handler_test.go @@ -11,6 +11,7 @@ import ( dbparams "github.com/treeverse/lakefs/db/params" "github.com/treeverse/lakefs/dedup" + "github.com/treeverse/lakefs/stats" "github.com/go-openapi/runtime" httptransport "github.com/go-openapi/runtime/client" @@ -71,7 +72,7 @@ func createDefaultAdminUser(authService auth.Service, t *testing.T) *authmodel.C type mockCollector struct{} -func (m *mockCollector) CollectMetadata(_ map[string]string) {} +func (m *mockCollector) CollectMetadata(_ *stats.Metadata) {} func (m *mockCollector) CollectEvent(_, _ string) {} diff --git a/block/s3/inventory.go b/block/s3/inventory.go index d852275dcc4..92c97f63c22 100644 --- a/block/s3/inventory.go +++ b/block/s3/inventory.go @@ -12,7 +12,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/treeverse/lakefs/block" - inventorys3 "github.com/treeverse/lakefs/inventory/s3" + "github.com/treeverse/lakefs/cloud/aws/s3inventory" "github.com/treeverse/lakefs/logging" ) @@ -33,10 +33,10 @@ type inventoryFile struct { } func (a *Adapter) GenerateInventory(ctx context.Context, logger logging.Logger, manifestURL string, shouldSort bool) (block.Inventory, error) { - return GenerateInventory(logger, manifestURL, a.s3, inventorys3.NewReader(ctx, a.s3, logger), shouldSort) + return GenerateInventory(logger, manifestURL, a.s3, s3inventory.NewReader(ctx, a.s3, logger), shouldSort) } -func GenerateInventory(logger logging.Logger, manifestURL string, s3 s3iface.S3API, inventoryReader inventorys3.IReader, shouldSort bool) (block.Inventory, error) { +func GenerateInventory(logger logging.Logger, manifestURL string, s3 s3iface.S3API, inventoryReader s3inventory.IReader, shouldSort bool) (block.Inventory, error) { if logger == nil { logger = logging.Default() } @@ -57,7 +57,7 @@ type Inventory struct { Manifest *Manifest logger logging.Logger shouldSort bool - reader inventorys3.IReader + reader s3inventory.IReader } func (inv *Inventory) Iterator() block.InventoryIterator { @@ -86,8 +86,8 @@ func loadManifest(manifestURL string, s3svc s3iface.S3API) (*Manifest, error) { if err != nil { return nil, err } - if m.Format != inventorys3.OrcFormatName && m.Format != inventorys3.ParquetFormatName { - return nil, fmt.Errorf("%w. got format: %s", inventorys3.ErrUnsupportedInventoryFormat, m.Format) + if m.Format != s3inventory.OrcFormatName && m.Format != s3inventory.ParquetFormatName { + return nil, fmt.Errorf("%w. got format: %s", s3inventory.ErrUnsupportedInventoryFormat, m.Format) } m.URL = manifestURL inventoryBucketArn, err := arn.Parse(m.InventoryBucketArn) @@ -98,7 +98,7 @@ func loadManifest(manifestURL string, s3svc s3iface.S3API) (*Manifest, error) { return &m, nil } -func sortManifest(m *Manifest, logger logging.Logger, reader inventorys3.IReader) error { +func sortManifest(m *Manifest, logger logging.Logger, reader s3inventory.IReader) error { firstKeyByInventoryFile := make(map[string]string) lastKeyByInventoryFile := make(map[string]string) for _, f := range m.Files { diff --git a/block/s3/inventory_iterator.go b/block/s3/inventory_iterator.go index f42f290c6d3..bd4c964af26 100644 --- a/block/s3/inventory_iterator.go +++ b/block/s3/inventory_iterator.go @@ -7,8 +7,8 @@ import ( "time" "github.com/treeverse/lakefs/block" + "github.com/treeverse/lakefs/cloud/aws/s3inventory" "github.com/treeverse/lakefs/cmdutils" - inventorys3 "github.com/treeverse/lakefs/inventory/s3" ) var ErrInventoryNotSorted = errors.New("got unsorted s3 inventory") @@ -17,7 +17,7 @@ type InventoryIterator struct { *Inventory err error val *block.InventoryObject - buffer []inventorys3.InventoryObject + buffer []s3inventory.InventoryObject inventoryFileIndex int valIndexInBuffer int inventoryFileProgress *cmdutils.Progress @@ -94,7 +94,7 @@ func (it *InventoryIterator) fillBuffer() bool { it.logger.Errorf("failed to close manifest file reader. file=%s, err=%w", it.Manifest.Files[it.inventoryFileIndex].Key, err) } }() - it.buffer = make([]inventorys3.InventoryObject, rdr.GetNumRows()) + it.buffer = make([]s3inventory.InventoryObject, rdr.GetNumRows()) err = rdr.Read(&it.buffer) if err != nil { it.err = err diff --git a/block/s3/inventory_test.go b/block/s3/inventory_test.go index e715a4d41d7..3b3cd8d3353 100644 --- a/block/s3/inventory_test.go +++ b/block/s3/inventory_test.go @@ -15,20 +15,20 @@ import ( "github.com/go-openapi/swag" "github.com/treeverse/lakefs/block" "github.com/treeverse/lakefs/block/s3" - inventorys3 "github.com/treeverse/lakefs/inventory/s3" + "github.com/treeverse/lakefs/cloud/aws/s3inventory" "github.com/treeverse/lakefs/logging" ) var ErrReadFile = errors.New("error reading file") -func rows(keys []string, lastModified map[string]time.Time) []*inventorys3.InventoryObject { +func rows(keys []string, lastModified map[string]time.Time) []*s3inventory.InventoryObject { if keys == nil { return nil } - res := make([]*inventorys3.InventoryObject, len(keys)) + res := make([]*s3inventory.InventoryObject, len(keys)) for i, key := range keys { if key != "" { - res[i] = new(inventorys3.InventoryObject) + res[i] = new(s3inventory.InventoryObject) res[i].Key = key res[i].IsLatest = swag.Bool(!strings.Contains(key, "_expired")) res[i].IsDeleteMarker = swag.Bool(strings.Contains(key, "_del")) @@ -210,7 +210,7 @@ type mockInventoryReader struct { } type mockInventoryFileReader struct { - rows []*inventorys3.InventoryObject + rows []*s3inventory.InventoryObject nextIdx int inventoryReader *mockInventoryReader key string @@ -247,8 +247,8 @@ func (m *mockInventoryFileReader) Close() error { } func (m *mockInventoryFileReader) Read(dstInterface interface{}) error { - res := make([]inventorys3.InventoryObject, 0, len(m.rows)) - dst := dstInterface.(*[]inventorys3.InventoryObject) + res := make([]s3inventory.InventoryObject, 0, len(m.rows)) + dst := dstInterface.(*[]s3inventory.InventoryObject) for i := m.nextIdx; i < len(m.rows) && i < m.nextIdx+len(*dst); i++ { if m.rows[i] == nil { return ErrReadFile // for test - simulate file with error @@ -264,12 +264,12 @@ func (m *mockInventoryFileReader) GetNumRows() int64 { return int64(len(m.rows)) } -func (m *mockInventoryReader) GetFileReader(_ string, _ string, key string) (inventorys3.FileReader, error) { +func (m *mockInventoryReader) GetFileReader(_ string, _ string, key string) (s3inventory.FileReader, error) { m.openFiles[key] = true return &mockInventoryFileReader{rows: rows(fileContents[key], m.lastModified), inventoryReader: m, key: key}, nil } -func (m *mockInventoryReader) GetMetadataReader(_ string, _ string, key string) (inventorys3.MetadataReader, error) { +func (m *mockInventoryReader) GetMetadataReader(_ string, _ string, key string) (s3inventory.MetadataReader, error) { m.openFiles[key] = true return &mockInventoryFileReader{rows: rows(fileContents[key], m.lastModified), inventoryReader: m, key: key}, nil } diff --git a/cloud/aws/metadata.go b/cloud/aws/metadata.go new file mode 100644 index 00000000000..eb3a50bf5c9 --- /dev/null +++ b/cloud/aws/metadata.go @@ -0,0 +1,41 @@ +package aws + +import ( + "crypto/md5" //nolint:gosec + "fmt" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/sts" + "github.com/treeverse/lakefs/cloud" + "github.com/treeverse/lakefs/logging" +) + +type MetadataProvider struct { + logger logging.Logger + awsConfig *aws.Config +} + +func NewMetadataProvider(logger logging.Logger, awsConfig *aws.Config) *MetadataProvider { + return &MetadataProvider{logger: logger, awsConfig: awsConfig} +} + +func (m *MetadataProvider) GetMetadata() map[string]string { + sess, err := session.NewSession(m.awsConfig) + if err != nil { + m.logger.Warnf("%v: failed to create AWS session for BI", err) + return nil + } + sess.ClientConfig(s3.ServiceName) + stsClient := sts.New(sess) + identity, err := stsClient.GetCallerIdentity(&sts.GetCallerIdentityInput{}) + if err != nil { + m.logger.Warnf("%v: failed to get AWS account ID for BI", err) + return nil + } + return map[string]string{ + cloud.IDKey: fmt.Sprintf("%x", md5.Sum([]byte(*identity.Account))), //nolint:gosec + cloud.IDTypeKey: "aws_account_id", + } +} diff --git a/inventory/s3/orc_reader.go b/cloud/aws/s3inventory/orc_reader.go similarity index 99% rename from inventory/s3/orc_reader.go rename to cloud/aws/s3inventory/orc_reader.go index bf5accacff9..c75b9449aec 100644 --- a/inventory/s3/orc_reader.go +++ b/cloud/aws/s3inventory/orc_reader.go @@ -1,4 +1,4 @@ -package s3 +package s3inventory import ( "context" diff --git a/inventory/s3/orc_utils.go b/cloud/aws/s3inventory/orc_utils.go similarity index 99% rename from inventory/s3/orc_utils.go rename to cloud/aws/s3inventory/orc_utils.go index 49036af4ccf..88745af8efb 100644 --- a/inventory/s3/orc_utils.go +++ b/cloud/aws/s3inventory/orc_utils.go @@ -1,4 +1,4 @@ -package s3 +package s3inventory import ( "context" diff --git a/inventory/s3/parquet_reader.go b/cloud/aws/s3inventory/parquet_reader.go similarity index 96% rename from inventory/s3/parquet_reader.go rename to cloud/aws/s3inventory/parquet_reader.go index 88509f68c59..33b5255427a 100644 --- a/inventory/s3/parquet_reader.go +++ b/cloud/aws/s3inventory/parquet_reader.go @@ -1,4 +1,4 @@ -package s3 +package s3inventory import "github.com/xitongsys/parquet-go/reader" diff --git a/inventory/s3/reader.go b/cloud/aws/s3inventory/reader.go similarity index 99% rename from inventory/s3/reader.go rename to cloud/aws/s3inventory/reader.go index 15278fb1a37..18c46211b81 100644 --- a/inventory/s3/reader.go +++ b/cloud/aws/s3inventory/reader.go @@ -1,4 +1,4 @@ -package s3 +package s3inventory import ( "context" diff --git a/inventory/s3/reader_test.go b/cloud/aws/s3inventory/reader_test.go similarity index 99% rename from inventory/s3/reader_test.go rename to cloud/aws/s3inventory/reader_test.go index b19eac67088..2131d2c7426 100644 --- a/inventory/s3/reader_test.go +++ b/cloud/aws/s3inventory/reader_test.go @@ -1,4 +1,4 @@ -package s3 +package s3inventory import ( "context" diff --git a/cloud/gcp/metadata.go b/cloud/gcp/metadata.go new file mode 100644 index 00000000000..99097cc529b --- /dev/null +++ b/cloud/gcp/metadata.go @@ -0,0 +1,30 @@ +package gcp + +import ( + "crypto/md5" //nolint:gosec + "fmt" + + "cloud.google.com/go/compute/metadata" + "github.com/treeverse/lakefs/cloud" + "github.com/treeverse/lakefs/logging" +) + +type MetadataProvider struct { + logger logging.Logger +} + +func NewMetadataProvider(logger logging.Logger) *MetadataProvider { + return &MetadataProvider{logger: logger} +} + +func (m *MetadataProvider) GetMetadata() map[string]string { + projectID, err := metadata.NumericProjectID() + if err != nil { + m.logger.Warnf("%v: failed to get Google numeric project ID from instance metadata", err) + return nil + } + return map[string]string{ + cloud.IDKey: fmt.Sprintf("%x", md5.Sum([]byte(projectID))), //nolint:gosec + cloud.IDTypeKey: "gcp_project_numerical_id", + } +} diff --git a/cloud/metadata.go b/cloud/metadata.go new file mode 100644 index 00000000000..06ec9f790d1 --- /dev/null +++ b/cloud/metadata.go @@ -0,0 +1,10 @@ +package cloud + +const ( + IDTypeKey = "cloud_id_type" + IDKey = "cloud_id" +) + +type MetadataProvider interface { + GetMetadata() map[string]string +} diff --git a/cmd/lakefs/cmd/init.go b/cmd/lakefs/cmd/init.go index aac1c7c6ded..0edc5748727 100644 --- a/cmd/lakefs/cmd/init.go +++ b/cmd/lakefs/cmd/init.go @@ -12,6 +12,7 @@ import ( "github.com/treeverse/lakefs/auth/model" "github.com/treeverse/lakefs/config" "github.com/treeverse/lakefs/db" + "github.com/treeverse/lakefs/logging" "github.com/treeverse/lakefs/stats" ) @@ -38,14 +39,9 @@ var initCmd = &cobra.Command{ dbPool, crypt.NewSecretStore(cfg.GetAuthEncryptionSecret()), cfg.GetAuthCacheConfig()) - - metaManager := auth.NewDBMetadataManager(config.Version, dbPool) - metadata, err := metaManager.Write() - if err != nil { - fmt.Printf("failed to write initial setup metadata: %s\n", err) - os.Exit(1) - } - + authMetadataManager := auth.NewDBMetadataManager(config.Version, dbPool) + cloudMetadataProvider := stats.BuildMetadataProvider(logging.Default(), cfg) + metadata := stats.NewMetadata(logging.Default(), cfg, authMetadataManager, cloudMetadataProvider) credentials, err := auth.SetupAdminUser(authService, &model.User{ CreatedAt: time.Now(), Username: userName, @@ -56,8 +52,7 @@ var initCmd = &cobra.Command{ } ctx, cancelFn := context.WithCancel(context.Background()) - processID, bufferedCollectorArgs := cfg.GetStatsBufferedCollectorArgs() - stats := stats.NewBufferedCollector(metadata[auth.InstallationIDKeyName], processID, bufferedCollectorArgs...) + stats := stats.NewBufferedCollector(metadata.InstallationID, cfg) go stats.Run(ctx) stats.CollectMetadata(metadata) stats.CollectEvent("global", "init") diff --git a/cmd/lakefs/cmd/run.go b/cmd/lakefs/cmd/run.go index bc38d34dafc..dba796cbbb6 100644 --- a/cmd/lakefs/cmd/run.go +++ b/cmd/lakefs/cmd/run.go @@ -75,20 +75,12 @@ var runCmd = &cobra.Command{ dbPool, crypt.NewSecretStore(cfg.GetAuthEncryptionSecret()), cfg.GetAuthCacheConfig()) - - meta := auth.NewDBMetadataManager(config.Version, dbPool) - - processID, bufferedCollectorArgs := cfg.GetStatsBufferedCollectorArgs() - - // collect and write metadata - metadata, err := meta.Write() - if err != nil { - logger.WithError(err).Debug("failed to collect account metadata") - } - - stats := stats.NewBufferedCollector(metadata[auth.InstallationIDKeyName], processID, bufferedCollectorArgs...) + authMetadataManager := auth.NewDBMetadataManager(config.Version, dbPool) + cloudMetadataProvider := stats.BuildMetadataProvider(logger, cfg) + metadata := stats.NewMetadata(logger, cfg, authMetadataManager, cloudMetadataProvider) + bufferedCollector := stats.NewBufferedCollector(metadata.InstallationID, cfg) // send metadata - stats.CollectMetadata(metadata) + bufferedCollector.CollectMetadata(metadata) dedupCleaner := dedup.NewCleaner(blockStore, cataloger.DedupReportChannel()) defer func() { @@ -106,8 +98,8 @@ var runCmd = &cobra.Command{ cataloger, blockStore, authService, - meta, - stats, + authMetadataManager, + bufferedCollector, retention, migrator, dedupCleaner, @@ -121,14 +113,14 @@ var runCmd = &cobra.Command{ blockStore, authService, cfg.GetS3GatewayDomainName(), - stats, + bufferedCollector, dedupCleaner, ) ctx, cancelFn := context.WithCancel(context.Background()) - go stats.Run(ctx) + go bufferedCollector.Run(ctx) - stats.CollectEvent("global", "run") + bufferedCollector.CollectEvent("global", "run") logging.Default().WithField("listen_address", cfg.GetListenAddress()).Info("starting HTTP server") server := &http.Server{ @@ -152,7 +144,7 @@ var runCmd = &cobra.Command{ <-done cancelFn() - <-stats.Done() + <-bufferedCollector.Done() }, } diff --git a/config/config.go b/config/config.go index a67d4048d04..6238fc2df13 100644 --- a/config/config.go +++ b/config/config.go @@ -11,7 +11,6 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sts" - "github.com/google/uuid" _ "github.com/jackc/pgx/v4/stdlib" "github.com/mitchellh/go-homedir" log "github.com/sirupsen/logrus" @@ -20,7 +19,6 @@ import ( blockparams "github.com/treeverse/lakefs/block/params" catalogparams "github.com/treeverse/lakefs/catalog/params" dbparams "github.com/treeverse/lakefs/db/params" - "github.com/treeverse/lakefs/stats" ) const ( @@ -298,20 +296,6 @@ func (c *Config) GetStatsFlushInterval() time.Duration { return viper.GetDuration("stats.flush_interval") } -func (c *Config) GetStatsBufferedCollectorArgs() (processID string, opts []stats.BufferedCollectorOpts) { - var sender stats.Sender - if c.GetStatsEnabled() && !strings.HasPrefix(Version, UnreleasedVersion) { - sender = stats.NewHTTPSender(c.GetStatsAddress(), time.Now) - } else { - sender = stats.NewDummySender() - } - return uuid.Must(uuid.NewUUID()).String(), - []stats.BufferedCollectorOpts{ - stats.WithSender(sender), - stats.WithFlushInterval(c.GetStatsFlushInterval()), - } -} - func GetMetastoreAwsConfig() *aws.Config { cfg := &aws.Config{ Region: aws.String(viper.GetString("metastore.glue.region")), diff --git a/gateway/playback_test.go b/gateway/playback_test.go index 1c2d490630f..fc3a4c18dbf 100644 --- a/gateway/playback_test.go +++ b/gateway/playback_test.go @@ -21,6 +21,7 @@ import ( "github.com/treeverse/lakefs/gateway" "github.com/treeverse/lakefs/gateway/simulator" "github.com/treeverse/lakefs/logging" + "github.com/treeverse/lakefs/stats" "github.com/treeverse/lakefs/testutil" ) @@ -93,7 +94,7 @@ func TestMain(m *testing.M) { os.Exit(code) } -func (m *mockCollector) CollectMetadata(accountMetadata map[string]string) {} +func (m *mockCollector) CollectMetadata(accountMetadata *stats.Metadata) {} func (m *mockCollector) CollectEvent(class, action string) {} diff --git a/go.mod b/go.mod index 127b9553214..68ccf979225 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/treeverse/lakefs go 1.15 require ( - cloud.google.com/go v0.63.0 // indirect + cloud.google.com/go v0.63.0 cloud.google.com/go/storage v1.10.0 github.com/Masterminds/squirrel v1.4.0 github.com/apache/thrift v0.13.0 diff --git a/loadtest/local_load_test.go b/loadtest/local_load_test.go index 4e2ed943fbe..e5f559b0587 100644 --- a/loadtest/local_load_test.go +++ b/loadtest/local_load_test.go @@ -8,9 +8,6 @@ import ( "testing" "time" - dbparams "github.com/treeverse/lakefs/db/params" - "github.com/treeverse/lakefs/dedup" - "github.com/ory/dockertest/v3" "github.com/treeverse/lakefs/api" "github.com/treeverse/lakefs/auth" @@ -20,8 +17,11 @@ import ( "github.com/treeverse/lakefs/block" "github.com/treeverse/lakefs/catalog" "github.com/treeverse/lakefs/db" + dbparams "github.com/treeverse/lakefs/db/params" + "github.com/treeverse/lakefs/dedup" "github.com/treeverse/lakefs/logging" "github.com/treeverse/lakefs/retention" + "github.com/treeverse/lakefs/stats" "github.com/treeverse/lakefs/testutil" ) @@ -45,7 +45,7 @@ func TestMain(m *testing.M) { type mockCollector struct{} -func (m *mockCollector) CollectMetadata(_ map[string]string) {} +func (m *mockCollector) CollectMetadata(_ *stats.Metadata) {} func (m *mockCollector) CollectEvent(_, _ string) {} diff --git a/stats/collector.go b/stats/collector.go index 6197dbee34e..39cf37d3a45 100644 --- a/stats/collector.go +++ b/stats/collector.go @@ -3,8 +3,11 @@ package stats import ( "context" "fmt" + "strings" "time" + "github.com/google/uuid" + "github.com/treeverse/lakefs/config" "github.com/treeverse/lakefs/logging" ) @@ -16,7 +19,7 @@ const ( type Collector interface { CollectEvent(class, action string) - CollectMetadata(accountMetadata map[string]string) + CollectMetadata(accountMetadata *Metadata) } type Metric struct { @@ -32,16 +35,6 @@ type InputEvent struct { Metrics []Metric `json:"metrics"` } -type MetadataEntry struct { - Name string `json:"name"` - Value string `json:"value"` -} - -type Metadata struct { - InstallationID string `json:"installation_id"` - Entries []MetadataEntry `json:"entries"` -} - type primaryKey struct { class string action string @@ -113,7 +106,9 @@ func WithSendTimeout(d time.Duration) BufferedCollectorOpts { } } -func NewBufferedCollector(installationID, processID string, opts ...BufferedCollectorOpts) *BufferedCollector { +func NewBufferedCollector(installationID string, c *config.Config, opts ...BufferedCollectorOpts) *BufferedCollector { + processID, moreOpts := getBufferedCollectorArgs(c) + opts = append(opts, moreOpts...) s := &BufferedCollector{ cache: make(keyIndex), writes: make(chan primaryKey, DefaultCollectorEventBufferSize), @@ -124,11 +119,9 @@ func NewBufferedCollector(installationID, processID string, opts ...BufferedColl installationID: installationID, processID: processID, } - for _, opt := range opts { opt(s) } - return s } func (s *BufferedCollector) getInstallationID() string { @@ -201,19 +194,10 @@ func makeMetrics(counters keyIndex) []Metric { return metrics } -func (s *BufferedCollector) CollectMetadata(accountMetadata map[string]string) { - entries := make([]MetadataEntry, len(accountMetadata)) - i := 0 - for k, v := range accountMetadata { - entries[i] = MetadataEntry{Name: k, Value: v} - i++ - } +func (s *BufferedCollector) CollectMetadata(accountMetadata *Metadata) { ctx, cancel := context.WithTimeout(context.Background(), s.sendTimeout) defer cancel() - err := s.sender.UpdateMetadata(ctx, Metadata{ - InstallationID: s.getInstallationID(), - Entries: entries, - }) + err := s.sender.UpdateMetadata(ctx, *accountMetadata) if err != nil { logging.Default(). WithError(err). @@ -221,3 +205,20 @@ func (s *BufferedCollector) CollectMetadata(accountMetadata map[string]string) { Debug("could not update metadata") } } + +func getBufferedCollectorArgs(c *config.Config) (processID string, opts []BufferedCollectorOpts) { + if c == nil { + return "", nil + } + var sender Sender + if c.GetStatsEnabled() && !strings.HasPrefix(config.Version, config.UnreleasedVersion) { + sender = NewHTTPSender(c.GetStatsAddress(), time.Now) + } else { + sender = NewDummySender() + } + return uuid.Must(uuid.NewUUID()).String(), + []BufferedCollectorOpts{ + WithSender(sender), + WithFlushInterval(c.GetStatsFlushInterval()), + } +} diff --git a/stats/collector_test.go b/stats/collector_test.go index eb02bd7fc5f..22066b32d51 100644 --- a/stats/collector_test.go +++ b/stats/collector_test.go @@ -12,7 +12,7 @@ type mockSender struct { metrics chan []stats.Metric } -func (s *mockSender) SendEvent(ctx context.Context, installationId, processId string, m []stats.Metric) error { +func (s *mockSender) SendEvent(ctx context.Context, installationID, processID string, m []stats.Metric) error { s.metrics <- m return nil } @@ -41,7 +41,7 @@ func TestCallHomeCollector_Collect(t *testing.T) { sender := &mockSender{metrics: make(chan []stats.Metric, 10)} ticker := &mockTicker{tc: make(chan time.Time)} ctx, cancelFn := context.WithCancel(context.Background()) - collector := stats.NewBufferedCollector("installation_id", "process_id", + collector := stats.NewBufferedCollector("installation_id", nil, stats.WithSender(sender), stats.WithTicker(ticker), stats.WithWriteBufferSize(0)) diff --git a/stats/metadata.go b/stats/metadata.go new file mode 100644 index 00000000000..9f3841092ce --- /dev/null +++ b/stats/metadata.go @@ -0,0 +1,58 @@ +package stats + +import ( + "github.com/treeverse/lakefs/auth" + "github.com/treeverse/lakefs/block/gs" + s3a "github.com/treeverse/lakefs/block/s3" + "github.com/treeverse/lakefs/cloud" + "github.com/treeverse/lakefs/cloud/aws" + "github.com/treeverse/lakefs/cloud/gcp" + "github.com/treeverse/lakefs/config" + "github.com/treeverse/lakefs/logging" +) + +const BlockstoreTypeKey = "blockstore_type" + +type MetadataEntry struct { + Name string `json:"name"` + Value string `json:"value"` +} + +type Metadata struct { + InstallationID string `json:"installation_id"` + Entries []MetadataEntry `json:"entries"` +} + +func NewMetadata(logger logging.Logger, c *config.Config, authMetadataManager auth.MetadataManager, cloudMetadataProvider cloud.MetadataProvider) *Metadata { + res := &Metadata{} + authMetadata, err := authMetadataManager.Write() + if err != nil { + logger.WithError(err).Debug("failed to collect account metadata") + } + for k, v := range authMetadata { + if k == auth.InstallationIDKeyName { + res.InstallationID = v + } + res.Entries = append(res.Entries, MetadataEntry{Name: k, Value: v}) + } + if cloudMetadataProvider != nil { + cloudMetadata := cloudMetadataProvider.GetMetadata() + for k, v := range cloudMetadata { + res.Entries = append(res.Entries, MetadataEntry{Name: k, Value: v}) + } + } + blockstoreType := c.GetBlockstoreType() + res.Entries = append(res.Entries, MetadataEntry{Name: BlockstoreTypeKey, Value: blockstoreType}) + return res +} + +func BuildMetadataProvider(logger logging.Logger, c *config.Config) cloud.MetadataProvider { + switch c.GetBlockstoreType() { + case gs.BlockstoreType: + return gcp.NewMetadataProvider(logger) + case s3a.BlockstoreType: + return aws.NewMetadataProvider(logger, c.GetAwsConfig()) + default: + return nil + } +}