Skip to content

Commit

Permalink
Count table records and table sizes; write collection errors to a log entry in the archive
Browse files Browse the repository at this point in the history
  • Loading branch information
nopcoder committed Nov 1, 2020
1 parent 0a33fa2 commit 78f0f21
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 20 deletions.
13 changes: 6 additions & 7 deletions cmd/lakefs/cmd/diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package cmd

import (
"context"
"fmt"
"log"
"os"

"github.com/treeverse/lakefs/diagnostics"
Expand All @@ -17,26 +17,25 @@ var diagnosticsCmd = &cobra.Command{
Short: "Collect lakeFS diagnostics",
Run: func(cmd *cobra.Command, args []string) {
ctx := context.Background()
//conf := config.NewConfig()
output, _ := cmd.Flags().GetString("output")

dbPool := db.BuildDatabaseConnection(cfg.GetDatabaseParams())
defer dbPool.Close()

c := diagnostics.NewCollector(dbPool)

f, err := os.Create(output)
if err != nil {
fmt.Printf("Failed to open file! %s\n", err)
log.Fatalf("Create %s failed - %s", output, err)
}
defer func() { _ = f.Close() }()

fmt.Println("Collecting...")
log.Printf("Collecting...")
err = c.Collect(ctx, f)
if err != nil {
fmt.Printf("Failed to collect data: %s\n", err)
log.Fatalf("Failed to collect data: %s", err)
}

fmt.Printf("Diagnostics collected into zip file: %s\n", output)
log.Printf("Diagnostics collected into %s", output)
},
}

Expand Down
106 changes: 93 additions & 13 deletions diagnostics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ type Collector struct {
db db.Database
}

const maxRecordsPerQueryCollect = 1000
const (
maxRecordsPerQueryCollect = 1000
csvFileExt = ".csv"
)

var ErrNoColumns = errors.New("no columns in table")
var ErrNoColumnsFound = errors.New("no columns found")

func NewCollector(adb db.Database) *Collector {
return &Collector{
Expand All @@ -30,34 +33,89 @@ func (c *Collector) Collect(ctx context.Context, w io.Writer) (err error) {
writer := zip.NewWriter(w)
defer func() { err = writer.Close() }()

err = c.writeTableContent(ctx, writer, "auth_installation_metadata")
var errs []error
contentFromTables := []string{
"auth_installation_metadata",
"schema_migrations",
}
for _, tbl := range contentFromTables {
err = c.writeTableContent(ctx, writer, tbl)
if err != nil {
errs = append(errs, fmt.Errorf("write table content for %s %w", tbl, err))
}
}

countFromTables := []string{
"catalog_branches",
"catalog_commits",
"catalog_entries",
"catalog_repositories",
"catalog_object_dedup",
"catalog_multipart_uploads",
"auth_users",
}
for _, tbl := range countFromTables {
err = c.writeTableCount(ctx, writer, tbl)
if err != nil {
errs = append(errs, fmt.Errorf("write table count for %s %w", tbl, err))
}
}

err = c.writeQueryContent(ctx, writer, "entries_per_branch", sq.
Select("branch_id", "COUNT(*)").From("catalog_entries").GroupBy("branch_id"))
if err != nil {
return err
errs = append(errs, fmt.Errorf("write query entries_per_branch %w", err))
}

err = c.writeRawQueryContent(ctx, writer, "table_sizes", `
SELECT *, pg_size_pretty(total_bytes) AS total
, pg_size_pretty(index_bytes) AS INDEX
, pg_size_pretty(toast_bytes) AS toast
, pg_size_pretty(table_bytes) AS TABLE
FROM (SELECT *, total_bytes-index_bytes-COALESCE(toast_bytes,0) AS table_bytes FROM (
SELECT c.oid,nspname AS table_schema, relname AS TABLE_NAME
, c.reltuples AS row_estimate
, pg_total_relation_size(c.oid) AS total_bytes
, pg_indexes_size(c.oid) AS index_bytes
, pg_total_relation_size(reltoastrelid) AS toast_bytes
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE relkind = 'r') a) a`)
if err != nil {
errs = append(errs, fmt.Errorf("get table sizes %w", err))
}

// write all errors into log
if err := c.writeErrors(ctx, writer, errs); err != nil {
return err
}
return nil
}

// writeQueryContent renders the given SELECT builder into SQL, caps the
// result at maxRecordsPerQueryCollect rows, and stores the rows as a CSV
// entry named after 'name' in the zip archive.
//
// Creating the zip entry and the CSV writer is delegated entirely to
// writeRawQueryContent; doing it here as well (as the old code did)
// produced a second, empty archive entry with the same file name and a
// csv.Writer that was flushed but never written to.
func (c *Collector) writeQueryContent(ctx context.Context, writer *zip.Writer, name string, selectBuilder sq.SelectBuilder) error {
	q := selectBuilder.Limit(maxRecordsPerQueryCollect)
	query, args, err := q.ToSql()
	if err != nil {
		return fmt.Errorf("query build: %w", err)
	}
	return c.writeRawQueryContent(ctx, writer, name, query, args...)
}

func (c *Collector) writeRawQueryContent(ctx context.Context, writer *zip.Writer, name string, query string, args ...interface{}) error {
rows, err := c.db.Query(query, args...)
if err != nil {
return fmt.Errorf("execute query: %w", err)
}
defer rows.Close()

filename := name + csvFileExt
w, err := writer.Create(filename)
if err != nil {
return fmt.Errorf("new file for table %s - %w", name, err)
}
csvWriter := csv.NewWriter(w)
defer csvWriter.Flush()

first := true
for rows.Next() {
if err := ctx.Err(); err != nil {
Expand All @@ -67,7 +125,7 @@ func (c *Collector) writeQueryContent(ctx context.Context, writer *zip.Writer, n
first = false
descriptions := rows.FieldDescriptions()
if len(descriptions) == 0 {
return ErrNoColumns
return ErrNoColumnsFound
}
cols := make([]string, len(descriptions))
for i, fd := range descriptions {
Expand Down Expand Up @@ -96,3 +154,25 @@ func (c *Collector) writeTableContent(ctx context.Context, writer *zip.Writer, n
q := sq.Select("*").From(name)
return c.writeQueryContent(ctx, writer, name, q)
}

// writeTableCount stores the number of records in the named table as a
// single-value CSV entry called "<name>_count" in the zip archive.
func (c *Collector) writeTableCount(ctx context.Context, writer *zip.Writer, name string) error {
	countQuery := sq.Select("COUNT(*)").From(name)
	return c.writeQueryContent(ctx, writer, name+"_count", countQuery)
}

// writeErrors records every collection error, one per line, in an
// "errors.log" entry inside the zip archive. When errs is empty no entry
// is created at all. Honors context cancellation between writes.
func (c *Collector) writeErrors(ctx context.Context, writer *zip.Writer, errs []error) error {
	if len(errs) == 0 {
		// Nothing to report — keep the archive free of an empty log file.
		return nil
	}
	logFile, err := writer.Create("errors.log")
	if err != nil {
		return err
	}
	for _, collectErr := range errs {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
		// Best-effort: a failed write of one line must not abort the rest.
		_, _ = fmt.Fprintf(logFile, "%v\n", collectErr)
	}
	return nil
}

0 comments on commit 78f0f21

Please sign in to comment.