From db1788601db95c5a8e07ecc400673d8688b04bae Mon Sep 17 00:00:00 2001 From: M4ver1k Date: Sat, 12 Oct 2019 18:12:39 +0530 Subject: [PATCH] Issue #6 | Add SqlServer support | Adarsh --- sqlserver/sqlserver.go | 277 ++++++++++++++++++++++++++++ sqlserver/sqlserver_example_test.go | 28 +++ sqlserver/sqlserver_test.go | 258 ++++++++++++++++++++++++++ 3 files changed, 563 insertions(+) create mode 100644 sqlserver/sqlserver.go create mode 100644 sqlserver/sqlserver_example_test.go create mode 100644 sqlserver/sqlserver_test.go diff --git a/sqlserver/sqlserver.go b/sqlserver/sqlserver.go new file mode 100644 index 0000000..546854d --- /dev/null +++ b/sqlserver/sqlserver.go @@ -0,0 +1,277 @@ +package sqlserver + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" + + "github.com/italolelis/outboxer" + "github.com/italolelis/outboxer/lock" +) + +const ( + // DefaultEventStoreTable is the default table name + DefaultEventStoreTable = "event_store" +) + +var ( + // ErrLocked is used when we can't acquire an explicit lock + ErrLocked = errors.New("can't acquire lock") + + // ErrNoDatabaseName is used when the database name is blank + ErrNoDatabaseName = errors.New("no database name") + + // ErrNoSchema is used when the schema name is blank + ErrNoSchema = errors.New("no schema") +) + +// SQLServer implementation of the data store +type SQLServer struct { + conn *sql.Conn + isLocked bool + + SchemaName string + DatabaseName string + EventStoreTable string +} + +// WithInstance creates a SQLServer data store with an existing db connection +func WithInstance(ctx context.Context, db *sql.DB) (*SQLServer, error) { + conn, err := db.Conn(ctx) + if err != nil { + return nil, err + } + + s := SQLServer{conn: conn} + + if err := conn.QueryRowContext(ctx, `SELECT DB_NAME()`).Scan(&s.DatabaseName); err != nil { + return nil, err + } + + if len(s.DatabaseName) == 0 { + return nil, ErrNoDatabaseName + } + + if err := conn.QueryRowContext(ctx, `SELECT SCHEMA_NAME()`).Scan(&s.SchemaName); err != nil { + return nil, err + } + + if len(s.SchemaName) == 0 { + return nil, ErrNoSchema + } + + if len(s.EventStoreTable) == 0 { + s.EventStoreTable = DefaultEventStoreTable + } + + if err := s.ensureTable(ctx); err != nil { + return nil, err + } + + return &s, nil +} + +// Close closes the db connection +func (s *SQLServer) Close() error { + if err := s.conn.Close(); err != nil { + return fmt.Errorf("failed to close connection: %w", err) + } + return nil +} + +// Add the message to the data store +func (s *SQLServer) Add(ctx context.Context, evt *outboxer.OutboxMessage) error { + query := fmt.Sprintf(`INSERT INTO [%s].[%s] (payload, options, headers) VALUES (@p1, @p2, @p3)`, s.SchemaName, s.EventStoreTable) + if _, err := s.conn.ExecContext(ctx, query, evt.Payload, checkBinaryParam(evt.Options), checkBinaryParam(evt.Headers)); err != nil { + return fmt.Errorf("failed to insert message into the data store: %w", err) + } + + return nil +} + +// AddWithinTx creates a transaction and then tries to execute anything within it +func (s *SQLServer) AddWithinTx(ctx context.Context, evt *outboxer.OutboxMessage, fn func(outboxer.ExecerContext) error) error { + tx, err := s.conn.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + return fmt.Errorf("transaction start failed: %w", err) + } + + err = fn(tx) + if err != nil { + return err + } + + query := fmt.Sprintf(`INSERT INTO [%s].[%s] (payload, options, headers) VALUES (@p1, @p2, @p3)`, s.SchemaName, s.EventStoreTable) + + if _, err := tx.ExecContext(ctx, query, evt.Payload, checkBinaryParam(evt.Options), checkBinaryParam(evt.Headers)); err != nil { + tx.Rollback() + return fmt.Errorf("failed to insert message into the data store: %w", err) + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("transaction commit failed: %w", err) + } + + return nil +} + +//checkBinaryParam is a fix for issue with mssql driver converting nil value in varbinary to nvarchar. +//https://github.com/denisenkom/go-mssqldb/issues/530 +func checkBinaryParam(p outboxer.DynamicValues) outboxer.DynamicValues { + if p == nil { + return map[string]interface{}{} + } + return p +} + +// SetAsDispatched sets one message as dispatched +func (s *SQLServer) SetAsDispatched(ctx context.Context, id int64) error { + query := fmt.Sprintf(` +UPDATE [%s].[%s] +SET + dispatched = 1, + dispatched_at = GETDATE(), + options = null, + headers = null +WHERE id = @p1; +`, s.SchemaName, s.EventStoreTable) + if _, err := s.conn.ExecContext(ctx, query, id); err != nil { + return fmt.Errorf("failed to set message as dispatched: %w", err) + } + + return nil +} + +// Remove removes old messages from the data store +func (s *SQLServer) Remove(ctx context.Context, dispatchedBefore time.Time, batchSize int32) error { + tx, err := s.conn.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + return fmt.Errorf("transaction start failed: %w", err) + } + + q := ` +DELETE FROM [%[1]s].[%[2]s] +WHERE id IN +( + SELECT TOP %d id + FROM %[%[1]s].[%[2]s] + WHERE + "dispatched" = true AND + "dispatched_at" < @p1 +) +` + + query := fmt.Sprintf(q, s.SchemaName, s.EventStoreTable, batchSize) + if _, err := tx.ExecContext(ctx, query, dispatchedBefore); err != nil { + tx.Rollback() + return fmt.Errorf("failed to remove messages from the data store: %w", err) + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("transaction commit failed: %w", err) + } + + return nil +} + +// GetEvents retrieves all the relevant events +func (s *SQLServer) GetEvents(ctx context.Context, batchSize int32) ([]*outboxer.OutboxMessage, error) { + var events []*outboxer.OutboxMessage + + rows, err := s.conn.QueryContext(ctx, fmt.Sprintf("SELECT TOP %d * FROM [%s].[%s] WHERE dispatched = 0", batchSize, s.SchemaName, s.EventStoreTable)) + if err != nil { + return events, fmt.Errorf("failed to get messages from the store: %w", err) + } + + for rows.Next() { + var e outboxer.OutboxMessage + err = rows.Scan(&e.ID, &e.Dispatched, &e.DispatchedAt, &e.Payload, &e.Options, &e.Headers) + if err != nil { + return events, fmt.Errorf("failed to scan message: %w", err) + } + events = append(events, &e) + } + + return events, nil +} + +// Lock implements explicit locking +func (s *SQLServer) lock(ctx context.Context) error { + if s.isLocked { + return ErrLocked + } + + aid, err := lock.Generate(s.DatabaseName, s.SchemaName) + if err != nil { + return err + } + + query := `EXEC sp_getapplock + @Resource = @p1, + @LockOwner='Session', + @LockMode = 'Exclusive';` + + if _, err := s.conn.ExecContext(ctx, query, aid); err != nil { + return fmt.Errorf("try lock failed: %w", err) + } + + s.isLocked = true + return nil +} + +// Unlock is the implementation of the unlock for explicit locking +func (s *SQLServer) unlock(ctx context.Context) error { + if !s.isLocked { + return nil + } + + aid, err := lock.Generate(s.DatabaseName, s.SchemaName) + if err != nil { + return err + } + + query := `EXEC sp_releaseapplock + @Resource = @p1, + @LockOwner='Session';` + + if _, err := s.conn.ExecContext(ctx, query, aid); err != nil { + return err + } + s.isLocked = false + return nil +} + +func (s *SQLServer) ensureTable(ctx context.Context) (err error) { + if err = s.lock(ctx); err != nil { + return err + } + + defer func() { + if e := s.unlock(ctx); e != nil { + if err == nil { + err = e + } else { + err = fmt.Errorf("failed to unlock table: %w", err) + } + } + }() + + query := fmt.Sprintf( + `IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='%[2]s' and xtype='U') CREATE TABLE %[1]s.%[2]s ( + id int IDENTITY(1,1) NOT NULL PRIMARY KEY, + dispatched BIT NOT NULL DEFAULT 0, + dispatched_at DATETIME, + payload VARBINARY(MAX) NOT NULL, + options VARBINARY(MAX), + headers VARBINARY(MAX) +); +`, s.SchemaName, s.EventStoreTable) + + if _, err = s.conn.ExecContext(ctx, query); err != nil { + return err + } + + return nil +} diff --git a/sqlserver/sqlserver_example_test.go b/sqlserver/sqlserver_example_test.go new file mode 100644 index 0000000..62e795e --- /dev/null +++ b/sqlserver/sqlserver_example_test.go @@ -0,0 +1,28 @@ +package sqlserver_test + +import ( + "context" + "database/sql" + "fmt" + "os" + + "github.com/italolelis/outboxer/sqlserver" +) + +func ExampleSQLServer() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := sql.Open("sqlserver", os.Getenv("DS_DSN")) + if err != nil { + fmt.Printf("failed to connect to SQLServer: %s", err) + return + } + + ds, err := sqlserver.WithInstance(ctx, db) + if err != nil { + fmt.Printf("failed to setup the data store: %s", err) + return + } + defer ds.Close() +} diff --git a/sqlserver/sqlserver_test.go b/sqlserver/sqlserver_test.go new file mode 100644 index 0000000..0286a32 --- /dev/null +++ b/sqlserver/sqlserver_test.go @@ -0,0 +1,258 @@ +package sqlserver + +import ( + "context" + "errors" + "regexp" + "testing" + "time" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/italolelis/outboxer" + "github.com/italolelis/outboxer/lock" +) + +func TestSQLServer_WithInstance_must_return_SQLServerDataStore(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + initDatastoreMock(t, mock) + ds, err := WithInstance(ctx, db) + if err != nil { + t.Fatalf("failed to setup the data store: %s", err) + } + defer ds.Close() + + if ds.SchemaName != "test_schema" { + t.Errorf("Expected schema name %s but got %s", "test_schema", ds.SchemaName) + } + + if ds.DatabaseName != "test" { + t.Errorf("Expected database name %s but got %s", "test", ds.DatabaseName) + } +} +func TestSQLServer_WithInstance_should_return_error_when_no_db_selected(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + mock.ExpectQuery(`SELECT DB_NAME() `). + WillReturnRows(sqlmock.NewRows([]string{"DB_NAME()"}).AddRow("")) + + _, err = WithInstance(ctx, db) + if err != ErrNoDatabaseName { + t.Fatalf("Expected ErrNoDatabaseName to be returned when no database selected : %s", err) + } +} + +func TestSQLServer_WithInstance_should_return_error_when_no_schema_selected(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + mock.ExpectQuery(`SELECT DB_NAME() `). + WillReturnRows(sqlmock.NewRows([]string{"DB_NAME()"}).AddRow("test")) + mock.ExpectQuery(`SELECT SCHEMA_NAME()`). + WillReturnRows(sqlmock.NewRows([]string{"SCHEMA_NAME()"}).AddRow("")) + _, err = WithInstance(ctx, db) + if err != ErrNoSchema { + t.Fatalf("Expected ErrNoSchema to be returned when no schema selected : %s", err) + } +} +func TestSQLServer_should_add_message(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + initDatastoreMock(t, mock) + ds, err := WithInstance(ctx, db) + + defer ds.Close() + + mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO [test_schema].[event_store]`)). + WillReturnResult(sqlmock.NewResult(0, 1)) + if err := ds.Add(ctx, &outboxer.OutboxMessage{ + Payload: []byte("test payload"), + }); err != nil { + t.Fatalf("failed to add message in the data store: %s", err) + } +} + +func TestSQLServer_should_add_message_with_tx(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + initDatastoreMock(t, mock) + ds, err := WithInstance(ctx, db) + + defer ds.Close() + mock.ExpectBegin() + mock.ExpectExec(`SELECT (.+) from event_store`). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO [test_schema].[event_store]`)). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectCommit() + fn := func(tx outboxer.ExecerContext) error { + _, err := tx.ExecContext(ctx, "SELECT * from event_store") + return err + } + + if err := ds.AddWithinTx(ctx, &outboxer.OutboxMessage{ + Payload: []byte("test payload"), + }, fn); err != nil { + t.Fatalf("failed to add message in the data store: %s", err) + } +} + +func TestSQLServer_add_message_with_tx_should_rollback_on_error(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + initDatastoreMock(t, mock) + ds, err := WithInstance(ctx, db) + + defer ds.Close() + mock.ExpectBegin() + mock.ExpectExec(`SELECT (.+) from event_store`). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO [test_schema].[event_store]`)). + WillReturnError(errors.New("Failed to insert")) + mock.ExpectRollback() + fn := func(tx outboxer.ExecerContext) error { + _, err := tx.ExecContext(ctx, "SELECT * from event_store") + return err + } + + if err := ds.AddWithinTx(ctx, &outboxer.OutboxMessage{ + Payload: []byte("test payload"), + }, fn); err == nil { + t.Fatalf("This should fail and rollback: %s", err) + } +} + +func TestSQLServer_should_get_events(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + initDatastoreMock(t, mock) + ds, err := WithInstance(ctx, db) + + defer ds.Close() + + mock.ExpectQuery(regexp.QuoteMeta(`SELECT TOP 10 * FROM [test_schema].[event_store] WHERE dispatched = 0`)). + WillReturnRows(sqlmock.NewRows([]string{"id", "dispatched", "dispatched_at", "payload", "options", "headers"}). + AddRow(1, false, time.Now(), []byte("test payload"), outboxer.DynamicValues{}, outboxer.DynamicValues{})) + + msgs, err := ds.GetEvents(ctx, 10) + if err != nil { + t.Fatalf("failed to retrieve messages from the data store: %s", err) + } + + if len(msgs) != 1 { + t.Fatalf("was expecting 1 message in the data store but got %d", len(msgs)) + } +} + +func TestSQLServer_should_set_as_dispatched(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + initDatastoreMock(t, mock) + ds, err := WithInstance(ctx, db) + + defer ds.Close() + + mock.ExpectExec(regexp.QuoteMeta(`UPDATE [test_schema].[event_store] SET`)). + WithArgs(1). + WillReturnResult(sqlmock.NewResult(0, 1)) + + err = ds.SetAsDispatched(ctx, 1) + if err != nil { + t.Fatalf("failed to set message as dispatched: %s", err) + } + +} + +func TestSQLServer_should_remove_messages(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + defer db.Close() + initDatastoreMock(t, mock) + ds, err := WithInstance(ctx, db) + + defer ds.Close() + mock.ExpectBegin() + mock.ExpectExec(regexp.QuoteMeta(`DELETE FROM [test_schema].[event_store] WHERE id IN`)). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectCommit() + if err := ds.Remove(ctx, time.Now(), 10); err != nil { + t.Fatalf("failed to remove messages: %s", err) + } + +} +func initDatastoreMock(t *testing.T, mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SELECT DB_NAME() `). + WillReturnRows(sqlmock.NewRows([]string{"DB_NAME()"}).AddRow("test")) + mock.ExpectQuery(`SELECT SCHEMA_NAME()`). + WillReturnRows(sqlmock.NewRows([]string{"SCHEMA_NAME()"}).AddRow("test_schema")) + + initLockMock(t, mock) +} + +func initLockMock(t *testing.T, mock sqlmock.Sqlmock) { + aid, err := lock.Generate("test", "test_schema") + if err != nil { + t.Fatalf("failed to generate the lock value: %s", err) + } + + mock.ExpectExec(`EXEC sp_getapplock + @Resource = @p1, + @LockOwner='Session', + @LockMode = 'Exclusive'; `). + WithArgs(aid). + WillReturnResult(sqlmock.NewResult(0, 1)) + + mock.ExpectExec( + regexp.QuoteMeta(`IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='event_store' and xtype='U') + CREATE TABLE test_schema.event_store`)). + WillReturnResult(sqlmock.NewResult(0, 1)) + + mock.ExpectExec(`EXEC sp_releaseapplock + @Resource = @p1, + @LockOwner='Session'; `). + WithArgs(aid). + WillReturnResult(sqlmock.NewResult(0, 1)) +}