Skip to content

Commit

Permalink
Use a small traces table for querying.
Browse files Browse the repository at this point in the history
Suggested by @ghemawat.

The traces table now contains only the information required for querying. The
new format is more space efficient and leads to faster querying.

Also, insert all trace data into the database in a single transaction,
as it is significantly more efficient:

Before:
cpu: Intel(R) Xeon(R) Gold 6154 CPU @ 3.00GHz
BenchmarkStore
BenchmarkStore/1
BenchmarkStore/1-72  	     100	  10763759 ns/op
BenchmarkStore/10
BenchmarkStore/10-72 	      10	 108713097 ns/op
BenchmarkStore/100
BenchmarkStore/100-72         	       1	1055861276 ns/op

After:
BenchmarkStore
BenchmarkStore/1
BenchmarkStore/1-72  	     100	  10609201 ns/op
BenchmarkStore/10
BenchmarkStore/10-72 	     100	  10700368 ns/op
BenchmarkStore/100
BenchmarkStore/100-72         	      96	  13052743 ns/op
  • Loading branch information
spetrovic77 committed Jul 17, 2023
1 parent 2bafa62 commit 711de94
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 53 deletions.
1 change: 1 addition & 0 deletions godeps.txt
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,7 @@ github.com/ServiceWeaver/weaver/runtime/metrics
unicode
unicode/utf8
github.com/ServiceWeaver/weaver/runtime/perfetto
bytes
context
crypto/sha256
database/sql
Expand Down
2 changes: 1 addition & 1 deletion internal/status/templates/traces.html
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
<tr>
<th scope="col">Trace URL</th>
<th scope="col">Start Time</th>
<th scope="col">Duration</th>
<th scope="col">Latency</th>
</tr>
</thead>
<tbody>
Expand Down
99 changes: 62 additions & 37 deletions runtime/perfetto/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package perfetto

