Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix blobber stats #1305

Merged
merged 4 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions code/go/0chain.net/blobber/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func startHttpServer() {
}

r := mux.NewRouter()
initHandlers(r)
initHandlers(r, config.Development())

var wg sync.WaitGroup

Expand All @@ -36,7 +36,7 @@ func startHttpServer() {
// start https server
go startServer(&wg, r, mode, httpsPort, true)

logging.Logger.Info("Ready to listen to the requests")
logging.Logger.Info("Ready to listen to the requests with development mode: " + mode)
fmt.Print("> start http server [OK]\n")

wg.Wait()
Expand Down Expand Up @@ -104,12 +104,12 @@ func startServer(wg *sync.WaitGroup, r *mux.Router, mode string, port int, isTls
}
}

func initHandlers(r *mux.Router) {
func initHandlers(r *mux.Router, devMode bool) {
handler.StartTime = time.Now().UTC()
r.HandleFunc("/", handler.HomepageHandler)
handler.SetupHandlers(r)
handler.SetupSwagger()
common.SetAdminCredentials()
common.SetAdminCredentials(devMode)
}

func initProfHandlers(mux *http.ServeMux) {
Expand Down
2 changes: 2 additions & 0 deletions code/go/0chain.net/blobber/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/handler"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/readmarker"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/stats"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"

Expand All @@ -26,6 +27,7 @@ func setupWorkers(ctx context.Context) {
allocation.SetupWorkers(ctx)
challenge.SetupChallengeCleanUpWorker(ctx)
challenge.SetupChallengeTimingsCleanupWorker(ctx)
stats.SetupStatsWorker(ctx)
updateCCTWorker(ctx)
}

Expand Down
18 changes: 7 additions & 11 deletions code/go/0chain.net/blobbercore/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,23 +217,19 @@ func setupHandlers(r *mux.Router) {
// Allowing admin api for debugging purpose only. Later on commented out line should be
// uncommented and line below it should be deleted

// r.HandleFunc("/_debug", common.AuthenticateAdmin(common.ToJSONResponse(DumpGoRoutines)))
r.HandleFunc("/_debug", RateLimitByCommmitRL(common.ToJSONResponse(DumpGoRoutines)))
// r.HandleFunc("/_config", common.AuthenticateAdmin(common.ToJSONResponse(GetConfig)))
r.HandleFunc("/_config", RateLimitByCommmitRL(common.ToJSONResponse(GetConfig)))
r.HandleFunc("/_debug", common.AuthenticateAdmin(common.ToJSONResponse(DumpGoRoutines)))
// r.HandleFunc("/_debug", RateLimitByCommmitRL(common.ToJSONResponse(DumpGoRoutines)))
r.HandleFunc("/_config", common.AuthenticateAdmin(common.ToJSONResponse(GetConfig)))
// r.HandleFunc("/_config", RateLimitByCommmitRL(common.ToJSONResponse(GetConfig)))
// r.HandleFunc("/_stats", common.AuthenticateAdmin(StatsHandler))
r.HandleFunc("/_stats", RateLimitByCommmitRL(StatsHandler))

r.HandleFunc("/_logs", RateLimitByCommmitRL(common.ToJSONResponse(GetLogs)))

// r.HandleFunc("/_statsJSON", common.AuthenticateAdmin(common.ToJSONResponse(stats.StatsJSONHandler)))
r.HandleFunc("/_statsJSON", RateLimitByCommmitRL(common.ToJSONResponse(stats.StatsJSONHandler)))
// r.HandleFunc("/_cleanupdisk", common.AuthenticateAdmin(common.ToJSONResponse(WithReadOnlyConnection(CleanupDiskHandler))))
// r.HandleFunc("/_cleanupdisk", RateLimitByCommmitRL(common.ToJSONResponse(WithReadOnlyConnection(CleanupDiskHandler))))
// r.HandleFunc("/getstats", common.AuthenticateAdmin(common.ToJSONResponse(stats.GetStatsHandler)))
r.HandleFunc("/getstats", RateLimitByCommmitRL(common.ToJSONResponse(WithReadOnlyConnection(stats.GetStatsHandler))))
// r.HandleFunc("/challengetimings", common.AuthenticateAdmin(common.ToJSONResponse(GetChallengeTimings)))
r.HandleFunc("/challengetimings", RateLimitByCommmitRL(common.ToJSONResponse(GetChallengeTimings)))
r.HandleFunc("/challengetimings", common.AuthenticateAdmin(common.ToJSONResponse(GetChallengeTimings)))
// r.HandleFunc("/challengetimings", RateLimitByCommmitRL(common.ToJSONResponse(GetChallengeTimings)))
r.HandleFunc("/challenge-timings-by-challengeId", RateLimitByCommmitRL(common.ToJSONResponse(GetChallengeTiming)))

//marketplace related
Expand Down Expand Up @@ -745,7 +741,7 @@ func StatsHandler(w http.ResponseWriter, r *http.Request) {
writeResponse(w, []byte(err.Error()))
return
}

w.Header().Set("Content-Type", "application/json")
writeResponse(w, statsJson)

return
Expand Down
86 changes: 32 additions & 54 deletions code/go/0chain.net/blobbercore/stats/blobberstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,42 @@ type BlobberStats struct {
WriteMarkers WriteMarkersStat `json:"write_markers"`
}

var fs *BlobberStats

const statsHandlerPeriod = 30 * time.Minute

type AllocationId struct {
Id string `json:"id"`
}

func SetupStatsWorker(ctx context.Context) {
fs = &BlobberStats{}
go func() {
_ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
fs.loadBasicStats(ctx)
fs.loadDetailedStats(ctx)
fs.loadFailedChallengeList(ctx)
return common.NewError("rollback", "read_only")
})
for {
select {
case <-ctx.Done():
return
case <-time.After(statsHandlerPeriod):
_ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
fs.loadBasicStats(ctx)
fs.loadDetailedStats(ctx)
fs.loadFailedChallengeList(ctx)
return common.NewError("rollback", "read_only")
})
}
}
}()
}

