Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add iperf output types and use it instead of go-iperf #2089

Merged
merged 3 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmds/modules/noded/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/threefoldtech/zos/pkg/events"
"github.com/threefoldtech/zos/pkg/monitord"
"github.com/threefoldtech/zos/pkg/perf"
"github.com/threefoldtech/zos/pkg/perf/cpubench"
"github.com/threefoldtech/zos/pkg/perf/iperf"
"github.com/threefoldtech/zos/pkg/registrar"
"github.com/threefoldtech/zos/pkg/stubs"
"github.com/threefoldtech/zos/pkg/utils"
Expand Down Expand Up @@ -202,10 +204,9 @@ func action(cli *cli.Context) error {
return errors.Wrap(err, "failed to create a new perfMon")
}

iperfTest := perf.NewIperfTest()
perfMon.AddTask(&iperfTest)
perfMon.AddTask(iperf.NewTask())

cpuBenchmarkTask := perf.NewCPUBenchmarkTask()
cpuBenchmarkTask := cpubench.NewCPUBenchmarkTask()
perfMon.AddTask(&cpuBenchmarkTask)

if err = perfMon.Run(ctx); err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package perf
package cpubench

import (
"context"
"encoding/json"
"fmt"
"os/exec"

"github.com/threefoldtech/zos/pkg/perf"
"github.com/threefoldtech/zos/pkg/stubs"
)

Expand All @@ -28,7 +29,7 @@ type CPUBenchmarkResult struct {
Workloads int `json:"workloads"`
}

var _ Task = (*CPUBenchmarkTask)(nil)
var _ perf.Task = (*CPUBenchmarkTask)(nil)

