Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/heathcheck #718

Merged
merged 6 commits into from
Jun 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions code/go/0chain.net/blobber/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func setupConfig(configDir string, deploymentMode int) {
config.Configuration.RMRedeemFreq = viper.GetInt64("readmarker_redeem.frequency")
config.Configuration.RMRedeemNumWorkers = viper.GetInt("readmarker_redeem.num_workers")

config.Configuration.HealthCheckWorkerFreq = viper.GetDuration("healthcheck.frequency")

config.Configuration.ChallengeResolveFreq = viper.GetInt64("challenge_response.frequency")
config.Configuration.ChallengeResolveNumWorkers = viper.GetInt("challenge_response.num_workers")
config.Configuration.ChallengeMaxRetires = viper.GetInt("challenge_response.max_retries")
Expand Down
81 changes: 53 additions & 28 deletions code/go/0chain.net/blobber/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,51 +20,75 @@ import (
"go.uber.org/zap"
)

func setupWorkers() {
var root = common.GetRootContext()
handler.SetupWorkers(root)
challenge.SetupWorkers(root)
readmarker.SetupWorkers(root)
writemarker.SetupWorkers(root)
allocation.StartUpdateWorker(root, config.Configuration.UpdateAllocationsInterval)
func setupWorkers(ctx context.Context) {

handler.SetupWorkers(ctx)
challenge.SetupWorkers(ctx)
readmarker.SetupWorkers(ctx)
writemarker.SetupWorkers(ctx)
allocation.StartUpdateWorker(ctx, config.Configuration.UpdateAllocationsInterval)
if config.Configuration.AutomaticUpdate {
go StartUpdateWorker(root, config.Configuration.BlobberUpdateInterval)
go StartUpdateWorker(ctx, config.Configuration.BlobberUpdateInterval)
}
}

func refreshPriceOnChain() {
func refreshPriceOnChain(ctx context.Context) {
var REPEAT_DELAY = 60 * 60 * time.Duration(viper.GetInt("price_worker_in_hours")) // 12 hours with default settings
var err error
for {
time.Sleep(REPEAT_DELAY * time.Second)
if err := handler.RefreshPriceOnChain(common.GetRootContext()); err != nil {
logging.Logger.Error("refresh price on chain ", zap.Error(err))
select {
case <-ctx.Done():
break

case <-time.After(REPEAT_DELAY * time.Second):
err = handler.RefreshPriceOnChain(common.GetRootContext())
if err != nil {
logging.Logger.Error("refresh price on chain ", zap.Error(err))
}
}

}
}

func startHealthCheck() {
const REPEAT_DELAY = 60 * 15 // 15 minutes
var err error
func startHealthCheck(ctx context.Context) {
for {
err = handler.SendHealthCheck()
if err == nil {
logging.Logger.Info("success to send heartbeat")
} else {
logging.Logger.Warn("failed to send heartbeat", zap.Error(err))
select {
case <-ctx.Done():
break
case <-time.After(config.Configuration.HealthCheckWorkerFreq):
go func() {
start := time.Now()

txnHash, err := handler.SendHealthCheck()
end := time.Now()
if err == nil {
logging.Logger.Info("success to send heartbeat", zap.String("txn_hash", txnHash), zap.Time("start", start), zap.Time("end", end), zap.Duration("duration", end.Sub(start)))
} else {
logging.Logger.Warn("failed to send heartbeat", zap.String("txn_hash", txnHash), zap.Time("start", start), zap.Time("end", end), zap.Duration("duration", end.Sub(start)))
}
}()
}
<-time.After(REPEAT_DELAY * time.Second)
}
}

// startRefreshSettings sync settings from blockchain
func startRefreshSettings() {
func startRefreshSettings(ctx context.Context) {
const REPEAT_DELAY = 60 * 3 // 3 minutes
var err error
var b *zcncore.Blobber
for {
b, err = config.ReloadFromChain(common.GetRootContext(), datastore.GetStore().GetDB())
if err == nil {
select {
case <-ctx.Done():
break
case <-time.After(REPEAT_DELAY * time.Second):
b, err = config.ReloadFromChain(common.GetRootContext(), datastore.GetStore().GetDB())
if err != nil {
logging.Logger.Warn("failed to refresh blobber settings from chain", zap.Error(err))
continue
}

logging.Logger.Info("success to refresh blobber settings from chain")

// BaseURL is changed, register blobber to refresh it on blockchain again
if b.BaseURL != node.Self.GetURLBase() {
err = handler.UpdateBlobber(context.TODO())
Expand All @@ -74,11 +98,8 @@ func startRefreshSettings() {
logging.Logger.Warn("failed to refresh blobber BaseURL on chain", zap.Error(err))
}
}
} else {
logging.Logger.Warn("failed to refresh blobber settings from chain", zap.Error(err))
}

<-time.After(REPEAT_DELAY * time.Second)
}
}

Expand All @@ -98,7 +119,9 @@ func StartUpdateWorker(ctx context.Context, interval time.Duration) {
logging.Logger.Error("Error while getting capacity", zap.Error(err))
break
}
if uint64(config.Configuration.Capacity) != filestore.GetFileStore().GetCurrentDiskCapacity() {
capacity := filestore.GetFileStore().GetCurrentDiskCapacity()

if uint64(config.Configuration.Capacity) != capacity {

_, err = config.ReloadFromChain(common.GetRootContext(), datastore.GetStore().GetDB())

Expand All @@ -111,6 +134,8 @@ func StartUpdateWorker(ctx context.Context, interval time.Duration) {
if err != nil {
logging.Logger.Error("Error while updating blobber updates on chain", zap.Error(err))
}

config.Configuration.Capacity = int64(capacity)
}
}
}
Expand Down
49 changes: 26 additions & 23 deletions code/go/0chain.net/blobber/zcn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func registerOnChain() error {
//wait http & grpc startup, and go to setup on chain
fmt.Print("> connecting to chain \n")

const ATTEMPT_DELAY = 60 * 1
const ATTEMPT_DELAY = 30 //30s

// setup wallet
fmt.Print(" + connect to miners: ")
Expand All @@ -26,53 +26,56 @@ func registerOnChain() error {
return nil
}

if err := handler.WalletRegister(); err != nil {
fmt.Println(err.Error() + "\n")
panic(err)
var err error

err = handler.WalletRegister()
if err != nil {
return err
}
fmt.Print(" [OK]\n")

var err error
err = filestore.GetFileStore().CalculateCurrentDiskCapacity()
if err != nil {
return err
}

// setup blobber (add or update) on the blockchain (multiple attempts)
for i := 1; i <= 10; i++ {
if i == 1 {
fmt.Printf("\r + connect to sharders:")
} else {

for n := ATTEMPT_DELAY; n < 1; n-- {
for n := ATTEMPT_DELAY; n > 0; n-- {
if n == 1 {
fmt.Printf("\r + [%v/10]connect to sharders :", i)
fmt.Printf("\r + [%v/10]connect to sharders: ", i)
} else {
fmt.Printf("\r + [%v/10]connect to sharders %v:", i, n)
fmt.Printf("\r + [%v/10]connect to sharders: %.2vs", i, n)
}
time.Sleep(1 * time.Second)
}

}

err = filestore.GetFileStore().CalculateCurrentDiskCapacity()
if err != nil {
continue
<-time.After(1 * time.Second)
}
}

err = handler.RegisterBlobber(common.GetRootContext())
if err != nil {
continue
if err == nil {
break
}
}

break
if err != nil {
return err
}

fmt.Print(" [OK]\n")

if !isIntegrationTest {
go setupWorkers()
ctx := common.GetRootContext()
go setupWorkers(ctx)

go startHealthCheck()
go startRefreshSettings()
go startHealthCheck(ctx)
go startRefreshSettings(ctx)

if config.Configuration.PriceInUSD {
go refreshPriceOnChain()
go refreshPriceOnChain(ctx)
}
}

Expand Down
4 changes: 4 additions & 0 deletions code/go/0chain.net/blobbercore/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func SetupDefaultConfig() {
viper.SetDefault("challenge_response.num_workers", 5)
viper.SetDefault("challenge_response.max_retries", 10)

viper.SetDefault("healthcheck.frequency", "60s")

viper.SetDefault("capacity", -1)
viper.SetDefault("read_price", 0.0)
viper.SetDefault("write_price", 0.0)
Expand Down Expand Up @@ -107,6 +109,8 @@ type Config struct {
ColdStorageDeleteLocalCopy bool
ColdStorageDeleteCloudCopy bool

HealthCheckWorkerFreq time.Duration

MinioStart bool
MinioWorkerFreq int64
MinioUseSSL bool
Expand Down
2 changes: 1 addition & 1 deletion code/go/0chain.net/blobbercore/config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Settings struct {
MaxOfferDuration string `gorm:"column:max_offer_duration;size:30;default:'-1ns';not null"`
MaxStake int64 `gorm:"column:max_stake;not null;default:100"`
MinLockDemand float64 `gorm:"column:min_lock_demand;not null;default:0"`
MinStake int64 `gorm:"column:min_lock_demand;not null;default:1"`
MinStake int64 `gorm:"column:min_stake;not null;default:1"`
NumDelegates int `gorm:"column:num_delegates;not null;default:100"`
ReadPrice float64 `gorm:"column:read_price;not null;default:0"`
WritePrice float64 `gorm:"column:write_price;not null;default:0"`
Expand Down
3 changes: 2 additions & 1 deletion code/go/0chain.net/blobbercore/handler/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/blobber/code/go/0chain.net/core/transaction"
"go.uber.org/zap"
Expand Down Expand Up @@ -42,7 +43,7 @@ func BlobberHealthCheck() (string, error) {
}

err = txn.ExecuteSmartContract(transaction.STORAGE_CONTRACT_ADDRESS,
transaction.BLOBBER_HEALTH_CHECK, "", 0)
transaction.BLOBBER_HEALTH_CHECK, common.Now(), 0)
if err != nil {
logging.Logger.Info("Failed to health check on the blockchain",
zap.String("err:", err.Error()))
Expand Down
17 changes: 9 additions & 8 deletions code/go/0chain.net/blobbercore/handler/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,13 @@ func RegisterBlobber(ctx context.Context) error {
return nil
}

return SendHealthCheck()
txnHash, err := SendHealthCheck()
if err != nil {
logging.Logger.Error("Failed to send healthcheck transaction", zap.String("txn_hash", txnHash))
return err
}

return nil
}

// UpdateBlobber update blobber
Expand Down Expand Up @@ -238,16 +243,12 @@ func WalletRegister() error {
}

// SendHealthCheck send heartbeat to blockchain
func SendHealthCheck() error {
func SendHealthCheck() (string, error) {
txnHash, err := BlobberHealthCheck()
if err != nil {
return err
return txnHash, err
}
_, err = TransactionVerify(txnHash)
if err != nil {
logging.Logger.Error("Failed to verify blobber health check", zap.Any("err", err), zap.String("txn.Hash", txnHash))
return err
}

return nil
return txnHash, err
}
6 changes: 5 additions & 1 deletion config/0chain_blobber.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: "1.0"

logging:
level: "error"
level: "info"
console: true # printing log to console is only supported in development mode

info:
Expand Down Expand Up @@ -93,6 +93,10 @@ challenge_response:
frequency: 10
num_workers: 5
max_retries: 20

healthcheck:
frequency: 60s # send healthcheck to miners every 60 seconds

pg:
user: postgres
password: postgres
Expand Down