Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjustments to error handling and stat command #16

Merged
merged 4 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
./hperf
dist
hperf
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ NOTE: Be careful not to re-use the ID's if you care about fetching results at a

```bash
# get test results
./hperf get --hosts 1.1.1.{1...100} --id [my_test_id]
./hperf stat --hosts 1.1.1.{1...100} --id [my_test_id]
# save test results
./hperf stat --hosts 1.1.1.{1...100} --id [my_test_id] --output /tmp/file

# listen in on a running test
./hperf listen --hosts 1.1.1.{1...100} --id [my_test_id]
Expand Down
25 changes: 23 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"net/http"
"os"
"runtime/debug"
"slices"
"strconv"
Expand Down Expand Up @@ -206,7 +207,7 @@ func handleWSConnection(ctx context.Context, c *shared.Config, host string, id i
case shared.ListTests:
go parseTestList(signal.TestList)
case shared.GetTest:
go receiveJSONDataPoint(signal.Data)
go receiveJSONDataPoint(signal.Data, c)
case shared.Err:
go PrintErrorString(signal.Error)
case shared.Done:
Expand All @@ -231,7 +232,7 @@ func PrintError(err error) {
fmt.Println(ErrorStyle.Render("ERROR: ", err.Error()))
}

func receiveJSONDataPoint(data []byte) {
func receiveJSONDataPoint(data []byte, c *shared.Config) {
responseLock.Lock()
defer responseLock.Unlock()

Expand Down Expand Up @@ -430,6 +431,26 @@ func GetTest(ctx context.Context, c shared.Config) (err error) {
}
})

if c.Output != "" {
f, err := os.Create(c.Output)
if err != nil {
return err
}
for i := range responseDPS {
outb, err := json.Marshal(responseDPS[i])
if err != nil {
PrintError(err)
continue
}
_, err = f.Write(append(outb, []byte{10}...))
if err != nil {
return err
}
}

return nil
}

printDataPointHeaders(responseDPS[0].Type)
for i := range responseDPS {
dp := responseDPS[i]
Expand Down
5 changes: 5 additions & 0 deletions cmd/hperf/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ var (
Name: "id",
Usage: "specify custom ID per test",
}
outputFlag = cli.StringFlag{
Name: "output",
Usage: "set output file path/name",
}
saveTestFlag = cli.BoolTFlag{
Name: "save",
EnvVar: "HPERF_SAVE",
Expand Down Expand Up @@ -234,6 +238,7 @@ func parseConfig(ctx *cli.Context) (*shared.Config, error) {
TestID: ctx.String(testIDFlag.Name),
RestartOnError: ctx.BoolT(restartOnErrorFlag.Name),
Hosts: hosts,
Output: ctx.String(outputFlag.Name),
}

if ctx.String("id") == "" {
Expand Down
3 changes: 3 additions & 0 deletions cmd/hperf/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var statTestsCMD = cli.Command{
hostsFlag,
portFlag,
testIDFlag,
outputFlag,
},
CustomHelpTemplate: `NAME:
{{.HelpName}} - {{.Usage}}
Expand All @@ -44,6 +45,8 @@ FLAGS:
EXAMPLES:
1. Print stats by ID for hosts '10.10.10.1' and '10.10.10.2':
{{.Prompt}} {{.HelpName}} --hosts 10.10.10.1,10.10.10.2 --id my_test_id
2. Save stats by ID for hosts '10.10.10.1' and '10.10.10.2':
{{.Prompt}} {{.HelpName}} --hosts 10.10.10.1,10.10.10.2 --id my_test_id --output /tmp/output-file
`,
}

Expand Down
139 changes: 75 additions & 64 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"os"
"runtime"
"runtime/debug"
"slices"
"strconv"
"strings"
"sync"
Expand All @@ -38,6 +39,7 @@ import (

"github.com/gofiber/contrib/websocket"
"github.com/gofiber/fiber/v2"
"github.com/google/uuid"
"github.com/minio/hperf/shared"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/mem"
Expand Down Expand Up @@ -69,24 +71,31 @@ type test struct {

Readers []*netPerfReader
errors []shared.TError
errMap map[string]struct{}
errIndex atomic.Int32
DPS []shared.DP
M sync.Mutex

DataFile *os.File
DataFileIndex int
cons map[string]*websocket.Conn
}

func (t *test) AddError(err error) {
func (t *test) AddError(err error, id string) {
t.M.Lock()
defer t.M.Unlock()
if err == nil {
return
}
_, ok := t.errMap[id]
if ok {
return
}
if t.Config.Debug {
fmt.Println("ERR:", err)
}
t.M.Lock()
defer t.M.Unlock()
t.errors = append(t.errors, shared.TError{Error: err.Error(), Created: time.Now()})
t.errMap[id] = struct{}{}
}

func RunServer(ctx context.Context, address string, storagePath string) (err error) {
Expand Down Expand Up @@ -359,6 +368,8 @@ func newTest(c *shared.Config) (t *test, err error) {
defer testLock.Unlock()

t = new(test)
t.errMap = make(map[string]struct{})
t.cons = make(map[string]*websocket.Conn)
t.Started = time.Now()
t.Config = *c
t.DPS = make([]shared.DP, 0)
Expand Down Expand Up @@ -477,7 +488,7 @@ func createAndRunTest(con *websocket.Conn, signal shared.WebsocketSignal) {
go startPerformanceReader(test, test.Readers[i])
}

var paginator DataPointPaginator
listenToLiveTests(con, signal)
for {
if test.ctx.Err() != nil {
return
Expand All @@ -490,34 +501,24 @@ func createAndRunTest(con *websocket.Conn, signal shared.WebsocketSignal) {
if signal.Config.Debug {
fmt.Println("Duration: ", signal.Config.TestID, time.Since(start).Seconds())
}

generateDataPoints(test)
if con != nil {
_, paginator = sendDataResponseToWebsocket(con, test, paginator)
}
_ = sendAndSaveData(test)
}
}

func listenToLiveTests(con *websocket.Conn, s shared.WebsocketSignal) {
var paginator DataPointPaginator
var err error
paginator.After = time.Now()
for {
time.Sleep(1 * time.Second)
for i := range tests {
if time.Since(tests[i].Started).Seconds() > float64(tests[i].Config.Duration) {
continue
}
if s.Config.TestID != "" && tests[i].ID != s.Config.TestID {
continue
}
if s.Config.Debug {
fmt.Println("Listen:", tests[i].ID, "DPS:", len(tests[i].DPS), "ERR:", len(tests[i].errors))
}
err, paginator = sendDataResponseToWebsocket(con, tests[i], paginator)
if err != nil {
return
}
uid := uuid.NewString()

for i := range tests {
if s.Config.TestID != "" && tests[i].ID != s.Config.TestID {
continue
}
if s.Config.Debug {
fmt.Println("Listen:", tests[i].ID, "DPS:", len(tests[i].DPS), "ERR:", len(tests[i].errors))
}

tests[i].cons[uid] = con
}
}

Expand All @@ -527,45 +528,67 @@ type DataPointPaginator struct {
After time.Time
}

func sendDataResponseToWebsocket(con *websocket.Conn, t *test, lastPaginator DataPointPaginator) (err error, Paginator DataPointPaginator) {
func sendAndSaveData(t *test) (err error) {
defer func() {
r := recover()
if r != nil {
log.Println(r, string(debug.Stack()))
}
}()

wss := new(shared.WebsocketSignal)
wss.SType = shared.Stats
dataResponse := new(shared.DataReponseToClient)

if t.DataFile == nil && t.Config.Save {
newTestFile(t)
}

for i := range t.DPS {
if i <= lastPaginator.DPIndex {
continue
}
if !lastPaginator.After.IsZero() {
if t.DPS[i].Created.Before(lastPaginator.After) {
continue
dataResponse.DPS = append(dataResponse.DPS, t.DPS[i])
if t.Config.Save {
fileb, err := json.Marshal(t.DPS[i])
if err != nil {
t.AddError(err, "datapoint-marshaling")
}
t.DataFile.Write(append(fileb, []byte{10}...))
}
dataResponse.DPS = append(dataResponse.DPS, t.DPS[i])
Paginator.DPIndex = i
t.DPS = slices.Delete(t.DPS, i, i+1)
}

for i := range t.errors {
if i <= lastPaginator.ErrIndex {
continue
}
if !lastPaginator.After.IsZero() {
if t.errors[i].Created.Before(lastPaginator.After) {
continue
dataResponse.Errors = append(dataResponse.Errors, t.errors[i])
if t.Config.Save {
fileb, err := json.Marshal(t.errors[i])
if err != nil {
t.AddError(err, "error-marshaling")
}
t.DataFile.Write(append(fileb, []byte{10}...))
}
dataResponse.Errors = append(dataResponse.Errors, t.errors[i])
Paginator.ErrIndex = i
t.M.Lock()
t.errors = slices.Delete(t.errors, i, i+1)
t.M.Unlock()
}

t.M.Lock()
t.errMap = make(map[string]struct{})
t.M.Unlock()

wss.DataPoint = dataResponse
err = con.WriteJSON(wss)
if err != nil {
if t.Config.Debug {
fmt.Println("Unable to send data point:", err)
for i := range t.cons {
if t.cons[i] == nil {
continue
}

err = t.cons[i].WriteJSON(wss)
if err != nil {
if t.Config.Debug {
fmt.Println("Unable to send data point:", err)
}
t.cons[i].Close()
delete(t.cons, i)
continue
}
con.Close()
con = nil
}
return
}
Expand Down Expand Up @@ -609,18 +632,6 @@ func generateDataPoints(t *test) {
r.m.Unlock()

t.DPS = append(t.DPS, d)

if t.Config.Save {
fileb, err := json.Marshal(d)
if err != nil {
t.AddError(err)
}
if t.DataFile == nil {
newTestFile(t)
}
t.DataFile.Write(append(fileb, []byte{10}...))
}

}
return
}
Expand Down Expand Up @@ -727,7 +738,7 @@ func sendRequestToHost(t *test, r *netPerfReader, cid int) {
route = "/http"
body = AR
default:
t.AddError(fmt.Errorf("Unknown test type: %d", t.Config.TestType))
t.AddError(fmt.Errorf("Unknown test type: %d", t.Config.TestType), "unknown-signal")
}

req, err = http.NewRequestWithContext(
Expand All @@ -748,12 +759,12 @@ func sendRequestToHost(t *test, r *netPerfReader, cid int) {
if errors.Is(err, context.Canceled) {
return
}
t.AddError(err)
t.AddError(err, "network-error")
return
}

if resp.StatusCode != http.StatusOK {
t.AddError(fmt.Errorf("Status code was %d, expected 200 from host %s", resp.StatusCode, r.addr))
t.AddError(fmt.Errorf("Status code was %d, expected 200 from host %s", resp.StatusCode, r.addr), "invalid-status-code")
return
}

Expand Down
1 change: 1 addition & 0 deletions shared/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ type Config struct {
Save bool `json:"Save"`
Insecure bool `json:"Insecure"`
TestType TestType `json:"TestType"`
Output string `json:"Output"`
// AllowLocalInterface bool `json:"AllowLocalInterfaces"`

// Client Only
Expand Down