From 7ef556520733b5abadc3b62edcf5f7172f019e5b Mon Sep 17 00:00:00 2001 From: Lawrence Jones Date: Tue, 5 Mar 2019 15:19:40 +0000 Subject: [PATCH] Don't duplicate log entries for replay Previously we weren't deleting our item from the unbounds when we bound it in response to detail statements. This leads to a duplication of items that this commit tests for and fixes. --- pkg/pgreplay/parse.go | 7 ++-- pkg/pgreplay/parse_test.go | 85 +++++++++++++++++++++++++++++++++++++- pkg/pgreplay/types.go | 10 ++--- 3 files changed, 93 insertions(+), 9 deletions(-) diff --git a/pkg/pgreplay/parse.go b/pkg/pgreplay/parse.go index dfd1a18..df67ffb 100644 --- a/pkg/pgreplay/parse.go +++ b/pkg/pgreplay/parse.go @@ -131,6 +131,7 @@ func ParseItem(logline string, unbounds map[SessionID]*Execute, buffer []byte) ( // for this session, as this log line will confirm the unbound query has no parameters. if strings.HasPrefix(msg, LogDuration) { if unbound, ok := unbounds[details.SessionID]; ok { + delete(unbounds, details.SessionID) return unbound.Bind(nil), nil } @@ -139,7 +140,7 @@ func ParseItem(logline string, unbounds map[SessionID]*Execute, buffer []byte) ( // LOG: statement: select pg_reload_conf(); if strings.HasPrefix(msg, LogStatement) { - return &Statement{details, strings.TrimPrefix(msg, LogStatement)}, nil + return Statement{details, strings.TrimPrefix(msg, LogStatement)}, nil } // LOG: execute <unnamed>: select pg_sleep($1) @@ -200,12 +201,12 @@ func ParseItem(logline string, unbounds map[SessionID]*Execute, buffer []byte) ( // LOG: connection authorized: user=postgres database=postgres if strings.HasPrefix(msg, LogConnectionAuthorized) { - return &Connect{details}, nil + return Connect{details}, nil } // LOG: disconnection: session time: 0:00:03.861 user=postgres database=postgres host=192.168.99.1 port=51529 if strings.HasPrefix(msg, LogConnectionDisconnect) { - return &Disconnect{details}, nil + return Disconnect{details}, nil } // LOG: connection received: host=192.168.99.1 
port=52188 diff --git a/pkg/pgreplay/parse_test.go b/pkg/pgreplay/parse_test.go index acda909..866f9bd 100644 --- a/pkg/pgreplay/parse_test.go +++ b/pkg/pgreplay/parse_test.go @@ -2,12 +2,94 @@ package pgreplay import ( "strings" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo/extensions/table" . "github.com/onsi/gomega" + _ "github.com/onsi/gomega/gstruct" ) +var time20190225, _ = time.Parse(time.RFC3339, "2019-02-25T15:08:27.222+00:00") + +var _ = Describe("Parse", func() { + DescribeTable("Parses", + func(input string, expected []Item) { + var items = []Item{} + itemsChan, errs, done := Parse(strings.NewReader(input)) + go func() { + for _ = range errs { + // no-op, just drain the channel + } + }() + + for item := range itemsChan { + if item != nil { + items = append(items, item) + } + } + + Eventually(done).Should(BeClosed()) + Expect(len(items)).To(Equal(len(expected))) + + for idx, item := range items { + Expect(item).To(BeEquivalentTo(expected[idx])) + } + }, + Entry( + "Extended protocol with duration logs", + ` +2019-02-25 15:08:27.232 GMT|[unknown]|[unknown]|5c7404eb.d6bd|LOG: connection received: host=127.0.0.1 port=59103 +2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: connection authorized: user=alice database=pgreplay_test +2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: duration: 0.968 ms parse <unnamed>: select t.oid +2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: duration: 1.100 ms bind <unnamed>: select t.oid +2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: execute <unnamed>: select t.oid +2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: duration: 0.326 ms + +2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: duration: 0.042 ms parse <unnamed>: insert into logs (author, message) ($1, $2) +2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: duration: 0.045 ms bind <unnamed>: insert into logs (author, message) ($1, $2) +2019-02-25 15:08:27.222 
GMT|alice|pgreplay_test|5c7404eb.d6bd|DETAIL: parameters: $1 = 'alice', $2 = 'bob' +2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: execute <unnamed>: insert into logs (author, message) ($1, $2) +2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|DETAIL: parameters: $1 = 'alice', $2 = 'bob' +2019-02-25 15:08:27.222 GMT|alice|pgreplay_test|5c7404eb.d6bd|LOG: duration: 0.042 ms`, + []Item{ + Connect{ + Details{ + Timestamp: time20190225, + SessionID: "5c7404eb.d6bd", + User: "alice", + Database: "pgreplay_test", + }, + }, + BoundExecute{ + Execute: &Execute{ + Details: Details{ + Timestamp: time20190225, + SessionID: "5c7404eb.d6bd", + User: "alice", + Database: "pgreplay_test", + }, + Query: "select t.oid", + }, + Parameters: []interface{}{}, + }, + BoundExecute{ + Execute: &Execute{ + Details: Details{ + Timestamp: time20190225, + SessionID: "5c7404eb.d6bd", + User: "alice", + Database: "pgreplay_test", + }, + Query: "insert into logs (author, message) ($1, $2)", + }, + Parameters: []interface{}{"alice", "bob"}, + }, + }, + ), + ) +}) + var _ = Describe("ParseBindParameters", func() { DescribeTable("Parses", func(input string, expected []interface{}) { @@ -44,7 +126,8 @@ var _ = Describe("LogScanner", func() { ), Entry( "Multiple lines", - `2010-12-31 10:59:52.243 UTC|postgres + ` +2010-12-31 10:59:52.243 UTC|postgres 2010-12-31 10:59:53.000 UTC|paysvc`, []string{ `2010-12-31 10:59:52.243 UTC|postgres`, diff --git a/pkg/pgreplay/types.go b/pkg/pgreplay/types.go index cc93073..1e7899c 100644 --- a/pkg/pgreplay/types.go +++ b/pkg/pgreplay/types.go @@ -26,10 +26,10 @@ type Item interface { } type Details struct { - Timestamp time.Time - SessionID SessionID - User string - Database string + Timestamp time.Time `json:"timestamp"` + SessionID SessionID `json:"session_id"` + User string `json:"user"` + Database string `json:"database"` } func (e Details) GetTimestamp() time.Time { return e.Timestamp } @@ -51,7 +51,7 @@ func (_ Disconnect) Handle(conn 
*pgx.Conn) error { type Statement struct { Details - Query string + Query string `json:"query"` } func (s Statement) Handle(conn *pgx.Conn) error {