fix thread-safe zerolog usage, details rs/zerolog#242 and https://git…
Slach committed Jun 13, 2023
1 parent d0f5b19 commit 0562dd0
Showing 8 changed files with 64 additions and 50 deletions.
16 changes: 10 additions & 6 deletions cmd/clickhouse-backup/main.go
@@ -3,16 +3,15 @@ package main
import (
"context"
"fmt"
"github.com/rs/zerolog/diode"
stdlog "log"
"os"
"time"

"github.com/Altinity/clickhouse-backup/pkg/config"
"github.com/Altinity/clickhouse-backup/pkg/status"

"github.com/Altinity/clickhouse-backup/pkg/backup"
"github.com/Altinity/clickhouse-backup/pkg/config"
"github.com/Altinity/clickhouse-backup/pkg/server"

"github.com/Altinity/clickhouse-backup/pkg/status"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/rs/zerolog/pkgerrors"
@@ -26,9 +25,14 @@ var (
)

func main() {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stdout, NoColor: true, TimeFormat: time.StampMilli})
stdlog.SetOutput(log.Logger)
zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMs
zerolog.ErrorStackMarshaler = pkgerrors.MarshalStack
consoleWriter := zerolog.ConsoleWriter{Out: os.Stdout, NoColor: true, TimeFormat: "2006-01-02 15:04:05.000"}
diodeWriter := diode.NewWriter(consoleWriter, 4096, 10*time.Millisecond, func(missed int) {
fmt.Printf("Logger Dropped %d messages", missed)
})
log.Logger = zerolog.New(diodeWriter).With().Timestamp().Logger()
stdlog.SetOutput(log.Logger)
cliapp := cli.NewApp()
cliapp.Name = "clickhouse-backup"
cliapp.Usage = "Tool for easy backup of ClickHouse with cloud support"
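Taken together, the new setup in main.go wraps the console writer in a diode ring buffer so concurrent log calls never hit stdout directly. A minimal runnable sketch of the same pattern (format string and buffer size copied from the diff; the trailing newline in the drop callback is added here for readability):

package main

import (
	"fmt"
	stdlog "log"
	"os"
	"time"

	"github.com/rs/zerolog"
	"github.com/rs/zerolog/diode"
	"github.com/rs/zerolog/log"
)

func main() {
	zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMs
	consoleWriter := zerolog.ConsoleWriter{Out: os.Stdout, NoColor: true, TimeFormat: "2006-01-02 15:04:05.000"}
	// diode.NewWriter buffers up to 4096 events in a many-to-one ring;
	// a single poller goroutine drains it every 10ms, so the
	// non-thread-safe ConsoleWriter is only ever written to serially.
	diodeWriter := diode.NewWriter(consoleWriter, 4096, 10*time.Millisecond, func(missed int) {
		fmt.Printf("Logger Dropped %d messages\n", missed)
	})
	log.Logger = zerolog.New(diodeWriter).With().Timestamp().Logger()
	stdlog.SetOutput(log.Logger) // route the standard library logger through zerolog too
	log.Info().Msg("logging is now safe across goroutines")
}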
2 changes: 2 additions & 0 deletions pkg/backup/backuper.go
@@ -7,6 +7,7 @@ import (
"github.com/Altinity/clickhouse-backup/pkg/config"
"github.com/Altinity/clickhouse-backup/pkg/resumable"
"github.com/Altinity/clickhouse-backup/pkg/storage"
"github.com/rs/zerolog/log"
"path"
)

@@ -25,6 +26,7 @@ type Backuper struct {
func NewBackuper(cfg *config.Config) *Backuper {
ch := &clickhouse.ClickHouse{
Config: &cfg.ClickHouse,
Logger: log.With().Str("logger", "clickhouse").Logger(),
}
return &Backuper{
cfg: cfg,
2 changes: 1 addition & 1 deletion pkg/backup/download.go
@@ -188,10 +188,10 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
logger.Error().Msgf("can't acquire semaphore during Download metadata: %v", err)
break
}
tableLogger := logger.With().Str("table_metadata", fmt.Sprintf("%s.%s", t.Database, t.Table)).Logger()
idx := i
tableTitle := t
metadataGroup.Go(func() error {
tableLogger := logger.With().Str("table_metadata", fmt.Sprintf("%s.%s", tableTitle.Database, tableTitle.Table)).Logger()
defer downloadSemaphore.Release(1)
downloadedMetadata, size, err := b.downloadTableMetadata(metadataCtx, backupName, disks, tableLogger, tableTitle, schemaOnly, partitions)
if err != nil {
6 changes: 4 additions & 2 deletions pkg/backup/upload.go
@@ -147,6 +147,9 @@ func (b *Backuper) Upload(backupName, diffFrom, diffFromRemote, tablePattern str
idx := i
uploadGroup.Go(func() error {
defer uploadSemaphore.Release(1)
uploadLogger := logger.With().
Str("table", fmt.Sprintf("%s.%s", tablesForUpload[idx].Database, tablesForUpload[idx].Table)).
Logger()
var uploadedBytes int64
if !schemaOnly {
var files map[string][]string
@@ -163,8 +166,7 @@ func (b *Backuper) Upload(backupName, diffFrom, diffFromRemote, tablePattern str
return err
}
atomic.AddInt64(&metadataSize, tableMetadataSize)
logger.Info().
Str("table", fmt.Sprintf("%s.%s", tablesForUpload[idx].Database, tablesForUpload[idx].Table)).
uploadLogger.Info().
Str("duration", utils.HumanizeDuration(time.Since(start))).
Str("size", utils.FormatBytes(uint64(uploadedBytes+tableMetadataSize))).
Msg("done")
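download.go and upload.go get the same treatment: the per-table child logger is now derived inside the errgroup closure, so every goroutine builds its own logger value instead of capturing one shared across loop iterations. A self-contained sketch of that pattern (table values and struct name are stand-ins; the real code iterates over tablesForUpload):

package main

import (
	"fmt"

	"github.com/rs/zerolog/log"
	"golang.org/x/sync/errgroup"
)

type tableTitle struct{ Database, Table string }

func main() {
	tables := []tableTitle{{"default", "events"}, {"default", "metrics"}}
	logger := log.With().Str("logger", "upload").Logger()
	var uploadGroup errgroup.Group
	for i := range tables {
		idx := i // pin the loop index for the closure (pre-Go 1.22 semantics)
		uploadGroup.Go(func() error {
			// Derive the child logger inside the goroutine; With()
			// copies the context, so no logger state is shared.
			uploadLogger := logger.With().
				Str("table", fmt.Sprintf("%s.%s", tables[idx].Database, tables[idx].Table)).
				Logger()
			uploadLogger.Info().Msg("done")
			return nil
		})
	}
	_ = uploadGroup.Wait()
}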
72 changes: 36 additions & 36 deletions pkg/clickhouse/clickhouse.go
@@ -21,14 +21,14 @@ import (
"github.com/Altinity/clickhouse-backup/pkg/metadata"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/rs/zerolog/log"
)

// ClickHouse - provide
type ClickHouse struct {
Config *config.ClickHouseConfig
conn driver.Conn
disks []Disk
Logger zerolog.Logger
version int
isPartsColumnPresent int8
IsOpen bool
@@ -38,7 +38,7 @@
func (ch *ClickHouse) Connect() error {
if ch.IsOpen {
if err := ch.conn.Close(); err != nil {
log.Error().Msgf("close previous connection error: %v", err)
ch.Logger.Error().Msgf("close previous connection error: %v", err)
}
}
ch.IsOpen = false
@@ -55,9 +55,9 @@
Password: ch.Config.Password,
},
Settings: clickhouse.Settings{
"connect_timeout": int(timeout.Seconds()),
"receive_timeout": int(timeout.Seconds()),
"send_timeout": int(timeout.Seconds()),
// "connect_timeout": int(timeout.Seconds()),
// "receive_timeout": int(timeout.Seconds()),
// "send_timeout": int(timeout.Seconds()),
},
MaxOpenConns: 1,
ConnMaxLifetime: 0,
@@ -78,20 +78,20 @@
if ch.Config.TLSCert != "" || ch.Config.TLSKey != "" {
cert, err := tls.LoadX509KeyPair(ch.Config.TLSCert, ch.Config.TLSKey)
if err != nil {
log.Error().Msgf("tls.LoadX509KeyPair error: %v", err)
ch.Logger.Error().Msgf("tls.LoadX509KeyPair error: %v", err)
return err
}
tlsConfig.Certificates = []tls.Certificate{cert}
}
if ch.Config.TLSCa != "" {
caCert, err := os.ReadFile(ch.Config.TLSCa)
if err != nil {
log.Error().Msgf("read `tls_ca` file %s return error: %v ", ch.Config.TLSCa, err)
ch.Logger.Error().Msgf("read `tls_ca` file %s return error: %v ", ch.Config.TLSCa, err)
return err
}
caCertPool := x509.NewCertPool()
if caCertPool.AppendCertsFromPEM(caCert) != true {
log.Error().Msgf("AppendCertsFromPEM %s return false", ch.Config.TLSCa)
ch.Logger.Error().Msgf("AppendCertsFromPEM %s return false", ch.Config.TLSCa)
return fmt.Errorf("AppendCertsFromPEM %s return false", ch.Config.TLSCa)
}
tlsConfig.RootCAs = caCertPool
@@ -104,23 +104,23 @@
}

if ch.conn, err = clickhouse.Open(opt); err != nil {
log.Error().Msgf("clickhouse connection: %s, sql.Open return error: %v", fmt.Sprintf("tcp://%v:%v", ch.Config.Host, ch.Config.Port), err)
ch.Logger.Error().Msgf("clickhouse connection: %s, clickhouse.Open return error: %v", fmt.Sprintf("tcp://%v:%v", ch.Config.Host, ch.Config.Port), err)
return err
}

logFunc := log.Info()
logFunc := ch.Logger.Info()
if !ch.Config.LogSQLQueries {
logFunc = log.Debug()
logFunc = ch.Logger.Debug()
}
logFunc.Msgf("clickhouse connection prepared: %s run ping", fmt.Sprintf("tcp://%v:%v", ch.Config.Host, ch.Config.Port))
logFunc.Stack().Msgf("clickhouse connection prepared: %s run ping", fmt.Sprintf("tcp://%v:%v?timeout=%v", ch.Config.Host, ch.Config.Port, ch.Config.Timeout))
err = ch.conn.Ping(context.Background())
if err != nil {
log.Error().Msgf("clickhouse connection ping: %s return error: %v", fmt.Sprintf("tcp://%v:%v", ch.Config.Host, ch.Config.Port), err)
ch.Logger.Error().Msgf("clickhouse connection ping: %s return error: %v", fmt.Sprintf("tcp://%v:%v", ch.Config.Host, ch.Config.Port), err)
return err
} else {
ch.IsOpen = true
}
logFunc.Msgf("clickhouse connection open: %s", fmt.Sprintf("tcp://%v:%v", ch.Config.Host, ch.Config.Port))
logFunc.Stack().Msgf("clickhouse connection open: %s", fmt.Sprintf("tcp://%v:%v", ch.Config.Host, ch.Config.Port))
return err
}

@@ -259,13 +259,13 @@ func (ch *ClickHouse) getDisksFromSystemDisks(ctx context.Context) ([]Disk, erro
func (ch *ClickHouse) Close() {
if ch.IsOpen {
if err := ch.conn.Close(); err != nil {
log.Warn().Msgf("can't close clickhouse connection: %v", err)
ch.Logger.Warn().Msgf("can't close clickhouse connection: %v", err)
}
}
if ch.Config.LogSQLQueries {
log.Info().Msg("clickhouse connection closed")
ch.Logger.Info().Msg("clickhouse connection closed")
} else {
log.Debug().Msg("clickhouse connection closed")
ch.Logger.Debug().Msg("clickhouse connection closed")
}
ch.IsOpen = false
}
@@ -461,7 +461,7 @@ func (ch *ClickHouse) GetDatabases(ctx context.Context, cfg *config.Config, tabl
var result string
// 19.4 doesn't have /var/lib/clickhouse/metadata/default.sql
if err := ch.SelectSingleRow(ctx, &result, showDatabaseSQL); err != nil {
log.Warn().Msgf("can't get create database query: %v", err)
ch.Logger.Warn().Msgf("can't get create database query: %v", err)
allDatabases[i].Query = fmt.Sprintf("CREATE DATABASE `%s` ENGINE = %s", db.Name, db.Engine)
} else {
// 23.3+ masked secrets https://github.com/Altinity/clickhouse-backup/issues/640
@@ -486,7 +486,7 @@ func (ch *ClickHouse) getTableSizeFromParts(ctx context.Context, table Table) ui
}
query := fmt.Sprintf("SELECT sum(bytes_on_disk) as size FROM system.parts WHERE active AND database='%s' AND table='%s' GROUP BY database, table", table.Database, table.Name)
if err := ch.SelectContext(ctx, &tablesSize, query); err != nil {
log.Warn().Msgf("error parsing tablesSize: %v", err)
ch.Logger.Warn().Msgf("error parsing tablesSize: %v", err)
}
if len(tablesSize) > 0 {
return tablesSize[0].Size
@@ -517,7 +517,7 @@ func (ch *ClickHouse) fixVariousVersions(ctx context.Context, t Table, metadataP
if strings.Contains(t.CreateTableQuery, "'[HIDDEN]'") {
tableSQLPath := path.Join(metadataPath, common.TablePathEncode(t.Database), common.TablePathEncode(t.Name)+".sql")
if attachSQL, err := os.ReadFile(tableSQLPath); err != nil {
log.Warn().Msgf("can't read %s: %v", tableSQLPath, err)
ch.Logger.Warn().Msgf("can't read %s: %v", tableSQLPath, err)
} else {
t.CreateTableQuery = strings.Replace(string(attachSQL), "ATTACH", "CREATE", 1)
t.CreateTableQuery = strings.Replace(t.CreateTableQuery, " _ ", " `"+t.Database+"`.`"+t.Name+"` ", 1)
@@ -536,7 +536,7 @@ func (ch *ClickHouse) GetVersion(ctx context.Context) (int, error) {
var err error
query := "SELECT value FROM `system`.`build_options` where name='VERSION_INTEGER'"
if err = ch.SelectSingleRow(ctx, &result, query); err != nil {
log.Warn().Msgf("can't get ClickHouse version: %v", err)
ch.Logger.Warn().Msgf("can't get ClickHouse version: %v", err)
return 0, nil
}
ch.version, err = strconv.Atoi(result)
@@ -567,7 +567,7 @@ func (ch *ClickHouse) FreezeTableOldWay(ctx context.Context, table *Table, name
withNameQuery = fmt.Sprintf("WITH NAME '%s'", name)
}
for _, item := range partitions {
log.Debug().Msgf(" partition '%v'", item.PartitionID)
ch.Logger.Debug().Msgf(" partition '%v'", item.PartitionID)
query := fmt.Sprintf(
"ALTER TABLE `%v`.`%v` FREEZE PARTITION ID '%v' %s;",
table.Database,
@@ -585,7 +585,7 @@
}
if err := ch.QueryContext(ctx, query); err != nil {
if (strings.Contains(err.Error(), "code: 60") || strings.Contains(err.Error(), "code: 81")) && ch.Config.IgnoreNotExistsErrorDuringFreeze {
log.Warn().Msgf("can't freeze partition: %v", err)
ch.Logger.Warn().Msgf("can't freeze partition: %v", err)
} else {
return fmt.Errorf("can't freeze partition '%s': %w", item.PartitionID, err)
}
@@ -604,9 +604,9 @@ func (ch *ClickHouse) FreezeTable(ctx context.Context, table *Table, name string
if strings.HasPrefix(table.Engine, "Replicated") && ch.Config.SyncReplicatedTables {
query := fmt.Sprintf("SYSTEM SYNC REPLICA `%s`.`%s`;", table.Database, table.Name)
if err := ch.QueryContext(ctx, query); err != nil {
log.Warn().Msgf("can't sync replica: %v", err)
ch.Logger.Warn().Msgf("can't sync replica: %v", err)
} else {
log.Debug().Str("table", fmt.Sprintf("%s.%s", table.Database, table.Name)).Msg("replica synced")
ch.Logger.Debug().Str("table", fmt.Sprintf("%s.%s", table.Database, table.Name)).Msg("replica synced")
}
}
if version < 19001005 || ch.Config.FreezeByPart {
@@ -619,7 +619,7 @@
query := fmt.Sprintf("ALTER TABLE `%s`.`%s` FREEZE %s;", table.Database, table.Name, withNameQuery)
if err := ch.QueryContext(ctx, query); err != nil {
if (strings.Contains(err.Error(), "code: 60") || strings.Contains(err.Error(), "code: 81")) && ch.Config.IgnoreNotExistsErrorDuringFreeze {
log.Warn().Msgf("can't freeze table: %v", err)
ch.Logger.Warn().Msgf("can't freeze table: %v", err)
return nil
}
return fmt.Errorf("can't freeze table: %v", err)
@@ -643,7 +643,7 @@ func (ch *ClickHouse) AttachDataParts(table metadata.TableMetadata, disks []Disk
if err := ch.Query(query); err != nil {
return err
}
log.Debug().Str("table", fmt.Sprintf("%s.%s", table.Database, table.Table)).Str("disk", disk.Name).Str("part", part.Name).Msg("attached")
ch.Logger.Debug().Str("table", fmt.Sprintf("%s.%s", table.Database, table.Table)).Str("disk", disk.Name).Str("part", part.Name).Msg("attached")
}
}
}
@@ -656,7 +656,7 @@ var uuidRE = regexp.MustCompile(`UUID '([^']+)'`)
// AttachTable - execute ATTACH TABLE command for specific table
func (ch *ClickHouse) AttachTable(ctx context.Context, table metadata.TableMetadata) error {
if len(table.Parts) == 0 {
log.Warn().Msgf("no data parts for restore for `%s`.`%s`", table.Database, table.Table)
ch.Logger.Warn().Msgf("no data parts for restore for `%s`.`%s`", table.Database, table.Table)
return nil
}
canContinue, err := ch.CheckReplicationInProgress(table)
@@ -703,7 +703,7 @@ func (ch *ClickHouse) AttachTable(ctx context.Context, table metadata.TableMetad
return err
}

log.Debug().Str("table", fmt.Sprintf("%s.%s", table.Database, table.Table)).Msg("attached")
ch.Logger.Debug().Str("table", fmt.Sprintf("%s.%s", table.Database, table.Table)).Msg("attached")
return nil
}
func (ch *ClickHouse) ShowCreateTable(ctx context.Context, database, name string) string {
@@ -834,7 +834,7 @@ func (ch *ClickHouse) CreateTable(table Table, query string, dropTable, ignoreDe
if onCluster != "" && distributedRE.MatchString(query) {
matches := distributedRE.FindAllStringSubmatch(query, -1)
if onCluster != strings.Trim(matches[0][2], "'\" ") {
log.Warn().Msgf("Will replace cluster ENGINE=Distributed %s -> %s", matches[0][2], onCluster)
ch.Logger.Warn().Msgf("Will replace cluster ENGINE=Distributed %s -> %s", matches[0][2], onCluster)
query = distributedRE.ReplaceAllString(query, fmt.Sprintf("${1}(%s,${3})", onCluster))
}
}
@@ -857,7 +857,7 @@ func (ch *ClickHouse) IsClickhouseShadow(path string) bool {
}
defer func() {
if err := d.Close(); err != nil {
log.Warn().Msgf("can't close directory %v", err)
ch.Logger.Warn().Msgf("can't close directory %v", err)
}
}()
names, err := d.Readdirnames(-1)
@@ -910,9 +910,9 @@ func (ch *ClickHouse) SelectSingleRowNoCtx(dest interface{}, query string, args
func (ch *ClickHouse) LogQuery(query string, args ...interface{}) string {
var logF *zerolog.Event
if !ch.Config.LogSQLQueries {
logF = log.Debug()
logF = ch.Logger.Debug()
} else {
logF = log.Info()
logF = ch.Logger.Info()
}
if len(args) > 0 {
logF.Msg(strings.NewReplacer("\n", " ", "\r", " ", "\t", " ").Replace(fmt.Sprintf("%s with args %v", query, args)))
@@ -1049,10 +1049,10 @@ func (ch *ClickHouse) CheckReplicationInProgress(table metadata.TableMetadata) (
return false, fmt.Errorf("invalid result for check exists replicas: %+v", existsReplicas)
}
if existsReplicas[0].InProgress > 0 {
log.Warn().Msgf("%s.%s skipped cause system.replicas entry already exists and replication in progress from another replica", table.Database, table.Table)
ch.Logger.Warn().Msgf("%s.%s skipped cause system.replicas entry already exists and replication in progress from another replica", table.Database, table.Table)
return false, nil
} else {
log.Info().Msgf("replication_in_progress status = %+v", existsReplicas)
ch.Logger.Info().Msgf("replication_in_progress status = %+v", existsReplicas)
}
}
return true, nil
@@ -1089,7 +1089,7 @@ func (ch *ClickHouse) CheckSystemPartsColumns(ctx context.Context, table *Table)
}
if len(isPartsColumnsInconsistent) > 0 {
for i := range isPartsColumnsInconsistent {
log.Error().Msgf("`%s`.`%s` have inconsistent data types %#v for \"%s\" column", table.Database, table.Name, isPartsColumnsInconsistent[i].Types, isPartsColumnsInconsistent[i].Column)
ch.Logger.Error().Msgf("`%s`.`%s` have inconsistent data types %#v for \"%s\" column", table.Database, table.Name, isPartsColumnsInconsistent[i].Types, isPartsColumnsInconsistent[i].Column)
}
return fmt.Errorf("`%s`.`%s` have inconsistent data types for active data part in system.parts_columns", table.Database, table.Name)
}
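The bulk of clickhouse.go is a mechanical substitution: every package-global log call becomes a call on the new Logger field, which NewBackuper injects as a named child of the global logger. A condensed sketch of the injection pattern (the constructor placement here is simplified; in the commit the field is set in pkg/backup/backuper.go):

package main

import (
	"os"

	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"
)

// ClickHouse carries its own logger instead of writing through the global one.
type ClickHouse struct {
	Logger zerolog.Logger
}

func (ch *ClickHouse) Connect() error {
	ch.Logger.Debug().Msg("clickhouse connection prepared")
	return nil
}

func main() {
	log.Logger = zerolog.New(os.Stdout).With().Timestamp().Logger()
	ch := &ClickHouse{
		// Child of the (diode-backed) global logger, tagged per component.
		Logger: log.With().Str("logger", "clickhouse").Logger(),
	}
	_ = ch.Connect()
}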
5 changes: 3 additions & 2 deletions pkg/resumable/state.go
@@ -3,13 +3,13 @@ package resumable
import (
"encoding/json"
"fmt"
"github.com/rs/zerolog"
"os"
"path"
"strconv"
"strings"
"sync"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

@@ -27,7 +27,7 @@ func NewState(defaultDiskPath, backupName, command string, params map[string]int
stateFile: path.Join(defaultDiskPath, "backup", backupName, fmt.Sprintf("%s.state", command)),
currentState: "",
mx: &sync.RWMutex{},
logger: log.With().Str("logger", "resumable").Logger(),
logger: log.Logger.With().Str("logger", "resumable").Logger(),
}
fp, err := os.OpenFile(s.stateFile, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
@@ -104,6 +104,7 @@ func (s *State) IsAlreadyProcessed(path string) (bool, int64) {
s.mx.RLock()
res := strings.Index(s.currentState, path+":")
if res >= 0 {
// s.logger is non thread-safe https://github.com/rs/zerolog/issues/242
s.logger.Info().Msgf("%s already processed", path)
sSize := s.currentState[res : res+strings.Index(s.currentState[res:], "\n")]
sSize = sSize[strings.Index(sSize, ":")+1:]
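The comment added in IsAlreadyProcessed points at the underlying issue: per rs/zerolog#242, a zerolog.Logger value is only as thread-safe as the io.Writer behind it, and ConsoleWriter is not safe for concurrent writes. With the diode writer installed in main.go, events from all child loggers funnel through a single consumer goroutine, so calls like s.logger.Info() can come from multiple goroutines. A small demonstration under that assumption (worker labels are invented for the example):

package main

import (
	"os"
	"sync"
	"time"

	"github.com/rs/zerolog"
	"github.com/rs/zerolog/diode"
	"github.com/rs/zerolog/log"
)

func main() {
	wr := diode.NewWriter(zerolog.ConsoleWriter{Out: os.Stdout, NoColor: true}, 1024, 10*time.Millisecond, func(missed int) {})
	log.Logger = zerolog.New(wr).With().Timestamp().Logger()

	var wg sync.WaitGroup
	for n := 0; n < 4; n++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			// Safe: each event is enqueued on the diode's ring buffer;
			// only the diode's poller goroutine writes to ConsoleWriter.
			logger := log.With().Str("logger", "resumable").Int("worker", n).Logger()
			logger.Info().Msgf("worker %d already processed", n)
		}(n)
	}
	wg.Wait()
	_ = wr.Close() // flush buffered events before exit
}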