Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] save found blocks and round effort across runs #95

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ mainnet.json

/build/_workspace/
/build/bin/
.idea
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,19 @@ Configuration is self-describing, just copy *config.example.json* to *config.jso
"port": 18081,
"timeout": "10s"
}
]
],

"statsDir": "stats"
}
```

You must use `anything.WorkerID` as username in your miner. Either disable address validation or use `<address>.WorkerID` as username. If there is no workerID specified your rig stats will be merged under `0` worker. If mining software contains dev fee rounds its stats will usually appear under `0` worker. This stratum acts like your own pool, the only exception is that you will get rewarded only after block found, shares only used for stats.

## Stats
Blocks found and round progress will be saved across runs in `$statsDir/stats.json` if you have enabled them by setting
`statsDir` in your `config.json`. You can also choose the interval (seconds) that stats are collected with by setting
`statsInterval`. The default is 5 seconds.

### Donations

**XMR**: `47v4BWeUPFrM9YkYRYk2pkS9CubAPEc7BJjNjg4FvF66Y2oVrTAaBjDZhmFzAXgqCNRvBH2gupQ2gNag2FkP983ZMptvUWG`
Expand Down
3 changes: 2 additions & 1 deletion config.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,6 @@
"newrelicEnabled": false,
"newrelicName": "MyStratum",
"newrelicKey": "SECRET_KEY",
"newrelicVerbose": false
"newrelicVerbose": false,
"statsDir": "stats"
}
2 changes: 2 additions & 0 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type Config struct {
NewrelicKey string `json:"newrelicKey"`
NewrelicVerbose bool `json:"newrelicVerbose"`
NewrelicEnabled bool `json:"newrelicEnabled"`
StatsDir string `json:"statsDir"`
StatsInterval int `json:"statsInterval"`
}

type Stratum struct {
Expand Down
119 changes: 42 additions & 77 deletions stratum/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,19 @@ package stratum

import (
"encoding/json"
"net/http"
"sync/atomic"
"time"

"fmt"
"github.com/sammy007/monero-stratum/rpc"
"github.com/sammy007/monero-stratum/util"
"log"
"net/http"
"sync/atomic"
)

func (s *StratumServer) StatsIndex(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusOK)

hashrate, hashrate24h, totalOnline, miners := s.collectMinersStats()
stats := map[string]interface{}{
"miners": miners,
"hashrate": hashrate,
"hashrate24h": hashrate24h,
"totalMiners": len(miners),
"totalOnline": totalOnline,
"timedOut": len(miners) - totalOnline,
"now": util.MakeTimestamp(),
}

var upstreams []interface{}
current := atomic.LoadInt32(&s.upstream)

for i, u := range s.upstreams {
upstream := convertUpstream(u)
upstream["current"] = current == int32(i)
upstreams = append(upstreams, upstream)
}
stats["upstreams"] = upstreams
stats["current"] = convertUpstream(s.rpc())
stats["luck"] = s.getLuckStats()
stats["blocks"] = s.getBlocksStats()

if t := s.currentBlockTemplate(); t != nil {
stats["height"] = t.height
stats["diff"] = t.diffInt64
roundShares := atomic.LoadInt64(&s.roundShares)
stats["variance"] = float64(roundShares) / float64(t.diffInt64)
stats["prevHash"] = t.prevHash[0:8]
stats["template"] = true
}
json.NewEncoder(w).Encode(stats)
json.NewEncoder(w).Encode(collectMinerStatsMap(s))
}

func convertUpstream(u *rpc.RPCClient) map[string]interface{} {
Expand All @@ -63,46 +31,6 @@ func convertUpstream(u *rpc.RPCClient) map[string]interface{} {
return upstream
}

func (s *StratumServer) collectMinersStats() (float64, float64, int, []interface{}) {
now := util.MakeTimestamp()
var result []interface{}
totalhashrate := float64(0)
totalhashrate24h := float64(0)
totalOnline := 0
window24h := 24 * time.Hour

for m := range s.miners.Iter() {
stats := make(map[string]interface{})
lastBeat := m.Val.getLastBeat()
hashrate := m.Val.hashrate(s.estimationWindow)
hashrate24h := m.Val.hashrate(window24h)
totalhashrate += hashrate
totalhashrate24h += hashrate24h
stats["name"] = m.Key
stats["hashrate"] = hashrate
stats["hashrate24h"] = hashrate24h
stats["lastBeat"] = lastBeat
stats["validShares"] = atomic.LoadInt64(&m.Val.validShares)
stats["staleShares"] = atomic.LoadInt64(&m.Val.staleShares)
stats["invalidShares"] = atomic.LoadInt64(&m.Val.invalidShares)
stats["accepts"] = atomic.LoadInt64(&m.Val.accepts)
stats["rejects"] = atomic.LoadInt64(&m.Val.rejects)
if !s.config.Frontend.HideIP {
stats["ip"] = m.Val.ip
}

if now-lastBeat > (int64(s.timeout/2) / 1000000) {
stats["warning"] = true
}
if now-lastBeat > (int64(s.timeout) / 1000000) {
stats["timeout"] = true
} else {
totalOnline++
}
result = append(result, stats)
}
return totalhashrate, totalhashrate24h, totalOnline, result
}

func (s *StratumServer) getLuckStats() map[string]interface{} {
now := util.MakeTimestamp()
Expand Down Expand Up @@ -164,3 +92,40 @@ func (s *StratumServer) getBlocksStats() []interface{} {
}
return result
}

func setBlockStats(s *StratumServer, importBlocksBlob interface{}) {
s.blocksMu.Lock()
defer s.blocksMu.Unlock()

if importBlocks, ok := importBlocksBlob.([]interface{}) ; ok {
// blocks are stored in JSON as array of hash but in the our struct as timestamp -> blockEntry
for _, element := range importBlocks {
if importBlock, ok := element.(map[string]interface{}) ; ok{
block := blockEntry{}
if d, ok := importBlock["height"].(json.Number); ok {
block.height, _ = d.Int64()
}
if d, ok := importBlock["hash"].(string); ok {
block.hash = d
}
if d, ok := importBlock["variance"].(json.Number); ok {
block.variance, _ = d.Float64()
}
if d, ok := importBlock["timestamp"].(json.Number); ok {
timestamp, _ := d.Int64()
s.blockStats[timestamp] = block
log.Printf("Imported block %d OK!", block.height)
} else {
log.Printf("Skipped importing a block... timestamp dectected as %T but should be int64! got value: '%s'", importBlock["timestamp"], importBlock["timestamp"])
}
}
}
} else {
log.Println("Unable to import any blocks... *ALL* of the JSON is invalid!", importBlocksBlob)
log.Println(fmt.Sprintf("detected type: %T", importBlocksBlob))

//[]interface {}

}

}
135 changes: 131 additions & 4 deletions stratum/stratum.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ import (
"crypto/rand"
"encoding/json"
"fmt"
"github.com/sammy007/monero-stratum/pool"
"github.com/sammy007/monero-stratum/rpc"
"github.com/sammy007/monero-stratum/util"
"io"
"log"
"math/big"
"net"
"path"
"sync"
"sync/atomic"
"time"

"github.com/sammy007/monero-stratum/pool"
"github.com/sammy007/monero-stratum/rpc"
"github.com/sammy007/monero-stratum/util"
)

type StratumServer struct {
Expand Down Expand Up @@ -64,9 +64,34 @@ const (
MaxReqSize = 10 * 1024
)

const (
DefaultStatsInterval = 5
)

func NewStratum(cfg *pool.Config) *StratumServer {
stratum := &StratumServer{config: cfg, blockStats: make(map[int64]blockEntry)}

// flush stats periodically if configured
if cfg.StatsDir != "" {
go func() {
statsJson := path.Join(cfg.StatsDir, "stats.json")
importMinerStatsMap(stratum, statsJson)

// ...work out how often we should save statistics
statsInterval := cfg.StatsInterval
if statsInterval == 0 {
log.Printf("statsInterval not found in config file, defaulting to %d seconds", DefaultStatsInterval)
statsInterval = 5
}

// ...and start a periodic flush process
for {
time.Sleep(time.Duration(statsInterval) * time.Second)
util.SaveJson(statsJson, collectMinerStatsMap(stratum))
}
}()
}

stratum.upstreams = make([]*rpc.RPCClient, len(cfg.Upstream))
for i, v := range cfg.Upstream {
client, err := rpc.NewRPCClient(&v)
Expand Down Expand Up @@ -398,3 +423,105 @@ func (s *StratumServer) rpc() *rpc.RPCClient {
i := atomic.LoadInt32(&s.upstream)
return s.upstreams[i]
}

func (s *StratumServer) collectMinersStats() (float64, float64, int, []interface{}) {
now := util.MakeTimestamp()
var result []interface{}
totalhashrate := float64(0)
totalhashrate24h := float64(0)
totalOnline := 0
window24h := 24 * time.Hour

for m := range s.miners.Iter() {
stats := make(map[string]interface{})
lastBeat := m.Val.getLastBeat()
hashrate := m.Val.hashrate(s.estimationWindow)
hashrate24h := m.Val.hashrate(window24h)
totalhashrate += hashrate
totalhashrate24h += hashrate24h
stats["name"] = m.Key
stats["hashrate"] = hashrate
stats["hashrate24h"] = hashrate24h
stats["lastBeat"] = lastBeat
stats["validShares"] = atomic.LoadInt64(&m.Val.validShares)
stats["staleShares"] = atomic.LoadInt64(&m.Val.staleShares)
stats["invalidShares"] = atomic.LoadInt64(&m.Val.invalidShares)
stats["accepts"] = atomic.LoadInt64(&m.Val.accepts)
stats["rejects"] = atomic.LoadInt64(&m.Val.rejects)
if !s.config.Frontend.HideIP {
stats["ip"] = m.Val.ip
}

if now-lastBeat > (int64(s.timeout/2) / 1000000) {
stats["warning"] = true
}
if now-lastBeat > (int64(s.timeout) / 1000000) {
stats["timeout"] = true
} else {
totalOnline++
}
result = append(result, stats)
}
return totalhashrate, totalhashrate24h, totalOnline, result
}

func collectMinerStatsMap(s *StratumServer) map[string]interface{} {
hashrate, hashrate24h, totalOnline, miners := s.collectMinersStats()
stats := map[string]interface{}{
"miners": miners,
"hashrate": hashrate,
"hashrate24h": hashrate24h,
"totalMiners": len(miners),
"totalOnline": totalOnline,
"timedOut": len(miners) - totalOnline,
"now": util.MakeTimestamp(),
}

var upstreams []interface{}
current := atomic.LoadInt32(&s.upstream)

for i, u := range s.upstreams {
upstream := convertUpstream(u)
upstream["current"] = current == int32(i)
upstreams = append(upstreams, upstream)
}
stats["upstreams"] = upstreams
stats["current"] = convertUpstream(s.rpc())
stats["luck"] = s.getLuckStats()
stats["blocks"] = s.getBlocksStats()

if t := s.currentBlockTemplate(); t != nil {
stats["height"] = t.height
stats["diff"] = t.diffInt64
roundShares := atomic.LoadInt64(&s.roundShares)
stats["variance"] = float64(roundShares) / float64(t.diffInt64)
stats["prevHash"] = t.prevHash[0:8]
stats["template"] = true

// the overall effort towards the current block "variance" is an OUTPUT of the calculation of
// roundShares/networkDifficulty - therefore it makes no sense to save it in JSON as its a calculated output
// we must persist the value of roundShares instead, as an extra field
stats["roundShares"] = roundShares
}

return stats
}


func importMinerStatsMap(stratumServer *StratumServer, statsJson string) {
log.Println("importing previous statistics...")
if parsed := util.LoadJson(statsJson) ; parsed != nil {
if stats, ok := parsed.(map[string]interface{}); ok {
// mined blocks
setBlockStats(stratumServer, stats["blocks"])

// progress
if v, ok := stats["roundShares"].(json.Number); ok {
stratumServer.roundShares, _ = v.Int64()
}

} else {
log.Println("Parsed JSON from saved stats but its unreadable", parsed)
}
}
}
Loading