diff --git a/sqlserver/sqlserver.go b/sqlserver/sqlserver.go index 546854d..e763c33 100644 --- a/sqlserver/sqlserver.go +++ b/sqlserver/sqlserver.go @@ -78,11 +78,13 @@ func (s *SQLServer) Close() error { if err := s.conn.Close(); err != nil { return fmt.Errorf("failed to close connection: %w", err) } + return nil } // Add the message to the data store func (s *SQLServer) Add(ctx context.Context, evt *outboxer.OutboxMessage) error { + // nolint query := fmt.Sprintf(`INSERT INTO [%s].[%s] (payload, options, headers) VALUES (@p1, @p2, @p3)`, s.SchemaName, s.EventStoreTable) if _, err := s.conn.ExecContext(ctx, query, evt.Payload, checkBinaryParam(evt.Options), checkBinaryParam(evt.Headers)); err != nil { return fmt.Errorf("failed to insert message into the data store: %w", err) @@ -103,6 +105,7 @@ func (s *SQLServer) AddWithinTx(ctx context.Context, evt *outboxer.OutboxMessage return err } + // nolint query := fmt.Sprintf(`INSERT INTO [%s].[%s] (payload, options, headers) VALUES (@p1, @p2, @p3)`, s.SchemaName, s.EventStoreTable) if _, err := tx.ExecContext(ctx, query, evt.Payload, checkBinaryParam(evt.Options), checkBinaryParam(evt.Headers)); err != nil { @@ -123,11 +126,13 @@ func checkBinaryParam(p outboxer.DynamicValues) outboxer.DynamicValues { if p == nil { return map[string]interface{}{} } + return p } // SetAsDispatched sets one message as dispatched func (s *SQLServer) SetAsDispatched(ctx context.Context, id int64) error { + // nolint query := fmt.Sprintf(` UPDATE [%s].[%s] SET @@ -162,7 +167,7 @@ WHERE id IN "dispatched_at" < @p1 ) ` - + // nolint query := fmt.Sprintf(q, s.SchemaName, s.EventStoreTable, batchSize) if _, err := tx.ExecContext(ctx, query, dispatchedBefore); err != nil { tx.Rollback() @@ -179,18 +184,22 @@ WHERE id IN // GetEvents retrieves all the relevant events func (s *SQLServer) GetEvents(ctx context.Context, batchSize int32) ([]*outboxer.OutboxMessage, error) { var events []*outboxer.OutboxMessage + // nolint + rows, err := s.conn.QueryContext(ctx, fmt.Sprintf("SELECT TOP %d * FROM [%s].[%s] WHERE dispatched = 0", + batchSize, s.SchemaName, s.EventStoreTable)) - rows, err := s.conn.QueryContext(ctx, fmt.Sprintf("SELECT TOP %d * FROM [%s].[%s] WHERE dispatched = 0", batchSize, s.SchemaName, s.EventStoreTable)) if err != nil { return events, fmt.Errorf("failed to get messages from the store: %w", err) } for rows.Next() { var e outboxer.OutboxMessage + err = rows.Scan(&e.ID, &e.Dispatched, &e.DispatchedAt, &e.Payload, &e.Options, &e.Headers) if err != nil { return events, fmt.Errorf("failed to scan message: %w", err) } + events = append(events, &e) } @@ -218,6 +227,7 @@ func (s *SQLServer) lock(ctx context.Context) error { } s.isLocked = true + return nil } @@ -239,7 +249,9 @@ func (s *SQLServer) unlock(ctx context.Context) error { if _, err := s.conn.ExecContext(ctx, query, aid); err != nil { return err } + s.isLocked = false + return nil } @@ -257,7 +269,7 @@ func (s *SQLServer) ensureTable(ctx context.Context) (err error) { } } }() - + // nolint query := fmt.Sprintf( `IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='%[2]s' and xtype='U') CREATE TABLE %[1]s.%[2]s ( id int IDENTITY(1,1) NOT NULL PRIMARY KEY, diff --git a/sqlserver/sqlserver_example_test.go b/sqlserver/sqlserver_example_test.go index 62e795e..295c7e6 100644 --- a/sqlserver/sqlserver_example_test.go +++ b/sqlserver/sqlserver_example_test.go @@ -24,5 +24,6 @@ func ExampleSQLServer() { fmt.Printf("failed to setup the data store: %s", err) return } + defer ds.Close() } diff --git a/sqlserver/sqlserver_test.go b/sqlserver/sqlserver_test.go index 0286a32..7f0ea8d 100644 --- a/sqlserver/sqlserver_test.go +++ b/sqlserver/sqlserver_test.go @@ -15,16 +15,21 @@ import ( func TestSQLServer_WithInstance_must_return_SQLServerDataStore(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + db, mock, err := sqlmock.New() if err != nil { t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) } + defer db.Close() + initDatastoreMock(t, mock) + ds, err := WithInstance(ctx, db) if err != nil { t.Fatalf("failed to setup the data store: %s", err) } + defer ds.Close() if ds.SchemaName != "test_schema" { @@ -37,12 +42,16 @@ func TestSQLServer_WithInstance_must_return_SQLServerDataStore(t *testing.T) { } func TestSQLServer_WithInstance_should_return_error_when_no_db_selected(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + db, mock, err := sqlmock.New() if err != nil { t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) } + defer db.Close() + mock.ExpectQuery(`SELECT DB_NAME() `). WillReturnRows(sqlmock.NewRows([]string{"DB_NAME()"}).AddRow("")) @@ -54,16 +63,21 @@ func TestSQLServer_WithInstance_should_return_error_when_no_db_selected(t *testi func TestSQLServer_WithInstance_should_return_error_when_no_schema_selected(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + db, mock, err := sqlmock.New() if err != nil { t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) } + defer db.Close() + mock.ExpectQuery(`SELECT DB_NAME() `). WillReturnRows(sqlmock.NewRows([]string{"DB_NAME()"}).AddRow("test")) mock.ExpectQuery(`SELECT SCHEMA_NAME()`). WillReturnRows(sqlmock.NewRows([]string{"SCHEMA_NAME()"}).AddRow("")) + _, err = WithInstance(ctx, db) if err != ErrNoSchema { t.Fatalf("Expected ErrNoSchema to be returned when no schema selected : %s", err) @@ -72,18 +86,27 @@ func TestSQLServer_WithInstance_should_return_error_when_no_schema_selected(t *t func TestSQLServer_should_add_message(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + db, mock, err := sqlmock.New() if err != nil { t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) } + defer db.Close() + initDatastoreMock(t, mock) + ds, err := WithInstance(ctx, db) + if err != nil { + t.Fatalf("failed to setup the data store: %s", err) + } + defer ds.Close() mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO [test_schema].[event_store]`)). WillReturnResult(sqlmock.NewResult(0, 1)) + if err := ds.Add(ctx, &outboxer.OutboxMessage{ Payload: []byte("test payload"), }); err != nil { @@ -94,21 +117,30 @@ func TestSQLServer_should_add_message(t *testing.T) { func TestSQLServer_should_add_message_with_tx(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + db, mock, err := sqlmock.New() if err != nil { t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) } + defer db.Close() + initDatastoreMock(t, mock) + ds, err := WithInstance(ctx, db) + if err != nil { + t.Fatalf("failed to setup the data store: %s", err) + } defer ds.Close() + mock.ExpectBegin() mock.ExpectExec(`SELECT (.+) from event_store`). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO [test_schema].[event_store]`)). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() + fn := func(tx outboxer.ExecerContext) error { _, err := tx.ExecContext(ctx, "SELECT * from event_store") return err @@ -124,21 +156,30 @@ func TestSQLServer_should_add_message_with_tx(t *testing.T) { func TestSQLServer_add_message_with_tx_should_rollback_on_error(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + db, mock, err := sqlmock.New() if err != nil { t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) } + defer db.Close() + initDatastoreMock(t, mock) + ds, err := WithInstance(ctx, db) + if err != nil { + t.Fatalf("failed to setup the data store: %s", err) + } defer ds.Close() + mock.ExpectBegin() mock.ExpectExec(`SELECT (.+) from event_store`). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO [test_schema].[event_store]`)). WillReturnError(errors.New("Failed to insert")) mock.ExpectRollback() + fn := func(tx outboxer.ExecerContext) error { _, err := tx.ExecContext(ctx, "SELECT * from event_store") return err @@ -154,13 +195,20 @@ func TestSQLServer_add_message_with_tx_should_rollback_on_error(t *testing.T) { func TestSQLServer_should_get_events(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + db, mock, err := sqlmock.New() if err != nil { t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) } + defer db.Close() + initDatastoreMock(t, mock) + ds, err := WithInstance(ctx, db) + if err != nil { + t.Fatalf("failed to setup the data store: %s", err) + } defer ds.Close() @@ -181,13 +229,20 @@ func TestSQLServer_should_get_events(t *testing.T) { func TestSQLServer_should_set_as_dispatched(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + db, mock, err := sqlmock.New() if err != nil { t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) } + defer db.Close() + initDatastoreMock(t, mock) + ds, err := WithInstance(ctx, db) + if err != nil { + t.Fatalf("failed to setup the data store: %s", err) + } defer ds.Close() @@ -199,29 +254,36 @@ func TestSQLServer_should_set_as_dispatched(t *testing.T) { if err != nil { t.Fatalf("failed to set message as dispatched: %s", err) } - } func TestSQLServer_should_remove_messages(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + db, mock, err := sqlmock.New() if err != nil { t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) } + defer db.Close() + initDatastoreMock(t, mock) + ds, err := WithInstance(ctx, db) + if err != nil { + t.Fatalf("failed to setup the data store: %s", err) + } defer ds.Close() + mock.ExpectBegin() mock.ExpectExec(regexp.QuoteMeta(`DELETE FROM [test_schema].[event_store] WHERE id IN`)). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() + if err := ds.Remove(ctx, time.Now(), 10); err != nil { t.Fatalf("failed to remove messages: %s", err) } - } func initDatastoreMock(t *testing.T, mock sqlmock.Sqlmock) { mock.ExpectQuery(`SELECT DB_NAME() `).