diff --git a/isolate/conn_handler.go b/isolate/conn_handler.go
index 1856637..642f5c9 100644
--- a/isolate/conn_handler.go
+++ b/isolate/conn_handler.go
@@ -220,7 +220,8 @@ func (r *responseStream) close(ctx context.Context) {
     }
 }
 
-func (r *responseStream) Write(ctx context.Context, num uint64, data []byte) error {
+// WriteMessage writes an already MessagePack-encoded payload as-is as a Cocaine packet.
+func (r *responseStream) WriteMessage(ctx context.Context, num uint64, packedPayload []byte) error {
     r.Lock()
     defer r.Unlock()
 
@@ -238,7 +239,7 @@ func (r *responseStream) Write(ctx context.Context, num uint64, data []byte) err
     p = msgp.AppendUint64(p, num)
     p = msgp.AppendArrayHeader(p, 1)
-    p = msgp.AppendStringFromBytes(p, data)
+    p = append(p, packedPayload...)
 
     if _, err := r.wr.Write(p); err != nil {
         log.G(r.ctx).WithError(err).Error("responseStream.Write")
@@ -247,6 +248,15 @@ func (r *responseStream) Write(ctx context.Context, num uint64, data []byte) err
     return nil
 }
 
+// Write encodes str as a MessagePack string; arguably it should be named WriteString.
+func (r *responseStream) Write(ctx context.Context, num uint64, str []byte) error {
+    p := msgpackBytePool.Get().([]byte)[:0]
+    defer msgpackBytePool.Put(p)
+
+    p = msgp.AppendStringFromBytes(p, str)
+    return r.WriteMessage(ctx, num, p)
+}
+
 func (r *responseStream) Error(ctx context.Context, num uint64, code [2]int, msg string) error {
     r.Lock()
     defer r.Unlock()
diff --git a/isolate/container_metrics.go b/isolate/container_metrics.go
new file mode 100644
index 0000000..636e399
--- /dev/null
+++ b/isolate/container_metrics.go
@@ -0,0 +1,38 @@
+//go:generate msgp --tests=false
+//msgp:ignore isolate.MarkedWorkerMetrics
+package isolate
+
+type (
+    NetStat struct {
+        RxBytes uint64 `msg:"rx_bytes"`
+        TxBytes uint64 `msg:"tx_bytes"`
+    }
+
+    WorkerMetrics struct {
+        UptimeSec   uint64 `msg:"uptime"`
+        CpuUsageSec uint64 `msg:"cpu_usage"`
+
+        CpuLoad float32 `msg:"cpu_load"`
+
+        Mem uint64 `msg:"mem"`
+
+        // iface -> net stat
+        Net map[string]NetStat `msg:"net"`
+    }
+
+    MetricsResponse map[string]*WorkerMetrics
+
+    MarkedWorkerMetrics struct {
+        uuid string
+        m    *WorkerMetrics
+    }
+)
+
+func NewWorkerMetrics() (c WorkerMetrics) {
+    c.Net = make(map[string]NetStat)
+    return
+}
+
+func NewMarkedMetrics(uuid string, cm *WorkerMetrics) MarkedWorkerMetrics {
+    return MarkedWorkerMetrics{uuid: uuid, m: cm}
+}
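For orientation before the generated code: once this patch is applied, the msgp-generated methods below give WorkerMetrics a reflection-free msgpack round trip. A minimal sketch, not part of the patch (error handling reduced to a panic):

    package main

    import (
        "fmt"

        "github.com/noxiouz/stout/isolate"
    )

    func main() {
        m := isolate.NewWorkerMetrics()
        m.UptimeSec = 120
        m.Net["eth0"] = isolate.NetStat{RxBytes: 1 << 20, TxBytes: 4096}

        buf, err := m.MarshalMsg(nil) // appends the msgpack map to the passed buffer
        if err != nil {
            panic(err)
        }

        var out isolate.WorkerMetrics
        if _, err := out.UnmarshalMsg(buf); err == nil {
            fmt.Println(out.UptimeSec, out.Net["eth0"].RxBytes) // 120 1048576
        }
    }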
diff --git a/isolate/container_metrics_gen.go b/isolate/container_metrics_gen.go
new file mode 100644
index 0000000..4c6af20
--- /dev/null
+++ b/isolate/container_metrics_gen.go
@@ -0,0 +1,665 @@
+package isolate
+
+// NOTE: THIS FILE WAS PRODUCED BY THE
+// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp)
+// DO NOT EDIT
+
+import (
+    "github.com/tinylib/msgp/msgp"
+)
+
+// DecodeMsg implements msgp.Decodable
+func (z *MarkedWorkerMetrics) DecodeMsg(dc *msgp.Reader) (err error) {
+    var field []byte
+    _ = field
+    var zb0001 uint32
+    zb0001, err = dc.ReadMapHeader()
+    if err != nil {
+        return
+    }
+    for zb0001 > 0 {
+        zb0001--
+        field, err = dc.ReadMapKeyPtr()
+        if err != nil {
+            return
+        }
+        switch msgp.UnsafeString(field) {
+        default:
+            err = dc.Skip()
+            if err != nil {
+                return
+            }
+        }
+    }
+    return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z MarkedWorkerMetrics) EncodeMsg(en *msgp.Writer) (err error) {
+    // map header, size 0
+    err = en.Append(0x80)
+    if err != nil {
+        return
+    }
+    return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z MarkedWorkerMetrics) MarshalMsg(b []byte) (o []byte, err error) {
+    o = msgp.Require(b, z.Msgsize())
+    // map header, size 0
+    o = append(o, 0x80)
+    return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *MarkedWorkerMetrics) UnmarshalMsg(bts []byte) (o []byte, err error) {
+    var field []byte
+    _ = field
+    var zb0001 uint32
+    zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
+    if err != nil {
+        return
+    }
+    for zb0001 > 0 {
+        zb0001--
+        field, bts, err = msgp.ReadMapKeyZC(bts)
+        if err != nil {
+            return
+        }
+        switch msgp.UnsafeString(field) {
+        default:
+            bts, err = msgp.Skip(bts)
+            if err != nil {
+                return
+            }
+        }
+    }
+    o = bts
+    return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z MarkedWorkerMetrics) Msgsize() (s int) {
+    s = 1
+    return
+}
+
+// DecodeMsg implements msgp.Decodable
+func (z *MetricsResponse) DecodeMsg(dc *msgp.Reader) (err error) {
+    var zb0003 uint32
+    zb0003, err = dc.ReadMapHeader()
+    if err != nil {
+        return
+    }
+    if (*z) == nil && zb0003 > 0 {
+        (*z) = make(MetricsResponse, zb0003)
+    } else if len((*z)) > 0 {
+        for key, _ := range *z {
+            delete((*z), key)
+        }
+    }
+    for zb0003 > 0 {
+        zb0003--
+        var zb0001 string
+        var zb0002 *WorkerMetrics
+        zb0001, err = dc.ReadString()
+        if err != nil {
+            return
+        }
+        if dc.IsNil() {
+            err = dc.ReadNil()
+            if err != nil {
+                return
+            }
+            zb0002 = nil
+        } else {
+            if zb0002 == nil {
+                zb0002 = new(WorkerMetrics)
+            }
+            err = zb0002.DecodeMsg(dc)
+            if err != nil {
+                return
+            }
+        }
+        (*z)[zb0001] = zb0002
+    }
+    return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z MetricsResponse) EncodeMsg(en *msgp.Writer) (err error) {
+    err = en.WriteMapHeader(uint32(len(z)))
+    if err != nil {
+        return
+    }
+    for zb0004, zb0005 := range z {
+        err = en.WriteString(zb0004)
+        if err != nil {
+            return
+        }
+        if zb0005 == nil {
+            err = en.WriteNil()
+            if err != nil {
+                return
+            }
+        } else {
+            err = zb0005.EncodeMsg(en)
+            if err != nil {
+                return
+            }
+        }
+    }
+    return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z MetricsResponse) MarshalMsg(b []byte) (o []byte, err error) {
+    o = msgp.Require(b, z.Msgsize())
+    o = msgp.AppendMapHeader(o, uint32(len(z)))
+    for zb0004, zb0005 := range z {
+        o = msgp.AppendString(o, zb0004)
+        if zb0005 == nil {
+            o = msgp.AppendNil(o)
+        } else {
+            o, err = zb0005.MarshalMsg(o)
+            if err != nil {
+                return
+            }
+        }
+    }
+    return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *MetricsResponse) UnmarshalMsg(bts []byte) (o []byte, err error) {
+    var zb0003 uint32
+    zb0003, bts, err = msgp.ReadMapHeaderBytes(bts)
+    if err != nil {
+        return
+    }
+    if (*z) == nil && zb0003 > 0 {
+        (*z) = make(MetricsResponse, zb0003)
+    } else if len((*z)) > 0 {
+        for key, _ := range *z {
+            delete((*z), key)
+        }
+    }
+    for zb0003 > 0 {
+        var zb0001 string
+        var zb0002 *WorkerMetrics
+        zb0003--
+        zb0001, bts, err = msgp.ReadStringBytes(bts)
+        if err != nil {
+            return
+        }
+        if msgp.IsNil(bts) {
+            bts, err = msgp.ReadNilBytes(bts)
+            if err != nil {
+                return
+            }
+            zb0002 = nil
+        } else {
+            if zb0002 == nil {
+                zb0002 = new(WorkerMetrics)
+            }
+            bts, err = zb0002.UnmarshalMsg(bts)
+            if err != nil {
+                return
+            }
+        }
+        (*z)[zb0001] = zb0002
+    }
+    o = bts
+    return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z MetricsResponse) Msgsize() (s int) {
+    s = msgp.MapHeaderSize
+    if z != nil {
+        for zb0004, zb0005 := range z {
+            _ = zb0005
+            s += msgp.StringPrefixSize + len(zb0004)
+            if zb0005 == nil {
+                s += msgp.NilSize
+            } else {
+                s += zb0005.Msgsize()
+            }
+        }
+    }
+    return
+}
+
+// DecodeMsg implements msgp.Decodable
+func (z *NetStat) DecodeMsg(dc *msgp.Reader) (err error) {
+    var field []byte
+    _ = field
+    var zb0001 uint32
+    zb0001, err = dc.ReadMapHeader()
+    if err != nil {
+        return
+    }
+    for zb0001 > 0 {
+        zb0001--
+        field, err = dc.ReadMapKeyPtr()
+        if err != nil {
+            return
+        }
+        switch msgp.UnsafeString(field) {
+        case "rx_bytes":
+            z.RxBytes, err = dc.ReadUint64()
+            if err != nil {
+                return
+            }
+        case "tx_bytes":
+            z.TxBytes, err = dc.ReadUint64()
+            if err != nil {
+                return
+            }
+        default:
+            err = dc.Skip()
+            if err != nil {
+                return
+            }
+        }
+    }
+    return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z NetStat) EncodeMsg(en *msgp.Writer) (err error) {
+    // map header, size 2
+    // write "rx_bytes"
+    err = en.Append(0x82, 0xa8, 0x72, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73)
+    if err != nil {
+        return
+    }
+    err = en.WriteUint64(z.RxBytes)
+    if err != nil {
+        return
+    }
+    // write "tx_bytes"
+    err = en.Append(0xa8, 0x74, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73)
+    if err != nil {
+        return
+    }
+    err = en.WriteUint64(z.TxBytes)
+    if err != nil {
+        return
+    }
+    return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z NetStat) MarshalMsg(b []byte) (o []byte, err error) {
+    o = msgp.Require(b, z.Msgsize())
+    // map header, size 2
+    // string "rx_bytes"
+    o = append(o, 0x82, 0xa8, 0x72, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73)
+    o = msgp.AppendUint64(o, z.RxBytes)
+    // string "tx_bytes"
+    o = append(o, 0xa8, 0x74, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73)
+    o = msgp.AppendUint64(o, z.TxBytes)
+    return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *NetStat) UnmarshalMsg(bts []byte) (o []byte, err error) {
+    var field []byte
+    _ = field
+    var zb0001 uint32
+    zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
+    if err != nil {
+        return
+    }
+    for zb0001 > 0 {
+        zb0001--
+        field, bts, err = msgp.ReadMapKeyZC(bts)
+        if err != nil {
+            return
+        }
+        switch msgp.UnsafeString(field) {
+        case "rx_bytes":
+            z.RxBytes, bts, err = msgp.ReadUint64Bytes(bts)
+            if err != nil {
+                return
+            }
+        case "tx_bytes":
+            z.TxBytes, bts, err = msgp.ReadUint64Bytes(bts)
+            if err != nil {
+                return
+            }
+        default:
+            bts, err = msgp.Skip(bts)
+            if err != nil {
+                return
+            }
+        }
+    }
+    o = bts
+    return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z NetStat) Msgsize() (s int) {
+    s = 1 + 9 + msgp.Uint64Size + 9 + msgp.Uint64Size
+    return
+}
+
+// DecodeMsg implements msgp.Decodable
+func (z *WorkerMetrics) DecodeMsg(dc *msgp.Reader) (err error) {
+    var field []byte
+    _ = field
+    var zb0001 uint32
+    zb0001, err = dc.ReadMapHeader()
+    if err != nil {
+        return
+    }
+    for zb0001 > 0 {
+        zb0001--
+        field, err = dc.ReadMapKeyPtr()
+        if err != nil {
+            return
+        }
+        switch msgp.UnsafeString(field) {
+        case "uptime":
+            z.UptimeSec, err = dc.ReadUint64()
+            if err != nil {
+                return
+            }
+        case "cpu_usage":
+            z.CpuUsageSec, err = dc.ReadUint64()
+            if err != nil {
+                return
+            }
+        case "cpu_load":
+            z.CpuLoad, err = dc.ReadFloat32()
+            if err != nil {
+                return
+            }
+        case "mem":
+            z.Mem, err = dc.ReadUint64()
+            if err != nil {
+                return
+            }
+        case "net":
+            var zb0002 uint32
+            zb0002, err = dc.ReadMapHeader()
+            if err != nil {
+                return
+            }
+            if z.Net == nil && zb0002 > 0 {
+                z.Net = make(map[string]NetStat, zb0002)
+            } else if len(z.Net) > 0 {
+                for key, _ := range z.Net {
+                    delete(z.Net, key)
+                }
+            }
+            for zb0002 > 0 {
+                zb0002--
+                var za0001 string
+                var za0002 NetStat
+                za0001, err = dc.ReadString()
+                if err != nil {
+                    return
+                }
+                var zb0003 uint32
+                zb0003, err = dc.ReadMapHeader()
+                if err != nil {
+                    return
+                }
+                for zb0003 > 0 {
+                    zb0003--
+                    field, err = dc.ReadMapKeyPtr()
+                    if err != nil {
+                        return
+                    }
+                    switch msgp.UnsafeString(field) {
+                    case "rx_bytes":
+                        za0002.RxBytes, err = dc.ReadUint64()
+                        if err != nil {
+                            return
+                        }
+                    case "tx_bytes":
+                        za0002.TxBytes, err = dc.ReadUint64()
+                        if err != nil {
+                            return
+                        }
+                    default:
+                        err = dc.Skip()
+                        if err != nil {
+                            return
+                        }
+                    }
+                }
+                z.Net[za0001] = za0002
+            }
+        default:
+            err = dc.Skip()
+            if err != nil {
+                return
+            }
+        }
+    }
+    return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z *WorkerMetrics) EncodeMsg(en *msgp.Writer) (err error) {
+    // map header, size 5
+    // write "uptime"
+    err = en.Append(0x85, 0xa6, 0x75, 0x70, 0x74, 0x69, 0x6d, 0x65)
+    if err != nil {
+        return
+    }
+    err = en.WriteUint64(z.UptimeSec)
+    if err != nil {
+        return
+    }
+    // write "cpu_usage"
+    err = en.Append(0xa9, 0x63, 0x70, 0x75, 0x5f, 0x75, 0x73, 0x61, 0x67, 0x65)
+    if err != nil {
+        return
+    }
+    err = en.WriteUint64(z.CpuUsageSec)
+    if err != nil {
+        return
+    }
+    // write "cpu_load"
+    err = en.Append(0xa8, 0x63, 0x70, 0x75, 0x5f, 0x6c, 0x6f, 0x61, 0x64)
+    if err != nil {
+        return
+    }
+    err = en.WriteFloat32(z.CpuLoad)
+    if err != nil {
+        return
+    }
+    // write "mem"
+    err = en.Append(0xa3, 0x6d, 0x65, 0x6d)
+    if err != nil {
+        return
+    }
+    err = en.WriteUint64(z.Mem)
+    if err != nil {
+        return
+    }
+    // write "net"
+    err = en.Append(0xa3, 0x6e, 0x65, 0x74)
+    if err != nil {
+        return
+    }
+    err = en.WriteMapHeader(uint32(len(z.Net)))
+    if err != nil {
+        return
+    }
+    for za0001, za0002 := range z.Net {
+        err = en.WriteString(za0001)
+        if err != nil {
+            return
+        }
+        // map header, size 2
+        // write "rx_bytes"
+        err = en.Append(0x82, 0xa8, 0x72, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73)
+        if err != nil {
+            return
+        }
+        err = en.WriteUint64(za0002.RxBytes)
+        if err != nil {
+            return
+        }
+        // write "tx_bytes"
+        err = en.Append(0xa8, 0x74, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73)
+        if err != nil {
+            return
+        }
+        err = en.WriteUint64(za0002.TxBytes)
+        if err != nil {
+            return
+        }
+    }
+    return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z *WorkerMetrics) MarshalMsg(b []byte) (o []byte, err error) {
+    o = msgp.Require(b, z.Msgsize())
+    // map header, size 5
+    // string "uptime"
+    o = append(o, 0x85, 0xa6, 0x75, 0x70, 0x74, 0x69, 0x6d, 0x65)
+    o = msgp.AppendUint64(o, z.UptimeSec)
+    // string "cpu_usage"
+    o = append(o, 0xa9, 0x63, 0x70, 0x75, 0x5f, 0x75, 0x73, 0x61, 0x67, 0x65)
+    o = msgp.AppendUint64(o, z.CpuUsageSec)
+    // string "cpu_load"
+    o = append(o, 0xa8, 0x63, 0x70, 0x75, 0x5f, 0x6c, 0x6f, 0x61, 0x64)
+    o = msgp.AppendFloat32(o, z.CpuLoad)
+    // string "mem"
+    o = append(o, 0xa3, 0x6d, 0x65, 0x6d)
+    o = msgp.AppendUint64(o, z.Mem)
+    // string "net"
+    o = append(o, 0xa3, 0x6e, 0x65, 0x74)
+    o = msgp.AppendMapHeader(o, uint32(len(z.Net)))
+    for za0001, za0002 := range z.Net {
+        o = msgp.AppendString(o, za0001)
+        // map header, size 2
+        // string "rx_bytes"
+        o = append(o, 0x82, 0xa8, 0x72, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73)
+        o = msgp.AppendUint64(o, za0002.RxBytes)
+        // string "tx_bytes"
+        o = append(o, 0xa8, 0x74, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73)
+        o = msgp.AppendUint64(o, za0002.TxBytes)
+    }
+    return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *WorkerMetrics) UnmarshalMsg(bts []byte) (o []byte, err error) {
+    var field []byte
+    _ = field
+    var zb0001 uint32
+    zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
+    if err != nil {
+        return
+    }
+    for zb0001 > 0 {
+        zb0001--
+        field, bts, err = msgp.ReadMapKeyZC(bts)
+        if err != nil {
+            return
+        }
+        switch msgp.UnsafeString(field) {
+        case "uptime":
+            z.UptimeSec, bts, err = msgp.ReadUint64Bytes(bts)
+            if err != nil {
+                return
+            }
+        case "cpu_usage":
+            z.CpuUsageSec, bts, err = msgp.ReadUint64Bytes(bts)
+            if err != nil {
+                return
+            }
+        case "cpu_load":
+            z.CpuLoad, bts, err = msgp.ReadFloat32Bytes(bts)
+            if err != nil {
+                return
+            }
+        case "mem":
+            z.Mem, bts, err = msgp.ReadUint64Bytes(bts)
+            if err != nil {
+                return
+            }
+        case "net":
+            var zb0002 uint32
+            zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
+            if err != nil {
+                return
+            }
+            if z.Net == nil && zb0002 > 0 {
+                z.Net = make(map[string]NetStat, zb0002)
+            } else if len(z.Net) > 0 {
+                for key, _ := range z.Net {
+                    delete(z.Net, key)
+                }
+            }
+            for zb0002 > 0 {
+                var za0001 string
+                var za0002 NetStat
+                zb0002--
+                za0001, bts, err = msgp.ReadStringBytes(bts)
+                if err != nil {
+                    return
+                }
+                var zb0003 uint32
+                zb0003, bts, err = msgp.ReadMapHeaderBytes(bts)
+                if err != nil {
+                    return
+                }
+                for zb0003 > 0 {
+                    zb0003--
+                    field, bts, err = msgp.ReadMapKeyZC(bts)
+                    if err != nil {
+                        return
+                    }
+                    switch msgp.UnsafeString(field) {
+                    case "rx_bytes":
+                        za0002.RxBytes, bts, err = msgp.ReadUint64Bytes(bts)
+                        if err != nil {
+                            return
+                        }
+                    case "tx_bytes":
+                        za0002.TxBytes, bts, err = msgp.ReadUint64Bytes(bts)
+                        if err != nil {
+                            return
+                        }
+                    default:
+                        bts, err = msgp.Skip(bts)
+                        if err != nil {
+                            return
+                        }
+                    }
+                }
+                z.Net[za0001] = za0002
+            }
+        default:
+            bts, err = msgp.Skip(bts)
+            if err != nil {
+                return
+            }
+        }
+    }
+    o = bts
+    return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z *WorkerMetrics) Msgsize() (s int) {
+    s = 1 + 7 + msgp.Uint64Size + 10 + msgp.Uint64Size + 9 + msgp.Float32Size + 4 + msgp.Uint64Size + 4 + msgp.MapHeaderSize
+    if z.Net != nil {
+        for za0001, za0002 := range z.Net {
+            _ = za0002
+            s += msgp.StringPrefixSize + len(za0001) + 1 + 9 + msgp.Uint64Size + 9 + msgp.Uint64Size
+        }
+    }
+    return
+}
diff --git a/isolate/d_test.go b/isolate/d_test.go
index 66a15e5..4f7ce0f 100644
--- a/isolate/d_test.go
+++ b/isolate/d_test.go
@@ -36,6 +36,10 @@ func (t *testDownstream) Write(ctx context.Context, code uint64, data []byte) er
     return nil
 }
 
+func (t *testDownstream) WriteMessage(ctx context.Context, num uint64, packedPayload []byte) error {
+    return nil
+}
+
 func (t *testDownstream) Error(ctx context.Context, code uint64, errorcode [2]int, msg string) error {
     t.ch <- testDownstreamItem{code, []interface{}{errorcode, msg}}
     return nil
@@ -72,6 +76,10 @@ func (b *testBox) Close() error {
     return nil
 }
 
+func (b *testBox) QueryMetrics(uuids []string) (response []MarkedWorkerMetrics) {
+    return
+}
+
 type testProcess struct {
     ctx    context.Context
     killed chan struct{}
diff --git a/isolate/docker/box.go b/isolate/docker/box.go
index 9bdc702..9fd94b7 100644
--- a/isolate/docker/box.go
+++ b/isolate/docker/box.go
@@ -254,6 +254,11 @@ func (b *Box) Inspect(ctx context.Context, workeruuid string) ([]byte, error) {
     return []byte("{}"), nil
 }
 
+func (b *Box) QueryMetrics(uuids []string) (r []isolate.MarkedWorkerMetrics) {
+    // Not implemented yet
+    return
+}
+
 // Spool spools an image with a tag latest
 func (b *Box) Spool(ctx context.Context, name string, opts isolate.RawProfile) (err error) {
     profile, err := decodeProfile(opts)
diff --git a/isolate/errors.go b/isolate/errors.go
index 8d65061..ea94752 100644
--- a/isolate/errors.go
+++ b/isolate/errors.go
@@ -20,6 +20,8 @@ const (
     codeOutputError
     codeKillError
     codeSpoolCancellationError
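The workersMetrics slot added to initialdispatch.go below is fed by readStringsSlice, which expects the request argument to be a plain msgpack array of strings. A hypothetical client-side encoder for such a request (an assumption about what the runtime sends; not part of the patch):

    package main

    import "github.com/tinylib/msgp/msgp"

    // packUuidsQuery frames a workersMetrics query: an array header
    // followed by one msgpack str per worker uuid.
    func packUuidsQuery(uuids []string) []byte {
        p := msgp.AppendArrayHeader(nil, uint32(len(uuids)))
        for _, u := range uuids {
            p = msgp.AppendString(p, u)
        }
        return p
    }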
+    codeWorkerMetricsFailed
+    codeMarshallingError
 )
 
 var (
@@ -31,6 +33,8 @@ var (
     errOutputError            = [2]int{isolateErrCategory, codeOutputError}
     errKillError              = [2]int{isolateErrCategory, codeKillError}
     errSpoolCancellationError = [2]int{isolateErrCategory, codeSpoolCancellationError}
+    errWorkerMetricsFailed    = [2]int{isolateErrCategory, codeWorkerMetricsFailed}
+    errMarshallingError       = [2]int{isolateErrCategory, codeMarshallingError}
 
     errSpawnEAGAIN = [2]int{systemCategory, codeSpawnEAGAIN}
 )
diff --git a/isolate/initialdispatch.go b/isolate/initialdispatch.go
index 5510e37..afb9bd0 100644
--- a/isolate/initialdispatch.go
+++ b/isolate/initialdispatch.go
@@ -1,11 +1,14 @@
 package isolate
 
 import (
+    "bytes"
     "errors"
     "fmt"
     "reflect"
+    "strings"
     "sync/atomic"
     "syscall"
+    "time"
 
     "golang.org/x/net/context"
 
@@ -24,13 +27,22 @@ const (
     replySpawnWrite = 0
     replySpawnError = 1
     replySpawnClose = 2
+
+    workersMetrics = 2
+
+    replyMetricsOk    = 0
+    replyMetricsError = 1
+    replyMetricsClose = 2
 )
 
+const expectedUuidsCount = 32
+
 var (
     // ErrInvalidArgsNum should be returned if number of arguments is wrong
     ErrInvalidArgsNum = errors.New("invalid arguments number")
 
     _onSpoolArgsNum   = uint32(reflect.TypeOf(new(initialDispatch).onSpool).NumIn())
     _onSpawnArgsNum   = uint32(reflect.TypeOf(new(initialDispatch).onSpawn).NumIn())
+    _onMetricsArgsNum = uint32(reflect.TypeOf(new(initialDispatch).onWorkersMetrics).NumIn())
 )
 
 func checkSize(num uint32, r *msgp.Reader) error {
@@ -46,6 +58,26 @@ func checkSize(num uint32, r *msgp.Reader) error {
     return nil
 }
 
+func readStringsSlice(r *msgp.Reader) (uuids []string, err error) {
+    var sz uint32
+
+    sz, err = r.ReadArrayHeader()
+    if err != nil {
+        return
+    }
+
+    for i := uint32(0); i < sz; i++ {
+        var u string
+        if u, err = r.ReadString(); err == nil {
+            uuids = append(uuids, u)
+        } else {
+            return
+        }
+    }
+
+    return
+}
+
 func readMapStrStr(r *msgp.Reader, mp map[string]string) (err error) {
     var sz uint32
     sz, err = r.ReadMapHeader()
@@ -84,6 +116,7 @@ func newInitialDispatch(ctx context.Context, stream ResponseStream) Dispatcher {
 
 func (d *initialDispatch) Handle(id uint64, r *msgp.Reader) (Dispatcher, error) {
     var err error
+
     switch id {
     case spool:
         var rawProfile = newCocaineProfile()
@@ -155,6 +188,19 @@ func (d *initialDispatch) Handle(id uint64, r *msgp.Reader) (Dispatcher, error)
         }
 
         return d.onSpawn(rawProfile, name, executable, args, env)
+    case workersMetrics:
+        if err = checkSize(_onMetricsArgsNum, r); err != nil {
+            log.G(d.ctx).Errorf("wrong args count for slot %d", id)
+            return nil, err
+        }
+
+        var uuids []string
+        if uuids, err = readStringsSlice(r); err != nil {
+            log.G(d.ctx).Errorf("wrong workersMetrics request framing: %v", err)
+            return nil, err
+        }
+
+        return d.onWorkersMetrics(uuids)
     default:
         return nil, fmt.Errorf("unknown transition id: %d", id)
     }
@@ -268,6 +314,63 @@ func (d *initialDispatch) onSpawn(opts *cocaineProfile, name, executable string,
     return newSpawnDispatch(d.ctx, cancelSpawn, prCh, &flagKilled, d.stream), nil
 }
 
+func (d *initialDispatch) onWorkersMetrics(uuidsQuery []string) (Dispatcher, error) {
+
+    log.G(d.ctx).Debugf("onWorkersMetrics() Uuids query (len %d): %s", len(uuidsQuery), strings.Join(uuidsQuery, ", "))
+
+    startTime := time.Now()
+
+    sendMetricsFunc := func(metrics MetricsResponse) {
+        var (
+            buf bytes.Buffer
+            err error
+        )
+
+        if err = msgp.Encode(&buf, &metrics); err != nil {
+            log.G(d.ctx).WithError(err).Error("unable to encode containers metrics response")
+            d.stream.Error(d.ctx, replyMetricsError, errMarshallingError, err.Error())
+            return
+        }
+
+        if err = d.stream.WriteMessage(d.ctx, replyMetricsOk, buf.Bytes()); err != nil {
+            log.G(d.ctx).WithError(err).Error("unable to send containers metrics")
+            d.stream.Error(d.ctx, replyMetricsError, errWorkerMetricsFailed, err.Error())
+            return
+        }
+
+        log.G(d.ctx).WithField("time", time.Since(startTime)).Debugf("Containers metrics have been sent to runtime, response length %d", len(metrics))
+    }
+
+    go func() {
+        //
+        // TODO:
+        //  - reduce complexity
+        // DONE:
+        //  - log execution time
+        //
+        boxes := getBoxes(d.ctx)
+        boxesSize := len(boxes)
+
+        metricsResponse := make(MetricsResponse, len(uuidsQuery))
+        queryResCh := make(chan []MarkedWorkerMetrics)
+
+        for _, b := range boxes {
+            go func(b Box) {
+                queryResCh <- b.QueryMetrics(uuidsQuery)
+            }(b)
+        }
+
+        for i := 0; i < boxesSize; i++ {
+            for _, m := range <-queryResCh {
+                metricsResponse[m.uuid] = m.m
+            }
+        }
+
+        sendMetricsFunc(metricsResponse)
+    }()
+
+    return nil, nil
+}
+
 type OutputCollector struct {
     ctx context.Context
diff --git a/isolate/isolate.go b/isolate/isolate.go
index c7b7c7e..2bc13b1 100644
--- a/isolate/isolate.go
+++ b/isolate/isolate.go
@@ -27,10 +27,14 @@ type (
         Spawn(ctx context.Context, config SpawnConfig, output io.Writer) (Process, error)
         Inspect(ctx context.Context, workerid string) ([]byte, error)
         Close() error
+
+        QueryMetrics(uuids []string) []MarkedWorkerMetrics
     }
 
     ResponseStream interface {
         Write(ctx context.Context, num uint64, data []byte) error
+        // WriteMessage sends packedPayload, an already MessagePack-encoded byte stream, as-is
+        WriteMessage(ctx context.Context, num uint64, packedPayload []byte) error
         Error(ctx context.Context, num uint64, code [2]int, msg string) error
         Close(ctx context.Context, num uint64) error
     }
@@ -74,6 +78,11 @@ type (
             Headers map[string]string `json:"headers,omitempty"`
         } `json:"mtn,omitempty"`
     }
+
+    MetricsPollConfig struct {
+        PollPeriod string          `json:"period"`
+        Args       json.RawMessage `json:"args"`
+    }
 )
 
 func (d *JSONEncodedDuration) UnmarshalJSON(b []byte) error {
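The distinction between Write and WriteMessage in the ResponseStream interface above matters for the metrics reply: the MetricsResponse is already msgpack-encoded, and Write would wrap those bytes in an extra msgpack str header, changing the wire type of the argument. A sketch (assumption: the literal 0 stands in for the unexported replyMetricsOk constant):

    package main

    import (
        "golang.org/x/net/context"

        "github.com/noxiouz/stout/isolate"
    )

    func reply(ctx context.Context, stream isolate.ResponseStream, metrics isolate.MetricsResponse) error {
        payload, err := metrics.MarshalMsg(nil) // msgpack map
        if err != nil {
            return err
        }
        // stream.Write(ctx, 0, payload) would re-wrap payload as a msgpack str;
        // WriteMessage keeps the argument a map on the wire.
        return stream.WriteMessage(ctx, 0, payload)
    }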
diff --git a/isolate/porto/box.go b/isolate/porto/box.go
index ce5b40b..e799ac9 100644
--- a/isolate/porto/box.go
+++ b/isolate/porto/box.go
@@ -33,6 +33,8 @@ import (
     portorpc "github.com/yandex/porto/src/api/go/rpc"
 )
 
+const expectedContainersCount = 1000
+
 type portoBoxConfig struct {
     // Directory where volumes per app are placed
     Layers string `json:"layers"`
@@ -49,6 +51,8 @@ type portoBoxConfig struct {
     WeakEnabled    bool   `json:"weakenabled"`
     DefaultUlimits string `json:"defaultulimits"`
     VolumeBackend  string `json:"volumebackend"`
+
+    WorkersMetrics isolate.MetricsPollConfig `json:"workersmetrics"`
 }
 
 func (c *portoBoxConfig) String() string {
@@ -80,6 +84,10 @@ type Box struct {
     onClose context.CancelFunc
 
     containerPropertiesAndData []string
+
+    // mapping uuid -> metrics
+    muMetrics         sync.Mutex
+    containersMetrics map[string]*isolate.WorkerMetrics
 }
 
 const defaultVolumeBackend = "overlay"
@@ -93,6 +101,8 @@ func NewBox(ctx context.Context, cfg isolate.BoxConfig, gstate isolate.GlobalSta
         CleanupEnabled: true,
         WeakEnabled:    false,
+
+        WorkersMetrics: isolate.MetricsPollConfig{},
     }
     decoderConfig := mapstructure.DecoderConfig{
         WeaklyTypedInput: true,
@@ -183,6 +193,8 @@ func NewBox(ctx context.Context, cfg isolate.BoxConfig, gstate isolate.GlobalSta
         rootPrefix: rootPrefix,
         blobRepo:   blobRepo,
+
+        containersMetrics: make(map[string]*isolate.WorkerMetrics),
     }
 
     body, err := json.Marshal(config)
@@ -205,8 +217,11 @@ func NewBox(ctx context.Context, cfg isolate.BoxConfig, gstate isolate.GlobalSta
     journalContent.Set(box.journal.String())
 
+    pollDuration, _ := time.ParseDuration(config.WorkersMetrics.PollPeriod)
+
     go box.waitLoop(ctx)
     go box.dumpJournalEvery(ctx, time.Minute)
+    go box.gatherMetricsEvery(ctx, pollDuration)
 
     return box, nil
 }
@@ -558,6 +573,44 @@ func (b *Box) Inspect(ctx context.Context, workeruuid string) ([]byte, error) {
     return []byte(""), nil
 }
 
+func (b *Box) QueryMetrics(uuids []string) (r []isolate.MarkedWorkerMetrics) {
+    mm := b.getMetricsMapping()
+    for _, uuid := range uuids {
+        if met, ok := mm[uuid]; ok {
+            r = append(r, isolate.NewMarkedMetrics(uuid, met))
+        }
+    }
+
+    return
+}
+
+func (b *Box) getIdUuidMapping() map[string]string {
+    result := make(map[string]string, expectedContainersCount)
+
+    b.muContainers.Lock()
+    defer b.muContainers.Unlock()
+
+    for _, c := range b.containers {
+        result[c.containerID] = c.uuid
+    }
+
+    return result
+}
+
+func (b *Box) setMetricsMapping(m map[string]*isolate.WorkerMetrics) {
+    b.muMetrics.Lock()
+    defer b.muMetrics.Unlock()
+
+    b.containersMetrics = m
+}
+
+func (b *Box) getMetricsMapping() map[string]*isolate.WorkerMetrics {
+    b.muMetrics.Lock()
+    defer b.muMetrics.Unlock()
+
+    return b.containersMetrics
+}
+
 // Close releases all resources such as idle connections from http.Transport
 func (b *Box) Close() error {
     b.transport.CloseIdleConnections()
+ defer log.G(ctx).Info("Porto metrics gather loop cancelled") + + tick := time.NewTicker(interval) + defer tick.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + box.gatherMetrics(ctx) + } + } +} diff --git a/isolate/process/box.go b/isolate/process/box.go index bac73f3..503d46c 100644 --- a/isolate/process/box.go +++ b/isolate/process/box.go @@ -18,11 +18,14 @@ import ( "github.com/noxiouz/stout/pkg/log" "github.com/noxiouz/stout/pkg/semaphore" + "github.com/mitchellh/mapstructure" + apexlog "github.com/apex/log" ) const ( - defaultSpoolPath = "/var/spool/cocaine" + defaultSpoolPath = "/var/spool/cocaine" + expectedWorkersCount = 512 ) var ( @@ -40,7 +43,7 @@ type codeStorage interface { type workerInfo struct { *exec.Cmd - uuid string + uuid string } type Box struct { @@ -50,13 +53,35 @@ type Box struct { spoolPath string storage codeStorage - state isolate.GlobalState + state isolate.GlobalState mu sync.Mutex children map[int]workerInfo wg sync.WaitGroup spawnSm semaphore.Semaphore + + muMetrics sync.Mutex + containersMetrics map[string]*isolate.WorkerMetrics +} + +func getMetricsPollConf(cfg interface{}) (metricsConf isolate.MetricsPollConfig, err error) { + decoderConfig := mapstructure.DecoderConfig{ + WeaklyTypedInput: true, + Result: &metricsConf, + TagName: "json", + } + + var decoder *mapstructure.Decoder + + decoder, err = mapstructure.NewDecoder(&decoderConfig) + if err != nil { + return + } + + err = decoder.Decode(cfg) + + return } func NewBox(ctx context.Context, cfg isolate.BoxConfig, gstate isolate.GlobalState) (isolate.Box, error) { @@ -98,6 +123,16 @@ func NewBox(ctx context.Context, cfg isolate.BoxConfig, gstate isolate.GlobalSta box.sigchldHandler() }() + // TODO: increase box working group count? + if wrkMetricsConf, ok := cfg["workersmetrics"]; ok { + if metConf, err := getMetricsPollConf(wrkMetricsConf); err != nil { + log.G(ctx).Infof("Failed to read `workersmetrics` field, using defaults. 
Err: %v", err) + } else { + duration, _ := time.ParseDuration(metConf.PollPeriod) + go box.gatherMetricsEvery(ctx, duration) + } + } + return box, nil } @@ -242,8 +277,8 @@ func (b *Box) Spawn(ctx context.Context, config isolate.SpawnConfig, output io.W return nil, err } b.children[pr.cmd.Process.Pid] = workerInfo{ - Cmd: pr.cmd, - uuid: "", + Cmd: pr.cmd, + uuid: config.Args["--uuid"], } b.mu.Unlock() @@ -294,6 +329,45 @@ func (b *Box) Inspect(ctx context.Context, worker string) ([]byte, error) { return []byte("{}"), nil } +func (b *Box) QueryMetrics(uuids []string) (r []isolate.MarkedWorkerMetrics) { + mm := b.getMetricsMapping() + for _, uuid := range uuids { + if met, ok := mm[uuid]; ok { + r = append(r, isolate.NewMarkedMetrics(uuid, met)) + } + } + + return +} + +func (b *Box) getIdUuidMapping() (result map[int]string) { + // TODO: is len(b.children) safe to use as `expectedWorkersCount` + result = make(map[int]string, expectedWorkersCount) + + b.mu.Lock() + defer b.mu.Unlock() + + for pid, kid := range b.children { + result[pid] = kid.uuid + } + + return +} + +func (b *Box) setMetricsMapping(m map[string]*isolate.WorkerMetrics) { + b.muMetrics.Lock() + defer b.muMetrics.Unlock() + + b.containersMetrics = m +} + +func (b *Box) getMetricsMapping() (m map[string]*isolate.WorkerMetrics) { + b.muMetrics.Lock() + defer b.muMetrics.Unlock() + + return b.containersMetrics +} + func (b *Box) fetch(ctx context.Context, appname string) ([]byte, error) { return b.storage.Spool(ctx, appname) } diff --git a/isolate/process/metrics_gatherer.go b/isolate/process/metrics_gatherer.go new file mode 100644 index 0000000..5383d75 --- /dev/null +++ b/isolate/process/metrics_gatherer.go @@ -0,0 +1,260 @@ +// TODO: +// - log timings +// +package process + +import ( + "bytes" + "bufio" + "context" + "fmt" + "io/ioutil" + "regexp" + "strconv" + "strings" + "syscall" + "time" + + "github.com/noxiouz/stout/isolate" + "github.com/noxiouz/stout/pkg/log" +) + +const clockTicks = 100 // sysconf(_SC_CLK_TCK) + +var ( + pageSize = uint64(syscall.Getpagesize()) + spacesRegexp, _ = regexp.Compile("[ ]+") +) + +type ( + memStat struct { + vms uint64 + rss uint64 + } +) + +// /proc//statm fields (see `man proc` for details) +const ( + statmVMS = iota + statmRSS + + statmShare + statmText + statmLib + statmData + statmDt + + statmFieldsCount +) + +const ( + statUtime = 13 + statStime = 14 + + statStartTime = 21 + + statFieldsCount = 44 +) + +const ( + pairKey = iota + pairVal + pairLen +) + +func readLines(b []byte) (text []string) { + reader := bytes.NewBuffer(b) + scanner := bufio.NewScanner(reader) + + for scanner.Scan() { + text = append(text, scanner.Text()) + } + + return +} + +// `bt` in seconds, Unix time since epoch, as usual +func loadSysBootTime() (bt uint64, err error) { + var b []byte + if b, err = ioutil.ReadFile("/proc/stat"); err != nil { + return + } + + for _, ln := range readLines(b) { + if strings.HasPrefix(ln, "btime") { + fields := strings.Fields(ln) + if len(fields) < pairLen { + return bt, fmt.Errorf("incorrect count of fields in `btime` record: %d", len(fields)) + } + + return strconv.ParseUint(fields[pairVal], 10, 64) + } + } + + return +} + +func getProcPath(pid int, file string) string { + return fmt.Sprintf("/proc/%d/%s", pid, file) +} + +func getProcContent(pid int, file string) (content string, err error) { + var b []byte + + if b, err = ioutil.ReadFile(getProcPath(pid, file)); err != nil { + return + } + + content = string(b) + return +} + +func readMemStat(pid int) (mstat memStat, err 
error) { + var content string + if content, err = getProcContent(pid, "statm"); err != nil { + return + } + + fields := strings.Fields(content) + if len(fields) < statmFieldsCount { + err = fmt.Errorf("wrong number of fields in `statm` file: %d, but shoud be greater or equal to %d", len(fields), statmFieldsCount) + return + } + + var vms, rss uint64 + vms, err = strconv.ParseUint(fields[statmVMS], 10, 64) + if err != nil { + return + } + + rss, err = strconv.ParseUint(fields[statmRSS], 10, 64) + if err != nil { + return + } + + mstat = memStat{ + vms: vms * pageSize, + rss: rss * pageSize, + } + + return +} + +func readCPUPercent(pid int, bootTime uint64) (cpu float32, uptime uint64, err error) { + var content string + if content, err = getProcContent(pid, "stat"); err != nil { + return + } + + fields := strings.Fields(content) + if len(fields) < statFieldsCount { + err = fmt.Errorf("wrong number of fields in `statm` file: %d, but shoud be greater or equal to %d", len(fields), statFieldsCount) + return + } + + var utime, stime, startedAt uint64 + if utime, err = strconv.ParseUint(fields[statUtime], 10, 64); err != nil { + return + } + + if stime, err = strconv.ParseUint(fields[statStime], 10, 64); err != nil { + return + } + + if startedAt, err = strconv.ParseUint(fields[statStartTime], 10, 64); err != nil { + return + } + + utimeSec := float64(utime) / clockTicks + stimeSec := float64(stime) / clockTicks + + startedAt = bootTime + startedAt / clockTicks + created := time.Unix(0, int64(startedAt * uint64(time.Second))) + + total := float64(utimeSec + stimeSec) + if runtime := time.Since(created).Seconds(); runtime > 0 { + uptime = uint64(runtime) + cpu = float32(100 * total / runtime) + } + + return +} + +func makeNiceName(name string) string { + return spacesRegexp.ReplaceAllString(name, "_") +} + +func readProcStat(pid int, bootTime uint64) (stat isolate.WorkerMetrics,err error) { + var ( + cpuload float32 + uptimeSeconds uint64 + memstat memStat + ) + + if cpuload, uptimeSeconds, err = readCPUPercent(pid, bootTime); err != nil { + return + } + + if memstat, err = readMemStat(pid); err != nil { + return + } + + stat = isolate.WorkerMetrics{ + UptimeSec: uptimeSeconds, + // CpuUsageSec: + + CpuLoad: cpuload, + Mem: memstat.vms, + + // Per process net io stat is unimplemented. + // Net: generateNetStat(netstat), + } + + return +} + +func (b *Box) gatherMetrics(ctx context.Context, bootTime uint64) { + ids := b.getIdUuidMapping() + metrics := make(map[string]*isolate.WorkerMetrics, len(ids)) + + for pid, uuid := range ids { + if stat, err := readProcStat(pid, bootTime); err == nil { + metrics[uuid] = &stat + } else { + log.G(ctx).Errorf("Failed to read stat, pid: %d, err: %v", pid, err) + } + } // for each taskInfo + + b.setMetricsMapping(metrics) +} + +func (b *Box) gatherMetricsEvery(ctx context.Context, interval time.Duration) { + if interval == 0 { + log.G(ctx).Info("Process metrics gatherer disabled (use config to setup)") + return + } + + log.G(ctx).Infof("Initializing Process metrics gather loop with %v duration", interval) + + bootTimeSec, err := loadSysBootTime() + if err != nil { + log.G(ctx).Errorf("Error while reading system boot time %v", err) + return + } + + // Note that `ctx` would be done (cancelled) at this point, + // but internal logger shoud be still available. 
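The gatherer below reads /proc/&lt;pid&gt;/statm, whose fields are sizes in pages, while WorkerMetrics.Mem is in bytes; readMemStat multiplies by the page size. Illustrative arithmetic (the numbers are made up):

    pageSize := uint64(syscall.Getpagesize()) // typically 4096 on x86-64
    vmsPages := uint64(12800)                 // first statm field
    vmsBytes := vmsPages * pageSize           // 12800 * 4096 = 52428800 (~50 MiB)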
+ defer log.G(ctx).Info("Process metrics polling has been cancelled") + + tick := time.NewTicker(interval) + defer tick.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tick.C: + b.gatherMetrics(ctx, bootTimeSec) + } + } +}
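A worked example of the readCPUPercent arithmetic (all numbers are made up): with CLK_TCK = 100, utime = 4500 and stime = 1500 jiffies amount to 45 s + 15 s = 60 s of CPU time; if the process started 300 s ago, the function reports uptime = 300 and cpu = 20. Note that this value is a percentage over the whole process lifetime, whereas the porto gatherer's CpuLoad appears to be a 0..1 fraction; consumers of the `cpu_load` field may want to reconcile the two.

    const clkTck = 100.0
    utimeSec := 4500 / clkTck                    // 45 s
    stimeSec := 1500 / clkTck                    // 15 s
    elapsed := 300.0                             // seconds since process start
    cpu := 100 * (utimeSec + stimeSec) / elapsed // 20 (percent)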