Skip to content

Commit

Permalink
Instrument Export Container writing
Browse files Browse the repository at this point in the history
Gains visibility over export creation. This is also usful for the
offline collector too (when --debug is specified).
  • Loading branch information
scudette committed Jan 2, 2025
1 parent 21aff92 commit 70ed006
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 22 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/Velocidex/sflags v0.3.1-0.20241126160332-cc1a5b66b8f1
github.com/Velocidex/ttlcache/v2 v2.9.1-0.20240517145123-a3f45e86e130
github.com/Velocidex/yaml/v2 v2.2.8
github.com/Velocidex/zip v0.0.0-20210101070220-e7ecefb7aad7
github.com/Velocidex/zip v0.0.0-20250102162034-1a0ec0ec569c
github.com/alecthomas/assert v1.0.0
github.com/alecthomas/chroma v0.7.3
github.com/alecthomas/participle v0.7.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ github.com/Velocidex/ttlcache/v2 v2.9.1-0.20240517145123-a3f45e86e130 h1:+QujZ0D
github.com/Velocidex/ttlcache/v2 v2.9.1-0.20240517145123-a3f45e86e130/go.mod h1:3/pI9BBAF7gydBWvMVtV7W1qRwshEG9lBwed/d8xfFg=
github.com/Velocidex/yaml/v2 v2.2.8 h1:GUrSy4SBJ6RjGt43k6MeBKtw2z/27gh4A3hfFmFY3No=
github.com/Velocidex/yaml/v2 v2.2.8/go.mod h1:PlXIg/Pxmoja48C1vMHo7C5pauAZvLq/UEPOQ3DsjS4=
github.com/Velocidex/zip v0.0.0-20210101070220-e7ecefb7aad7 h1:IAry9WUMrVYA+XPvMF5UMN56ya5II/hoUOtqaHKOHrs=
github.com/Velocidex/zip v0.0.0-20210101070220-e7ecefb7aad7/go.mod h1:1p8CU2cp64BG4334sKzhuyH/vm3k1OXEdeBCwYTssAs=
github.com/Velocidex/zip v0.0.0-20250102162034-1a0ec0ec569c h1:+/E/0rL46fcD0ykUWrY66JBxpjjksxN+ZnC596XgHIQ=
github.com/Velocidex/zip v0.0.0-20250102162034-1a0ec0ec569c/go.mod h1:1p8CU2cp64BG4334sKzhuyH/vm3k1OXEdeBCwYTssAs=
github.com/VirusTotal/gyp v0.9.0 h1:jhOBl93jfStmAcKLa/EcTmdPng5bn5kvJJZqQqJ5R4g=
github.com/VirusTotal/gyp v0.9.0/go.mod h1:nmcW15dQ1657PmMcG9X/EZmp6rTQsyo9g8r6Cz1/AHc=
github.com/alecthomas/assert v0.0.0-20170929043011-405dbfeb8e38/go.mod h1:r7bzyVFMNntcxPZXK3/+KdruV1H5KSlyVY0gc+NgInI=
Expand Down
73 changes: 66 additions & 7 deletions reporting/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,51 @@ type MemberWriter struct {
writer_wg *sync.WaitGroup

owner *Container

stats_provider concurrent_zip.StatsWriter
id uint64
}

func (self *MemberWriter) Write(buff []byte) (int, error) {
self.owner.increaseUncompressedBytes(len(buff))
return self.WriteCloser.Write(buff)
res, err := self.WriteCloser.Write(buff)

ContainerTracker.UpdateContainerWriter(self.owner.id, self.id,
func(info *WriterInfo) {
if self.stats_provider != nil {
stats := self.stats_provider.GetStats()
info.CompressedSize = int(stats.CompressedSize)
info.TmpFile = stats.TmpFile
}
info.UncompressedSize += res
info.LastWrite = utils.GetTime().Now()
})

// FIXME: Use this to instrument a very slow export
// time.Sleep(200 * time.Millisecond)

return res, err
}

// Keep track of all members that are closed to allow the zip to be
// written properly.
func (self *MemberWriter) Close() error {
err := self.WriteCloser.Close()
self.writer_wg.Done()

ContainerTracker.UpdateContainerWriter(self.owner.id, self.id,
func(info *WriterInfo) {
info.Closed = utils.GetTime().Now()
})

return err
}

type Container struct {
config_obj *config_proto.Config

id uint64

// The underlying file writer
fd io.WriteCloser

Expand Down Expand Up @@ -159,11 +186,27 @@ func (self *Container) Create(name string, mtime time.Time) (io.WriteCloser, err
return nil, err
}

return &MemberWriter{
WriteCloser: writer,
writer_wg: &self.writer_wg,
owner: self,
}, nil
stats_provider, _ := writer.(concurrent_zip.StatsWriter)

res := &MemberWriter{
WriteCloser: writer,
stats_provider: stats_provider,
writer_wg: &self.writer_wg,
owner: self,
id: utils.GetId(),
}

ContainerTracker.UpdateContainerWriter(self.id, res.id,
func(info *WriterInfo) {
info.Name = name
info.Created = utils.GetTime().Now()
if stats_provider != nil {
stats := stats_provider.GetStats()
info.TmpFile = stats.TmpFile
}
})

return res, nil
}

func (self *Container) StoreArtifact(
Expand Down Expand Up @@ -671,7 +714,17 @@ func NewContainer(
}
files.Add(path)

return NewContainerFromWriter(path, config_obj, fd, password, level, metadata)
res, err := NewContainerFromWriter(path, config_obj,
fd, password, level, metadata)
if err != nil {
return nil, err
}

ContainerTracker.UpdateContainer(res.id, func(info *ContainerInfo) {
info.BackingFile = path
})

return res, nil
}

