diff --git a/README.md b/README.md index cbdd569..02b84ac 100644 --- a/README.md +++ b/README.md @@ -3,11 +3,11 @@ Harmful workload generator for PostgreSQL. - idle transactions - rollbacks - waiting transactions +- deadlocks - see built-in help for more runtime options. **ATTENTION: Use only for testing purposes, don't execute against production, reckless usage might cause problems.** --- #### TODO -- deadlocks - temporary files diff --git a/app/app.go b/app/app.go index 6dbb065..902798a 100644 --- a/app/app.go +++ b/app/app.go @@ -45,6 +45,18 @@ func Start(ctx context.Context, c *Config) error { }() } + if c.Deadlocks { + wg.Add(1) + go func() { + defer wg.Done() + + err := runDeadlocksWorkload(ctx, c) + if err != nil { + log.Errorf("deadlocks workload failed: %s", err) + } + }() + } + wg.Wait() return nil diff --git a/app/cmd/main.go b/app/cmd/main.go index 4cf12eb..7866a47 100644 --- a/app/cmd/main.go +++ b/app/cmd/main.go @@ -29,6 +29,7 @@ func main() { waitXacts = kingpin.Flag("wait-xacts", "Run idle transactions workload").Default("false").Envar("NOISIA_IDLE_XACTS").Bool() waitXactsLocktimeMin = kingpin.Flag("wait-xacts.locktime-min", "Min transactions locking time, in seconds").Default("5").Envar("NOISIA_WAIT_XACTS_LOCKTIME_MIN").Int() waitXactsLocktimeMax = kingpin.Flag("wait-xacts.locktime-max", "Max transactions locking time, in seconds").Default("20").Envar("NOISIA_WAIT_XACTS_LOCKTIME_MAX").Int() + deadlocks = kingpin.Flag("deadlocks", "Run deadlocks workload").Default("false").Envar("NOISIA_DEADLOCKS").Bool() ) kingpin.Parse() log.SetLevel(*logLevel) @@ -49,6 +50,7 @@ func main() { WaitXacts: *waitXacts, WaitXactsLocktimeMin: *waitXactsLocktimeMin, WaitXactsLocktimeMax: *waitXactsLocktimeMax, + Deadlocks: *deadlocks, } if err := config.Validate(); err != nil { diff --git a/app/config.go b/app/config.go index 213bf11..0e0097d 100644 --- a/app/config.go +++ b/app/config.go @@ -15,6 +15,7 @@ type Config struct { WaitXacts bool WaitXactsLocktimeMin int WaitXactsLocktimeMax int + Deadlocks bool } func (c *Config) Validate() error { @@ -30,7 +31,7 @@ func (c *Config) Validate() error { return errors.New("wrong 'wait-xact.locktime-min' or 'wait-xact.locktime-max' specified") } - if c.WaitXacts && c.Jobs < 2 { + if (c.WaitXacts || c.Deadlocks) && c.Jobs < 2 { return errors.New("insufficient 'jobs' for this workload") } diff --git a/app/config_test.go b/app/config_test.go index ac3d97c..366fb10 100644 --- a/app/config_test.go +++ b/app/config_test.go @@ -19,6 +19,7 @@ func TestConfig_Validate(t *testing.T) { {config: Config{PostgresConninfo: "127.0.0.1", WaitXacts: true, WaitXactsLocktimeMin: 10, WaitXactsLocktimeMax: 5}, valid: false}, {config: Config{PostgresConninfo: "127.0.0.1", WaitXacts: true, WaitXactsLocktimeMin: 10, WaitXactsLocktimeMax: 5}, valid: false}, {config: Config{PostgresConninfo: "127.0.0.1", WaitXacts: true, Jobs: 1, WaitXactsLocktimeMin: 10, WaitXactsLocktimeMax: 20}, valid: false}, + {config: Config{PostgresConninfo: "127.0.0.1", Deadlocks: true, Jobs: 1, WaitXactsLocktimeMin: 10, WaitXactsLocktimeMax: 20}, valid: false}, } for _, tc := range testcases { if tc.valid { diff --git a/app/deadlocks.go b/app/deadlocks.go new file mode 100644 index 0000000..367e021 --- /dev/null +++ b/app/deadlocks.go @@ -0,0 +1,135 @@ +package app + +import ( + "context" + "github.com/jackc/pgx/v4/pgxpool" + "github.com/lesovsky/noisia/app/internal/log" + "math/rand" + "sync" + "time" +) + +func runDeadlocksWorkload(ctx context.Context, config *Config) error { + log.Infoln("Starting idle transactions workload") + + // connect to postgres + pool, err := pgxpool.Connect(context.Background(), config.PostgresConninfo) + if err != nil { + return err + } + defer pool.Close() + + // prepare workload + if err := prepareDeadlocksWorkload(ctx, pool); err != nil { + return err + } + + // keep specified number of workers using channel - run new workers until there is any free slot + guard := make(chan struct{}, config.Jobs) + for { + select { + // run workers only when it's possible to write into channel (channel is limited by number of jobs) + case guard <- struct{}{}: + go func() { + log.Debugln("starting a pair of deadlock-producing transactions") + err := executeDeadlock(context.Background(), pool) + if err != nil { + log.Errorln(err) + } + + // when worker finished, read from the channel to allow starting another workers + <-guard + }() + case <-ctx.Done(): + log.Info("exit signaled, stop waiting transaction workload") + // TODO: cleanup is not working - workload table still exists in the database (no suspicious logs) + return cleanupDeadlocksWorkload(ctx, pool) + } + } +} + +func prepareDeadlocksWorkload(ctx context.Context, pool *pgxpool.Pool) error { + tx, err := pool.Begin(ctx) + if err != nil { + return err + } + defer func() { + if err := tx.Commit(ctx); err != nil { + log.Warnln(err) + } + }() + + _, err = tx.Exec(ctx, "CREATE TABLE IF NOT EXISTS noisia_deadlocks_workload (id bigint, payload text)") + if err != nil { + return err + } + + // return with no explicit commit, transaction will be committed using 'defer' construction + return nil +} + +func cleanupDeadlocksWorkload(ctx context.Context, pool *pgxpool.Pool) error { + _, err := pool.Exec(ctx, "DROP TABLE noisia_deadlocks_workload") + if err != nil { + return err + } + return nil +} + +func executeDeadlock(ctx context.Context, pool *pgxpool.Pool) error { + // insert two rows + id1, id2 := rand.Int(), rand.Int() + _, err := pool.Exec(ctx, "INSERT INTO noisia_deadlocks_workload (id, payload) VALUES ($1, md5(random()::text)), ($2, md5(random()::text))", id1, id2) + if err != nil { + return err + } + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + if err := runUpdateXact(ctx, pool, id1, id2); err != nil { + log.Debugln(err) + } + wg.Done() + }() + + wg.Add(1) + go func() { + if err := runUpdateXact(ctx, pool, id2, id1); err != nil { + log.Debugln(err) + } + wg.Done() + }() + + wg.Wait() + return nil +} + +func runUpdateXact(ctx context.Context, pool *pgxpool.Pool, id1 int, id2 int) error { + tx, err := pool.Begin(ctx) + if err != nil { + return nil + } + defer func() { + if err := tx.Commit(ctx); err != nil { + log.Debugln(err) + } + }() + + // update row 1 + _, err = tx.Exec(ctx, "UPDATE noisia_deadlocks_workload SET payload = md5(random()::text) WHERE id = $1", id1) + if err != nil { + return err + } + + time.Sleep(10 * time.Millisecond) + + // update row 2 by tx 1 + _, err = tx.Exec(ctx, "UPDATE noisia_deadlocks_workload SET payload = md5(random()::text) WHERE id = $1", id2) + if err != nil { + return err + } + + return nil +}