Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a small traces table for querying. #460

Merged
merged 2 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions godeps.txt
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,7 @@ github.com/ServiceWeaver/weaver/runtime/metrics
unicode
unicode/utf8
github.com/ServiceWeaver/weaver/runtime/perfetto
bytes
context
crypto/sha256
database/sql
Expand Down
2 changes: 1 addition & 1 deletion internal/status/templates/traces.html
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
<tr>
<th scope="col">Trace URL</th>
<th scope="col">Start Time</th>
<th scope="col">Duration</th>
<th scope="col">Latency</th>
</tr>
</thead>
<tbody>
Expand Down
99 changes: 62 additions & 37 deletions runtime/perfetto/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package perfetto

import (
"bytes"
"context"
"crypto/sha256"
"database/sql"
Expand Down Expand Up @@ -108,27 +109,30 @@ func Open(ctx context.Context, fname string) (*DB, error) {
}

const initTables = `
-- Trace span data.
CREATE TABLE IF NOT EXISTS spans (
-- Queryable trace data.
CREATE TABLE IF NOT EXISTS traces (
trace_id TEXT NOT NULL,
app TEXT NOT NULL,
version TEXT NOT NULL,
name TEXT,
trace_id TEXT NOT NULL,
span_id TEXT NOT NULL,
parent_span_id TEXT,
start_time_unix_us INTEGER,
end_time_unix_us INTEGER,
encoded_span TEXT,
PRIMARY KEY(trace_id, span_id)
PRIMARY KEY(trace_id)
);

-- Trace span attributes.
CREATE TABLE IF NOT EXISTS span_attributes (
-- Queryable trace attributes.
CREATE TABLE IF NOT EXISTS trace_attributes (
trace_id TEXT NOT NULL,
span_id TEXT NOT NULL,
key TEXT NOT NULL,
val TEXT,
FOREIGN KEY (trace_id, span_id) REFERENCES spans (trace_id, span_id)
FOREIGN KEY (trace_id) REFERENCES traces (trace_id)
);

-- Encoded spans.
CREATE TABLE IF NOT EXISTS encoded_spans (
trace_id TEXT NOT NULL,
data TEXT,
FOREIGN KEY (trace_id) REFERENCES traces (trace_id)
);
`
if _, err := t.execDB(ctx, initTables); err != nil {
Expand All @@ -145,46 +149,62 @@ func (d *DB) Close() error {

// Store stores the given traces in the database.
func (d *DB) Store(ctx context.Context, app, version string, spans *protos.TraceSpans) error {
// NOTE: we insert all rows transactionally, as it is significantly faster
// than inserting one row at a time [1].
//
// [1]: https://stackoverflow.com/questions/1711631/improve-insert-per-second-performance-of-sqlite
tx, err := d.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelLinearizable})
if err != nil {
return err
}
defer tx.Rollback() //nolint:errcheck // supplanted by errs below
var errs []error
for _, span := range spans.Span {
if err := d.storeSpan(ctx, app, version, span); err != nil {
if isRootSpan(span) {
if err := d.storeTrace(ctx, tx, app, version, span); err != nil {
errs = append(errs, err)
}
}
if err := d.storeSpan(ctx, tx, span); err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
if errs != nil {
return errors.Join(errs...)
}
return tx.Commit()
}

func (d *DB) storeSpan(ctx context.Context, app, version string, span *protos.Span) error {
encoded, err := proto.Marshal(span)
if err != nil {
return err
}
const spanStmt = `
INSERT INTO spans(app,version,name,trace_id,span_id,parent_span_id,start_time_unix_us,end_time_unix_us,encoded_span)
VALUES (?,?,?,?,?,?,?,?,?)`
res, err := d.execDB(ctx, spanStmt, app, version, span.Name, hex.EncodeToString(span.TraceId), hex.EncodeToString(span.SpanId), hex.EncodeToString(span.ParentSpanId), span.StartMicros, span.EndMicros, encoded)
func (d *DB) storeTrace(ctx context.Context, tx *sql.Tx, app, version string, root *protos.Span) error {
const traceStmt = `
INSERT INTO traces(trace_id,app,version,name,start_time_unix_us,end_time_unix_us)
VALUES (?,?,?,?,?,?)`
_, err := tx.ExecContext(ctx, traceStmt, hex.EncodeToString(root.TraceId), app, version, root.Name, root.StartMicros, root.EndMicros)
if err != nil {
return fmt.Errorf("write span to %s: %w", d.fname, err)
}
id, err := res.LastInsertId()
if err != nil {
return fmt.Errorf("get last insertion id to %s: %w", d.fname, err)
}

// NOTE: we perform attribute inserts non-transactionaly, for performance
// reasons. In terms of DB consistency, this may result in some traces
// getting missed during querying, but that is okay as querying doesn't
// require strong consistency guarantees.
const attrStmt = `INSERT INTO span_attributes VALUES(?,?,?)`
for _, attr := range span.Attributes {
const attrStmt = `INSERT INTO trace_attributes VALUES(?,?,?)`
for _, attr := range root.Attributes {
valStr := attributeValueString(attr)
if _, err := d.execDB(ctx, attrStmt, id, attr.Key, valStr); err != nil {
return fmt.Errorf("write span attributes to %s: %w", d.fname, err)
_, err := tx.ExecContext(ctx, attrStmt, hex.EncodeToString(root.TraceId), attr.Key, valStr)
if err != nil {
return fmt.Errorf("write trace attributes to %s: %w", d.fname, err)
}
}
return nil
}

func (d *DB) storeSpan(ctx context.Context, tx *sql.Tx, span *protos.Span) error {
encoded, err := proto.Marshal(span)
if err != nil {
return err
}
const stmt = `INSERT INTO encoded_spans(trace_id,data) VALUES (?,?)`
_, err = tx.ExecContext(ctx, stmt, hex.EncodeToString(span.TraceId), encoded)
return err
}

// TraceSummary stores summary information about a trace.
type TraceSummary struct {
TraceID string // Unique trace identifier, in hex format.
Expand All @@ -208,9 +228,8 @@ type TraceSummary struct {
func (d *DB) QueryTraces(ctx context.Context, app, version string, startTime, endTime time.Time, durationLower, durationUpper time.Duration, limit int64) ([]TraceSummary, error) {
const query = `
SELECT trace_id, start_time_unix_us, end_time_unix_us
FROM spans
FROM traces
WHERE
(parent_span_id="0000000000000000") AND
(app=? OR ?="") AND (version=? OR ?="") AND
(start_time_unix_us>=? OR ?=0) AND (end_time_unix_us<=? OR ?=0) AND
((end_time_unix_us - start_time_unix_us)>=? OR ?=0) AND
Expand Down Expand Up @@ -256,7 +275,7 @@ LIMIT ?
}

func (d *DB) fetchSpans(ctx context.Context, traceID string) ([]*protos.Span, error) {
const query = `SELECT encoded_span FROM spans WHERE trace_id=?`
const query = `SELECT data FROM encoded_spans WHERE trace_id=?`
rows, err := d.queryDB(ctx, query, traceID)
if err != nil {
return nil, err
Expand Down Expand Up @@ -544,6 +563,12 @@ func isLocked(err error) bool {
return ok && (sqlError.Code() == sqlite3.SQLITE_BUSY || sqlError.Code() == sqlite3.SQLITE_LOCKED)
}

// isRootSpan returns true iff the given span is a root span.
func isRootSpan(span *protos.Span) bool {
var nilSpanID [8]byte
return bytes.Equal(span.ParentSpanId, nilSpanID[:])
}

func fp(name string) int {
hasher := sha256.New()
hasher.Write([]byte(name))
Expand Down
60 changes: 45 additions & 15 deletions runtime/perfetto/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,24 @@ func makeSpan(name string, tid, sid, pid string, start, end time.Time) *protos.S
}
}

func tid(id int) string {
return fmt.Sprintf("%016d", id)
}

func sid(id int) string {
if id == 0 {
return string(make([]byte, 8))
}
return fmt.Sprintf("%08d", id)
}

func tick(t int) time.Time {
if t == 0 {
return time.Time{}
}
return now.Add(time.Duration(t) * time.Second)
}

func TestQueryTraces(t *testing.T) {
ctx := context.Background()
fname := filepath.Join(t.TempDir(), "tracedb.db_test.db")
Expand All @@ -59,21 +77,6 @@ func TestQueryTraces(t *testing.T) {
}
defer db.Close()

tid := func(id int) string {
return fmt.Sprintf("%016d", id)
}
sid := func(id int) string {
if id == 0 {
return string(make([]byte, 8))
}
return fmt.Sprintf("%08d", id)
}
tick := func(t int) time.Time {
if t == 0 {
return time.Time{}
}
return now.Add(time.Duration(t) * time.Second)
}
dur := func(ts int) time.Duration {
return time.Duration(ts) * time.Second
}
Expand Down Expand Up @@ -198,3 +201,30 @@ func TestQueryTraces(t *testing.T) {
})
}
}

func BenchmarkStore(b *testing.B) {
ctx := context.Background()
s := makeSpan("s1", tid(1), sid(1), sid(1), tick(3), tick(10))
for _, size := range []int{1, 10, 100} {
b.Run(fmt.Sprintf("%d", size), func(b *testing.B) {
fname := filepath.Join(b.TempDir(), "tracedb.db_bench.db")
db, err := Open(ctx, fname)
if err != nil {
b.Fatal(err)
}
b.Cleanup(func() { db.Close() })

spans := &protos.TraceSpans{}
spans.Span = make([]*protos.Span, size)
for i := 0; i < size; i++ {
spans.Span[i] = s
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
if err := db.Store(ctx, "app", "v", spans); err != nil {
b.Fatal(err)
}
}
})
}
}