func NewContainerFromWriter(
Expand All @@ -690,6 +743,7 @@ func NewContainerFromWriter(
sha_sum := sha256.New()

result := &Container{
id: utils.GetId(),
config_obj: config_obj,
name: name,
fd: fd,
Expand Down Expand Up @@ -743,5 +797,10 @@ func NewContainerFromWriter(
}
}

ContainerTracker.UpdateContainer(result.id, func(info *ContainerInfo) {
info.Name = result.name
info.CreateTime = utils.GetTime().Now()
})

return result, nil
}
108 changes: 108 additions & 0 deletions reporting/profile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package reporting

import (
"context"
"sync"
"time"

"github.com/Velocidex/ordereddict"
api_proto "www.velocidex.com/golang/velociraptor/api/proto"
"www.velocidex.com/golang/velociraptor/services/debug"
"www.velocidex.com/golang/vfilter"
)

var (
ContainerTracker = NewContainerTracker()
)

type WriterInfo struct {
Name string
TmpFile string
CompressedSize int
UncompressedSize int
Created time.Time
LastWrite time.Time
Closed time.Time
}

type ContainerInfo struct {
Name string
BackingFile string
Stats *api_proto.ContainerStats
InFlightWriters map[uint64]*WriterInfo
CreateTime time.Time
CloseTime time.Time
}

type _ContainerTracker struct {
mu sync.Mutex
containers map[uint64]*ContainerInfo
}

func (self *_ContainerTracker) UpdateContainerWriter(
container_id, writer_id uint64, cb func(info *WriterInfo)) {

self.UpdateContainer(container_id, func(info *ContainerInfo) {
writer_info, pres := info.InFlightWriters[writer_id]
if !pres {
writer_info = &WriterInfo{}
info.InFlightWriters[writer_id] = writer_info
}

cb(writer_info)
})
}

func (self *_ContainerTracker) UpdateContainer(
id uint64, cb func(info *ContainerInfo)) {
self.mu.Lock()
defer self.mu.Unlock()

record, pres := self.containers[id]
if !pres {
record = &ContainerInfo{
InFlightWriters: make(map[uint64]*WriterInfo),
}
self.containers[id] = record
}

cb(record)
}

func (self *_ContainerTracker) WriteMetrics(
ctx context.Context, scope vfilter.Scope,
output_chan chan vfilter.Row) {

self.mu.Lock()
defer self.mu.Unlock()

for _, container := range self.containers {
for _, w := range container.InFlightWriters {
output_chan <- ordereddict.NewDict().
Set("ContainerName", container.Name).
Set("ZipCreateTime", container.CreateTime).
Set("ZipCloseTime", container.CloseTime).
Set("MemberName", w.Name).
Set("MemberTmpFile", w.TmpFile).
Set("MemberCreate", w.Created).
Set("MemberCompressedSize", w.CompressedSize).
Set("MemberUncompressedSize", w.UncompressedSize).
Set("MemberLastWrite", w.LastWrite).
Set("MemberClosed", w.Closed)
}
}
}

func NewContainerTracker() *_ContainerTracker {
return &_ContainerTracker{
containers: make(map[uint64]*ContainerInfo),
}
}

func init() {
debug.RegisterProfileWriter(debug.ProfileWriterInfo{
Name: "ExportContainers",
Description: "Report the state of current exports",
ProfileWriter: ContainerTracker.WriteMetrics,
})
}
29 changes: 29 additions & 0 deletions reporting/tmpfiles.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package reporting

import (
"os"

concurrent_zip "github.com/Velocidex/zip"
"www.velocidex.com/golang/velociraptor/utils/tempfile"
)

type TmpfileFactory int

func (self TmpfileFactory) TempFile() (*os.File, error) {
tmpfile, err := tempfile.TempFile("zip")
if err != nil {
return nil, err
}
tempfile.AddTmpFile(tmpfile.Name())

return tmpfile, nil
}

func (self TmpfileFactory) RemoveTempFile(filename string) {
err := os.Remove(filename)
tempfile.RemoveTmpFile(filename, err)
}

func init() {
concurrent_zip.SetTmpfileProvider(TmpfileFactory(0))
}
4 changes: 2 additions & 2 deletions vql/windows/etw/context.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//go:build windows && cgo
// +build windows,cgo
//go:build windows && cgo && amd64
// +build windows,cgo,amd64

package etw

Expand Down
4 changes: 2 additions & 2 deletions vql/windows/etw/options.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//go:build windows && cgo
// +build windows,cgo
//go:build windows && cgo && amd64
// +build windows,cgo,amd64

package etw

Expand Down
4 changes: 2 additions & 2 deletions vql/windows/etw/protocols.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//go:build windows && cgo
// +build windows,cgo
//go:build windows && cgo && amd64
// +build windows,cgo,amd64

package etw

Expand Down
4 changes: 2 additions & 2 deletions vql/windows/etw/stats.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//go:build windows && cgo
// +build windows,cgo
//go:build windows && cgo && amd64
// +build windows,cgo,amd64

package etw

Expand Down
4 changes: 2 additions & 2 deletions vql/windows/etw/watch_etw.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//go:build windows && cgo
// +build windows,cgo
//go:build windows && cgo && amd64
// +build windows,cgo,amd64

package etw

Expand Down
4 changes: 2 additions & 2 deletions vql/windows/etw/watcher.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//go:build windows && cgo
// +build windows,cgo
//go:build windows && cgo && amd64
// +build windows,cgo,amd64

package etw

Expand Down

0 comments on commit 70ed006

Please sign in to comment.