func LoadBlobberStats(ctx context.Context) *BlobberStats {
fs := &BlobberStats{}
fs.loadBasicStats(ctx)
fs.loadDetailedStats(ctx)
fs.loadInfraStats(ctx)
fs.loadDBStats()
fs.loadFailedChallengeList(ctx)
return fs
}

Expand Down Expand Up @@ -211,14 +236,10 @@ func (bs *BlobberStats) loadStats(ctx context.Context) {
const sel = `
COALESCE (SUM (reference_objects.size), 0) AS files_size,
COALESCE (SUM (reference_objects.thumbnail_size), 0) AS thumbnails_size,
COALESCE (SUM (file_stats.num_of_block_downloads), 0) AS num_of_reads,
COALESCE (SUM (reference_objects.num_of_block_downloads), 0) AS num_of_reads,
COALESCE (SUM (reference_objects.num_of_blocks), 0) AS num_of_block_writes,
COUNT (*) AS num_of_writes`

const join = `
INNER JOIN file_stats ON reference_objects.id = file_stats.ref_id
WHERE reference_objects.type = 'f'`

var (
db = datastore.GetStore().GetTransaction(ctx)
row *sql.Row
Expand All @@ -227,7 +248,7 @@ func (bs *BlobberStats) loadStats(ctx context.Context) {

row = db.Table("reference_objects").
Select(sel).
Joins(join).
Where("reference_objects.type = 'f'").
Row()

err = row.Scan(&bs.FilesSize, &bs.ThumbnailsSize, &bs.NumReads,
Expand Down Expand Up @@ -269,13 +290,11 @@ func (bs *BlobberStats) loadAllocationStats(ctx context.Context) {
reference_objects.allocation_id,
SUM(reference_objects.size) as files_size,
SUM(reference_objects.thumbnail_size) as thumbnails_size,
SUM(file_stats.num_of_block_downloads) as num_of_reads,
SUM(reference_objects.num_of_block_downloads) as num_of_reads,
SUM(reference_objects.num_of_blocks) as num_of_block_writes,
COUNT(*) as num_of_writes,
allocations.blobber_size AS allocated_size,
allocations.expiration_date AS expiration_date`).
Joins(`INNER JOIN file_stats
ON reference_objects.id = file_stats.ref_id`).
Joins(`
INNER JOIN allocations
ON allocations.id = reference_objects.allocation_id`).
Expand Down Expand Up @@ -440,47 +459,6 @@ func (bs *BlobberStats) loadAllocationChallengeStats(ctx context.Context) {
}
}

func loadAllocationList(ctx context.Context) (interface{}, error) {
var (
allocations = make([]AllocationId, 0)
db = datastore.GetStore().GetTransaction(ctx)
rows *sql.Rows
err error
)

rows, err = db.Table("reference_objects").
Select("reference_objects.allocation_id").
Group("reference_objects.allocation_id").
Rows()

if err != nil {
Logger.Error("Error in getting the allocation list", zap.Error(err))
return nil, common.NewError("get_allocations_list_failed",
"Failed to get allocation list from DB")
}
defer rows.Close()

for rows.Next() {
var allocationId AllocationId
if err = rows.Scan(&allocationId.Id); err != nil {
Logger.Error("Error in scanning record for blobber allocations",
zap.Error(err))
return nil, common.NewError("get_allocations_list_failed",
"Failed to scan allocation from DB")
}
allocations = append(allocations, allocationId)
}

if err = rows.Err(); err != nil && err != sql.ErrNoRows {
Logger.Error("Error in scanning record for blobber allocations",
zap.Error(err))
return nil, common.NewError("get_allocations_list_failed",
"Failed to scan allocations from DB")
}

return allocations, nil
}

type ReadMarkerEntity struct {
ReadCounter int64 `gorm:"column:counter" json:"counter"`
LatestRedeemedRC int64 `gorm:"column:latest_redeemed_rc"`
Expand Down
24 changes: 0 additions & 24 deletions code/go/0chain.net/blobbercore/stats/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
. "github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/gosdk/constants"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -485,26 +484,3 @@ func StatsJSONHandler(ctx context.Context, r *http.Request) (interface{}, error)
bs := LoadBlobberStats(ctx)
return bs, nil
}

func GetStatsHandler(ctx context.Context, r *http.Request) (interface{}, error) {
q := r.URL.Query()
ctx = context.WithValue(ctx, constants.ContextKeyAllocation, q.Get("allocation_id"))
allocationID := ctx.Value(constants.ContextKeyAllocation).(string)
bs := &BlobberStats{}
if allocationID != "" {
// TODO: Get only the allocation info from DB
bs.loadDetailedStats(ctx)
for _, allocStat := range bs.AllocationStats {
if allocStat.AllocationID == allocationID {
return allocStat, nil
}
}
return nil, common.NewError("allocation_stats_not_found", "Stats for allocation not found")
}
allocations := q.Get("allocations")
if allocations != "" {
return loadAllocationList(ctx)
}
bs.loadBasicStats(ctx)
return bs, nil
}
26 changes: 15 additions & 11 deletions code/go/0chain.net/core/common/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,29 @@ import (

// global username and password used to access endpoints only by admin
var gUsername, gPassword string
var isDevelopment bool

func SetAdminCredentials() {
func SetAdminCredentials(devMode bool) {
gUsername = viper.GetString("admin.username")
gPassword = viper.GetString("admin.password")
isDevelopment = devMode
}

func AuthenticateAdmin(handler ReqRespHandlerf) ReqRespHandlerf {
return func(w http.ResponseWriter, r *http.Request) {
uname, passwd, ok := r.BasicAuth()
if !ok {
w.WriteHeader(http.StatusForbidden)
w.Write([]byte("Admin only api")) // nolint
return
}
if !isDevelopment {
uname, passwd, ok := r.BasicAuth()
if !ok {
w.WriteHeader(http.StatusForbidden)
w.Write([]byte("Admin only api")) // nolint
return
}

if uname != gUsername || passwd != gPassword {
w.WriteHeader(http.StatusForbidden)
w.Write([]byte("Invalid username or password")) // nolint
return
if uname != gUsername || passwd != gPassword {
w.WriteHeader(http.StatusForbidden)
w.Write([]byte("Invalid username or password")) // nolint
return
}
}

handler(w, r)
Expand Down