// NewCPUBenchmarkTask returns a new CPU benchmark task.
func NewCPUBenchmarkTask() CPUBenchmarkTask {
Expand Down Expand Up @@ -59,7 +60,7 @@ func (c *CPUBenchmarkTask) Run(ctx context.Context) (interface{}, error) {
if err != nil {
return nil, fmt.Errorf("failed to parse cpubench output: %w", err)
}
client := GetZbusClient(ctx)
client := perf.GetZbusClient(ctx)
statistics := stubs.NewStatisticsStub(client)

workloads, err := statistics.Workloads(ctx)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package perf
package iperf

import (
"context"
Expand Down
83 changes: 52 additions & 31 deletions pkg/perf/iperf_task.go → pkg/perf/iperf/iperf_task.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package perf
package iperf

import (
"context"
"encoding/json"
"fmt"
"net"
"os"
"os/exec"
"path/filepath"

goIperf "github.com/BGrewell/go-iperf"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/threefoldtech/zos/pkg/environment"
"github.com/threefoldtech/zos/pkg/network/iperf"
"github.com/threefoldtech/zos/pkg/perf"
)

// IperfTest for iperf tcp/udp tests
Expand All @@ -20,18 +24,24 @@ type IperfTest struct {

// IperfResult for iperf test results
type IperfResult struct {
UploadSpeed float64 `json:"upload_speed"` // in bit/sec
DownloadSpeed float64 `json:"download_speed"` // in bit/sec
NodeID uint32 `json:"node_id"`
NodeIpv4 string `json:"node_ip"`
TestType string `json:"test_type"`
Error string `json:"error"`
CpuReport goIperf.CpuUtilizationReport `json:"cpu_report"`
UploadSpeed float64 `json:"upload_speed"` // in bit/sec
DownloadSpeed float64 `json:"download_speed"` // in bit/sec
NodeID uint32 `json:"node_id"`
NodeIpv4 string `json:"node_ip"`
TestType string `json:"test_type"`
Error string `json:"error"`
CpuReport CPUUtilizationPercent `json:"cpu_report"`
}

// NewIperfTest creates a new iperf test
func NewIperfTest() IperfTest {
return IperfTest{taskID: "iperf", schedule: "0 0 */6 * * *"}
// NewTask creates a new iperf test
func NewTask() perf.Task {
// because go-iperf left tmp directories with perf binary in it each time
// the task had run
matches, _ := filepath.Glob("/tmp/goiperf*")
for _, match := range matches {
os.RemoveAll(match)
}
return &IperfTest{taskID: "iperf", schedule: "0 0 */6 * * *"}
}

// ID returns the ID of the tcp task
Expand Down Expand Up @@ -104,35 +114,46 @@ func (t *IperfTest) Run(ctx context.Context) (interface{}, error) {
}

func (t *IperfTest) runIperfTest(ctx context.Context, clientIP string, tcp bool) IperfResult {
iperfClient := goIperf.NewClient(clientIP)
iperfClient.SetBandwidth("1M")
iperfClient.SetPort(iperf.IperfPort)
iperfClient.SetInterval(20)
iperfClient.SetJSON(true)
opts := make([]string, 0)
opts = append(opts,
"--client", clientIP,
"--bandwidth", "1M",
"--port", fmt.Sprint(iperf.IperfPort),
"--interval", "20",
"--json",
)

if !tcp {
iperfClient.SetLength("16B")
iperfClient.SetProto(goIperf.PROTO_UDP)
opts = append(opts, "--length", "16B", "--udp")
}

err := iperfClient.Start()
if err != nil {
log.Error().Err(err).Msgf("failed to start iperf client with ip '%s'", clientIP)
output, err := exec.CommandContext(ctx, "iperf", opts...).CombinedOutput()
exitErr := &exec.ExitError{}
if err != nil && !errors.As(err, &exitErr) {
log.Err(err).Msg("failed to run iperf")
return IperfResult{}
}

<-iperfClient.Done
var report iperfCommandOutput
if err := json.Unmarshal(output, &report); err != nil {
log.Err(err).Msg("failed to parse iperf output")
return IperfResult{}
}

proto := "tcp"
if !tcp {
proto = "udp"
}
iperfResult := IperfResult{
UploadSpeed: iperfClient.Report().End.SumSent.BitsPerSecond,
DownloadSpeed: iperfClient.Report().End.SumReceived.BitsPerSecond,
CpuReport: iperfClient.Report().End.CpuReport,
UploadSpeed: report.End.SumSent.BitsPerSecond,
DownloadSpeed: report.End.SumReceived.BitsPerSecond,
CpuReport: report.End.CPUUtilizationPercent,
NodeIpv4: clientIP,
TestType: string(iperfClient.Proto()),
Error: iperfClient.Report().Error,
TestType: proto,
Error: report.Error,
}

if !tcp && len(iperfClient.Report().End.Streams) > 0 {
iperfResult.DownloadSpeed = iperfClient.Report().End.Streams[0].Udp.BitsPerSecond
if !tcp && len(report.End.Streams) > 0 {
iperfResult.DownloadSpeed = report.End.Streams[0].UDP.BitsPerSecond
}

return iperfResult
Expand Down
134 changes: 134 additions & 0 deletions pkg/perf/iperf/iperf_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package iperf

type iperfCommandOutput struct {
Start Start `json:"start"`
Intervals []Interval `json:"intervals"`
End End `json:"end"`
Error string `json:"error"`
}

type End struct {
Streams []EndStream `json:"streams"`
SumSent Sum `json:"sum_sent"`
SumReceived Sum `json:"sum_received"`
CPUUtilizationPercent CPUUtilizationPercent `json:"cpu_utilization_percent"`
SenderTCPCongestion string `json:"sender_tcp_congestion"`
ReceiverTCPCongestion string `json:"receiver_tcp_congestion"`
}

type CPUUtilizationPercent struct {
HostTotal float64 `json:"host_total"`
HostUser float64 `json:"host_user"`
HostSystem float64 `json:"host_system"`
RemoteTotal float64 `json:"remote_total"`
RemoteUser float64 `json:"remote_user"`
RemoteSystem float64 `json:"remote_system"`
}

type EndStream struct {
Sender Sum `json:"sender"`
Receiver Sum `json:"receiver"`
UDP UDPSum `json:"udp"`
}

type UDPSum struct {
Socket int64 `json:"socket"`
Start float64 `json:"start"`
End float64 `json:"end"`
Seconds float64 `json:"seconds"`
Bytes int64 `json:"bytes"`
BitsPerSecond float64 `json:"bits_per_second"`
JitterMS float64 `json:"jitter_ms"`
LostPackets int64 `json:"lost_packets"`
Packets int64 `json:"packets"`
LostPercent float64 `json:"lost_percent"`
OutOfOrder int64 `json:"out_of_order"`
Sender bool `json:"sender"`
}

type Sum struct {
Socket int64 `json:"socket"`
Start float64 `json:"start"`
End float64 `json:"end"`
Seconds float64 `json:"seconds"`
Bytes int64 `json:"bytes"`
BitsPerSecond float64 `json:"bits_per_second"`
Retransmits int64 `json:"retransmits"`
MaxSndCwnd int64 `json:"max_snd_cwnd"`
MaxSndWnd int64 `json:"max_snd_wnd"`
MaxRtt int64 `json:"max_rtt"`
MinRtt int64 `json:"min_rtt"`
MeanRtt int64 `json:"mean_rtt"`
Sender bool `json:"sender"`
}

type Interval struct {
Streams []IntervalStream `json:"streams"`
Sum Sum `json:"sum"`
}

type IntervalStream struct {
Socket int64 `json:"socket"`
Start float64 `json:"start"`
End float64 `json:"end"`
Seconds float64 `json:"seconds"`
Bytes int64 `json:"bytes"`
BitsPerSecond float64 `json:"bits_per_second"`
Retransmits int64 `json:"retransmits"`
SndCwnd int64 `json:"snd_cwnd"`
SndWnd int64 `json:"snd_wnd"`
Rtt int64 `json:"rtt"`
Rttvar int64 `json:"rttvar"`
Pmtu int64 `json:"pmtu"`
Omitted bool `json:"omitted"`
Sender bool `json:"sender"`
}

type Start struct {
Connected []Connected `json:"connected"`
Version string `json:"version"`
SystemInfo string `json:"system_info"`
Timestamp Timestamp `json:"timestamp"`
ConnectingTo ConnectingTo `json:"connecting_to"`
Cookie string `json:"cookie"`
TCPMssDefault int64 `json:"tcp_mss_default"`
TargetBitrate int64 `json:"target_bitrate"`
FqRate int64 `json:"fq_rate"`
SockBufsize int64 `json:"sock_bufsize"`
SndbufActual int64 `json:"sndbuf_actual"`
RcvbufActual int64 `json:"rcvbuf_actual"`
TestStart TestStart `json:"test_start"`
}

type Connected struct {
Socket int64 `json:"socket"`
LocalHost string `json:"local_host"`
LocalPort int64 `json:"local_port"`
RemoteHost string `json:"remote_host"`
RemotePort int64 `json:"remote_port"`
}

type ConnectingTo struct {
Host string `json:"host"`
Port int64 `json:"port"`
}

type TestStart struct {
Protocol string `json:"protocol"`
NumStreams int64 `json:"num_streams"`
Blksize int64 `json:"blksize"`
Omit int64 `json:"omit"`
Duration int64 `json:"duration"`
Bytes int64 `json:"bytes"`
Blocks int64 `json:"blocks"`
Reverse int64 `json:"reverse"`
Tos int64 `json:"tos"`
TargetBitrate int64 `json:"target_bitrate"`
Bidir int64 `json:"bidir"`
Fqrate int64 `json:"fqrate"`
}

type Timestamp struct {
Time string `json:"time"`
Timesecs int64 `json:"timesecs"`
}
Loading