import (
"bytes"
"context"
"crypto/sha256"
"database/sql"
Expand Down Expand Up @@ -108,27 +109,30 @@ func Open(ctx context.Context, fname string) (*DB, error) {
}

const initTables = `
-- Trace span data.
CREATE TABLE IF NOT EXISTS spans (
-- Queryable trace data.
CREATE TABLE IF NOT EXISTS traces (
trace_id TEXT NOT NULL,
app TEXT NOT NULL,
version TEXT NOT NULL,
name TEXT,
trace_id TEXT NOT NULL,
span_id TEXT NOT NULL,
parent_span_id TEXT,
start_time_unix_us INTEGER,
end_time_unix_us INTEGER,
encoded_span TEXT,
PRIMARY KEY(trace_id, span_id)
PRIMARY KEY(trace_id)
);
-- Trace span attributes.
CREATE TABLE IF NOT EXISTS span_attributes (
-- Queryable trace attributes.
CREATE TABLE IF NOT EXISTS trace_attributes (
trace_id TEXT NOT NULL,
span_id TEXT NOT NULL,
key TEXT NOT NULL,
val TEXT,
FOREIGN KEY (trace_id, span_id) REFERENCES spans (trace_id, span_id)
FOREIGN KEY (trace_id) REFERENCES traces (trace_id)
);
-- Encoded spans.
CREATE TABLE IF NOT EXISTS encoded_spans (
trace_id TEXT NOT NULL,
data TEXT,
FOREIGN KEY (trace_id) REFERENCES traces (trace_id)
);
`
if _, err := t.execDB(ctx, initTables); err != nil {
Expand All @@ -145,46 +149,62 @@ func (d *DB) Close() error {

// Store stores the given traces in the database.
func (d *DB) Store(ctx context.Context, app, version string, spans *protos.TraceSpans) error {
// NOTE: we insert all rows transactionally, as it is significantly faster
// than inserting one row at a time [1].
//
// [1]: https://stackoverflow.com/questions/1711631/improve-insert-per-second-performance-of-sqlite
tx, err := d.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelLinearizable})
if err != nil {
return err
}
defer tx.Rollback()
var errs []error
for _, span := range spans.Span {
if err := d.storeSpan(ctx, app, version, span); err != nil {
if isRootSpan(span) {
if err := d.storeTrace(ctx, tx, app, version, span); err != nil {
errs = append(errs, err)
}
}
if err := d.storeSpan(ctx, tx, span); err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
if errs != nil {
return errors.Join(errs...)
}
return tx.Commit()
}

func (d *DB) storeSpan(ctx context.Context, app, version string, span *protos.Span) error {
encoded, err := proto.Marshal(span)
if err != nil {
return err
}
const spanStmt = `
INSERT INTO spans(app,version,name,trace_id,span_id,parent_span_id,start_time_unix_us,end_time_unix_us,encoded_span)
VALUES (?,?,?,?,?,?,?,?,?)`
res, err := d.execDB(ctx, spanStmt, app, version, span.Name, hex.EncodeToString(span.TraceId), hex.EncodeToString(span.SpanId), hex.EncodeToString(span.ParentSpanId), span.StartMicros, span.EndMicros, encoded)
func (d *DB) storeTrace(ctx context.Context, tx *sql.Tx, app, version string, root *protos.Span) error {
const traceStmt = `
INSERT INTO traces(trace_id,app,version,name,start_time_unix_us,end_time_unix_us)
VALUES (?,?,?,?,?,?)`
_, err := tx.ExecContext(ctx, traceStmt, hex.EncodeToString(root.TraceId), app, version, root.Name, root.StartMicros, root.EndMicros)
if err != nil {
return fmt.Errorf("write span to %s: %w", d.fname, err)
}
id, err := res.LastInsertId()
if err != nil {
return fmt.Errorf("get last insertion id to %s: %w", d.fname, err)
}

// NOTE: we perform attribute inserts non-transactionaly, for performance
// reasons. In terms of DB consistency, this may result in some traces
// getting missed during querying, but that is okay as querying doesn't
// require strong consistency guarantees.
const attrStmt = `INSERT INTO span_attributes VALUES(?,?,?)`
for _, attr := range span.Attributes {
const attrStmt = `INSERT INTO trace_attributes VALUES(?,?,?)`
for _, attr := range root.Attributes {
valStr := attributeValueString(attr)
if _, err := d.execDB(ctx, attrStmt, id, attr.Key, valStr); err != nil {
return fmt.Errorf("write span attributes to %s: %w", d.fname, err)
_, err := tx.ExecContext(ctx, attrStmt, hex.EncodeToString(root.TraceId), attr.Key, valStr)
if err != nil {
return fmt.Errorf("write trace attributes to %s: %w", d.fname, err)
}
}
return nil
}

// storeSpan serializes span with proto.Marshal and inserts the encoded bytes
// into the encoded_spans table, keyed by the hex-encoded trace ID. The insert
// is executed on tx; committing or rolling back tx is left to the caller.
//
// Any error is wrapped with the failing operation (and, for the insert, the
// database file name) so that it remains identifiable once the caller joins
// it with errors from other spans. The underlying error is preserved via %w.
func (d *DB) storeSpan(ctx context.Context, tx *sql.Tx, span *protos.Span) error {
	encoded, err := proto.Marshal(span)
	if err != nil {
		return fmt.Errorf("encode span: %w", err)
	}
	const stmt = `INSERT INTO encoded_spans(trace_id,data) VALUES (?,?)`
	if _, err := tx.ExecContext(ctx, stmt, hex.EncodeToString(span.TraceId), encoded); err != nil {
		return fmt.Errorf("write encoded span to %s: %w", d.fname, err)
	}
	return nil
}

// TraceSummary stores summary information about a trace.
type TraceSummary struct {
TraceID string // Unique trace identifier, in hex format.
Expand All @@ -208,9 +228,8 @@ type TraceSummary struct {
func (d *DB) QueryTraces(ctx context.Context, app, version string, startTime, endTime time.Time, durationLower, durationUpper time.Duration, limit int64) ([]TraceSummary, error) {
const query = `
SELECT trace_id, start_time_unix_us, end_time_unix_us
FROM spans
FROM traces
WHERE
(parent_span_id="0000000000000000") AND
(app=? OR ?="") AND (version=? OR ?="") AND
(start_time_unix_us>=? OR ?=0) AND (end_time_unix_us<=? OR ?=0) AND
((end_time_unix_us - start_time_unix_us)>=? OR ?=0) AND
Expand Down Expand Up @@ -256,7 +275,7 @@ LIMIT ?
}

func (d *DB) fetchSpans(ctx context.Context, traceID string) ([]*protos.Span, error) {
const query = `SELECT encoded_span FROM spans WHERE trace_id=?`
const query = `SELECT data FROM encoded_spans WHERE trace_id=?`
rows, err := d.queryDB(ctx, query, traceID)
if err != nil {
return nil, err
Expand Down Expand Up @@ -544,6 +563,12 @@ func isLocked(err error) bool {
return ok && (sqlError.Code() == sqlite3.SQLITE_BUSY || sqlError.Code() == sqlite3.SQLITE_LOCKED)
}

// isRootSpan reports whether span is a root span, i.e., whether its parent
// span ID consists of exactly eight zero bytes.
func isRootSpan(span *protos.Span) bool {
	zeroID := make([]byte, 8)
	return bytes.Equal(zeroID, span.ParentSpanId)
}

func fp(name string) int {
hasher := sha256.New()
hasher.Write([]byte(name))
Expand Down
60 changes: 45 additions & 15 deletions runtime/perfetto/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,24 @@ func makeSpan(name string, tid, sid, pid string, start, end time.Time) *protos.S
}
}

// tid formats id as a decimal string left-padded with zeros to 16 characters,
// for use as a trace ID in tests.
func tid(id int) string {
	const width = 16
	return fmt.Sprintf("%0*d", width, id)
}

// sid returns a span ID string for use in tests. An id of 0 yields a string
// of eight zero bytes; any other id yields its decimal form left-padded with
// zeros to 8 characters.
func sid(id int) string {
	if id != 0 {
		return fmt.Sprintf("%08d", id)
	}
	var zero [8]byte
	return string(zero[:])
}

// tick converts a tick count into a timestamp for tests: tick(0) is the zero
// time.Time, and any other t is t seconds after the reference time now
// (declared elsewhere in this package).
func tick(t int) time.Time {
	if t != 0 {
		offset := time.Duration(t) * time.Second
		return now.Add(offset)
	}
	return time.Time{}
}

func TestQueryTraces(t *testing.T) {
ctx := context.Background()
fname := filepath.Join(t.TempDir(), "tracedb.db_test.db")
Expand All @@ -59,21 +77,6 @@ func TestQueryTraces(t *testing.T) {
}
defer db.Close()

tid := func(id int) string {
return fmt.Sprintf("%016d", id)
}
sid := func(id int) string {
if id == 0 {
return string(make([]byte, 8))
}
return fmt.Sprintf("%08d", id)
}
tick := func(t int) time.Time {
if t == 0 {
return time.Time{}
}
return now.Add(time.Duration(t) * time.Second)
}
dur := func(ts int) time.Duration {
return time.Duration(ts) * time.Second
}
Expand Down Expand Up @@ -198,3 +201,30 @@ func TestQueryTraces(t *testing.T) {
})
}
}

// BenchmarkStore measures DB.Store for batches of 1, 10, and 100 spans per
// call. Each sub-benchmark opens a fresh database in its own temporary
// directory and stores the same batch b.N times; every element of the batch
// is the same span value.
func BenchmarkStore(b *testing.B) {
	ctx := context.Background()
	span := makeSpan("s1", tid(1), sid(1), sid(1), tick(3), tick(10))
	for _, size := range []int{1, 10, 100} {
		b.Run(fmt.Sprintf("%d", size), func(b *testing.B) {
			db, err := Open(ctx, filepath.Join(b.TempDir(), "tracedb.db_bench.db"))
			if err != nil {
				b.Fatal(err)
			}
			b.Cleanup(func() { db.Close() })

			// Build the batch up front so only Store is timed.
			batch := &protos.TraceSpans{Span: make([]*protos.Span, size)}
			for i := range batch.Span {
				batch.Span[i] = span
			}

			b.ResetTimer()
			for n := 0; n < b.N; n++ {
				if err := db.Store(ctx, "app", "v", batch); err != nil {
					b.Fatal(err)
				}
			}
		})
	}
}

0 comments on commit 711de94

Please sign in to comment.