Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic workers stats #148

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ LDFLAGS=-ldflags "-X github.com/noxiouz/stout/version.GitTag=${TAG} -X github.co


.DEFAULT: all
.PHONY: fmt vet test
.PHONY: fmt vet test gen_msgp

PKGS := $(shell go list ./... | grep -v ^github.com/noxiouz/stout/vendor/ | grep -v ^github.com/noxiouz/stout/version)

Expand All @@ -32,11 +32,14 @@ test:
cat profile.out >> coverage.txt; rm profile.out; \
fi done; \

build:
build: gen_msgp
@echo "+ $@"
go build ${LDFLAGS} -o ${NAME} github.com/noxiouz/stout/cmd/stout

build_travis_release:
build_travis_release: gen_msgp
@echo "+ $@"
env GOOS="linux" go build ${LDFLAGS} -o ${NAME} github.com/noxiouz/stout/cmd/stout
env GOOS="darwin" go build ${LDFLAGS} -o ${NAME}_osx github.com/noxiouz/stout/cmd/stout

gen_msgp:
@cd ./isolate; go generate; cd ..
14 changes: 12 additions & 2 deletions isolate/conn_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ func (r *responseStream) close(ctx context.Context) {
}
}

func (r *responseStream) Write(ctx context.Context, num uint64, data []byte) error {
// Writes messagepacked payload `as is` as a packet of Cocaine
func (r *responseStream) WriteMessage(ctx context.Context, num uint64, packedPayload []byte) error {
r.Lock()
defer r.Unlock()

Expand All @@ -238,7 +239,7 @@ func (r *responseStream) Write(ctx context.Context, num uint64, data []byte) err
p = msgp.AppendUint64(p, num)

p = msgp.AppendArrayHeader(p, 1)
p = msgp.AppendStringFromBytes(p, data)
p = append(p, packedPayload...)

if _, err := r.wr.Write(p); err != nil {
log.G(r.ctx).WithError(err).Error("responseStream.Write")
Expand All @@ -247,6 +248,15 @@ func (r *responseStream) Write(ctx context.Context, num uint64, data []byte) err
return nil
}

// IMHO: should be named WriteString, really.
func (r *responseStream) Write(ctx context.Context, num uint64, str []byte) error {
p := msgpackBytePool.Get().([]byte)[:0]
defer msgpackBytePool.Put(p)

p = msgp.AppendStringFromBytes(p, str)
return r.WriteMessage(ctx, num, p)
}

func (r *responseStream) Error(ctx context.Context, num uint64, code [2]int, msg string) error {
r.Lock()
defer r.Unlock()
Expand Down
38 changes: 38 additions & 0 deletions isolate/container_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//go:generate msgp --tests=false
//msgp:ignore isolate.MarkedWorkerMetrics
package isolate

type (
NetStat struct {
RxBytes uint64 `msg:"rx_bytes"`
TxBytes uint64 `msg:"tx_bytes"`
}

WorkerMetrics struct {
UptimeSec uint64 `msg:"uptime"`
CpuUsageSec uint64 `msg:"cpu_usage"`

CpuLoad float64 `msg:"cpu_load"`

Mem uint64 `msg:"mem"`

// iface -> net stat
Net map[string]NetStat `msg:"net"`
}

MetricsResponse map[string]*WorkerMetrics

MarkedWorkerMetrics struct {
uuid string
m *WorkerMetrics
}
)

func NewWorkerMetrics() (c WorkerMetrics) {
c.Net = make(map[string]NetStat)
return
}

func NewMarkedMetrics(uuid string, cm *WorkerMetrics) MarkedWorkerMetrics {
return MarkedWorkerMetrics{uuid: uuid, m: cm}
}
5 changes: 5 additions & 0 deletions isolate/docker/box.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ func (b *Box) Inspect(ctx context.Context, workeruuid string) ([]byte, error) {
return []byte("{}"), nil
}

func (b *Box) QueryMetrics(uuids []string) (r []isolate.MarkedWorkerMetrics) {
// Not implemented yet
return
}

// Spool spools an image with a tag latest
func (b *Box) Spool(ctx context.Context, name string, opts isolate.RawProfile) (err error) {
profile, err := decodeProfile(opts)
Expand Down
4 changes: 4 additions & 0 deletions isolate/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const (
codeOutputError
codeKillError
codeSpoolCancellationError
codeWorkerMetricsFailed
codeMarshallingError
)

var (
Expand All @@ -31,6 +33,8 @@ var (
errOutputError = [2]int{isolateErrCategory, codeOutputError}
errKillError = [2]int{isolateErrCategory, codeKillError}
errSpoolCancellationError = [2]int{isolateErrCategory, codeSpoolCancellationError}
errWorkerMetricsFailed = [2]int{isolateErrCategory, codeWorkerMetricsFailed}
errMarshallingError = [2]int{isolateErrCategory, codeMarshallingError}
errSpawnEAGAIN = [2]int{systemCategory, codeSpawnEAGAIN}
)

Expand Down
106 changes: 106 additions & 0 deletions isolate/initialdispatch.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package isolate

import (
"bytes"
"errors"
"fmt"
"reflect"
"strings"
"sync/atomic"
"syscall"
"time"

"golang.org/x/net/context"

Expand All @@ -24,13 +27,22 @@ const (
replySpawnWrite = 0
replySpawnError = 1
replySpawnClose = 2

workersMetrics = 2

replyMetricsOk = 0
replyMetricsError = 1
replyMetricsClose = 2
)

const expectedUuidsCount = 32

var (
// ErrInvalidArgsNum should be returned if number of arguments is wrong
ErrInvalidArgsNum = errors.New("invalid arguments number")
_onSpoolArgsNum = uint32(reflect.TypeOf(new(initialDispatch).onSpool).NumIn())
_onSpawnArgsNum = uint32(reflect.TypeOf(new(initialDispatch).onSpawn).NumIn())
_onMetricsArgsNum = uint32(reflect.TypeOf(new(initialDispatch).onWorkersMetrics).NumIn())
)

func checkSize(num uint32, r *msgp.Reader) error {
Expand All @@ -46,6 +58,26 @@ func checkSize(num uint32, r *msgp.Reader) error {
return nil
}

func readStringsSlice(r *msgp.Reader) (uuids []string, err error) {
var sz uint32

sz, err = r.ReadArrayHeader()
if err != nil {
return uuids, err
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just return?

}

for i := uint32(0); i < sz; i++ {
var u string
if u, err = r.ReadString(); err == nil {
uuids = append(uuids, u)
} else {
return uuids, err
}
}

return
}

func readMapStrStr(r *msgp.Reader, mp map[string]string) (err error) {
var sz uint32
sz, err = r.ReadMapHeader()
Expand Down Expand Up @@ -84,6 +116,7 @@ func newInitialDispatch(ctx context.Context, stream ResponseStream) Dispatcher {

func (d *initialDispatch) Handle(id uint64, r *msgp.Reader) (Dispatcher, error) {
var err error

switch id {
case spool:
var rawProfile = newCocaineProfile()
Expand Down Expand Up @@ -155,6 +188,19 @@ func (d *initialDispatch) Handle(id uint64, r *msgp.Reader) (Dispatcher, error)
}

return d.onSpawn(rawProfile, name, executable, args, env)
case workersMetrics:
if err = checkSize(_onMetricsArgsNum, r); err != nil {
log.G(d.ctx).Errorf("wrong args count for slot %d", id)
return nil, err
}

var uuids []string
if uuids, err = readStringsSlice(r); err != nil {
log.G(d.ctx).Errorf("wrong workersMetrics request framing: %v", err)
return nil, err
}

return d.onWorkersMetrics(uuids)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are u sure that not
return d.onWorkersMetrics(uuids), nil
?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I bet he's sure)

default:
return nil, fmt.Errorf("unknown transition id: %d", id)
}
Expand Down Expand Up @@ -268,6 +314,66 @@ func (d *initialDispatch) onSpawn(opts *cocaineProfile, name, executable string,
return newSpawnDispatch(d.ctx, cancelSpawn, prCh, &flagKilled, d.stream), nil
}

func (d *initialDispatch) onWorkersMetrics(uuidsQuery []string) (Dispatcher, error) {

log.G(d.ctx).Debugf("onWorkersMetrics() Uuids query (len %d): %s", len(uuidsQuery), strings.Join(uuidsQuery, ", "))

startTime := time.Now()

sendMetricsFunc := func(metrics MetricsResponse) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe better to make as a standalone function or method?

var (
buf bytes.Buffer
err error
)

if d == nil {
log.G(d.ctx).Error("strange: dispatch is `nil`")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can it happen? If no - maybe remove, so we can get a crash when logic is being somehow violated?

return
}

if err = msgp.Encode(&buf, &metrics); err != nil {
log.G(d.ctx).WithError(err).Errorf("unable to encode containers metrics response: %v", err)
d.stream.Error(d.ctx, replyMetricsError, errMarshallingError, err.Error())
}

if err = d.stream.WriteMessage(d.ctx, replyMetricsOk, buf.Bytes()); err != nil {
log.G(d.ctx).WithError(err).Errorf("unable to send containers metrics: %v", err)
d.stream.Error(d.ctx, replyMetricsError, errWorkerMetricsFailed, err.Error())
}

log.G(d.ctx).WithField("time", time.Since(startTime)).Debugf("Containers metrics have been sent to runtime, response length %d", len(metrics))
}

go func() {
//
// TODO:
// - reduce complexity
// DONE:
// - log execution time
//
boxes := getBoxes(d.ctx)
boxesSize := len(boxes)
metricsResponse := make(MetricsResponse, len(uuidsQuery))
queryResCh := make(chan []MarkedWorkerMetrics)

for _, b := range boxes {
go func(b Box) {
queryResCh <- b.QueryMetrics(uuidsQuery)
}(b)
}

for i := 0; i < boxesSize; i++ {
for _, m := range <- queryResCh {
metricsResponse[m.uuid] = m.m
}
}

sendMetricsFunc(metricsResponse)
}()

return nil, nil
}

type OutputCollector struct {
ctx context.Context

Expand Down
9 changes: 9 additions & 0 deletions isolate/isolate.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ type (
Spawn(ctx context.Context, config SpawnConfig, output io.Writer) (Process, error)
Inspect(ctx context.Context, workerid string) ([]byte, error)
Close() error

QueryMetrics(uuids []string) []MarkedWorkerMetrics
}

ResponseStream interface {
Write(ctx context.Context, num uint64, data []byte) error
// packedPayload - MessagePacked data byte stream
WriteMessage(ctx context.Context, num uint64, packedPayload []byte) error
Error(ctx context.Context, num uint64, code [2]int, msg string) error
Close(ctx context.Context, num uint64) error
}
Expand Down Expand Up @@ -74,6 +78,11 @@ type (
Headers map[string]string `json:"headers,omitempty"`
} `json:"mtn,omitempty"`
}

MetricsPollConfig struct {
PollPeriod string `json:"period"`
Args json.RawMessage `json:"args"`
}
)

func (d *JSONEncodedDuration) UnmarshalJSON(b []byte) error {
Expand Down
Loading