Skip to content

Commit

Permalink
Disable logging and switch event callback writing buffer check
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyhb committed Oct 8, 2024
1 parent f84d07e commit fbe5747
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 10 deletions.
6 changes: 1 addition & 5 deletions internal/test/pg_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

const (
DefaultSeed = 123
DefaultAccountUUID = "6db2bd8a-2a2f-52d3-aa79-abb4015d6dbd"
DefaultAccountUUID = "9b332174-2fc5-5781-8aba-b2500384cc1c"
DefaultEmail = "lriai1h2oy1d@example.com"
)

Expand Down Expand Up @@ -89,10 +89,6 @@ func InsertAccounts(t *testing.T, ctx context.Context, cfg pgx.ConnConfig, opts
}
}

func InsertAccountsAndUsers(t *testing.T, ctx context.Context, opts InsertOpts) {
panic("nah")
}

func hash(in any) string {
switch v := in.(type) {
case string:
Expand Down
6 changes: 6 additions & 0 deletions internal/test/pg_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package test
import (
"context"
"fmt"
"log"
"strings"
"testing"

"github.com/docker/docker/pkg/ioutils"
"github.com/inngest/dbcap/pkg/replicator/pgreplicator/pgsetup"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
Expand All @@ -24,6 +26,10 @@ type StartPGOpts struct {
DisableReplicaIdentityFull bool
}

func init() {
tc.Logger = log.New(&ioutils.NopWriter{}, "", 0)
}

func StartPG(t *testing.T, ctx context.Context, opts StartPGOpts) (tc.Container, pgx.ConnConfig) {
t.Helper()
args := []tc.ContainerCustomizer{
Expand Down
9 changes: 4 additions & 5 deletions pkg/eventwriter/callback_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func (a *cbWriter) Listen(ctx context.Context, committer changeset.WatermarkComm
buf = make([]*changeset.Changeset, a.batchSize)
i = 0
case msg := <-a.cs:
buf[i] = msg
i++
// Send this batch after at least 5 seconds
timer.Reset(a.batchTimeout)
if i == a.batchSize {
// send this batch, as we're full.
if err := a.onChangeset(buf); err == nil {
Expand All @@ -92,11 +96,6 @@ func (a *cbWriter) Listen(ctx context.Context, committer changeset.WatermarkComm
i = 0
continue
}
// Appoend the
buf[i] = msg
i++
// Send this batch after at least 5 seconds
timer.Reset(a.batchTimeout)
}
}
}()
Expand Down

0 comments on commit fbe5747

Please sign in to comment.