Skip to content

Commit

Permalink
feat: async task: send verification email on user sign up
Browse files Browse the repository at this point in the history
  • Loading branch information
hyeonjungko committed Nov 18, 2023
1 parent aac2956 commit d147720
Show file tree
Hide file tree
Showing 15 changed files with 377 additions and 109 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,7 @@ proto:
evans:
evans --host localhost --port 9090 -r repl

.PHONY: postgres createdb dropdb migrateup migratedown migrateup1 migratedown1 db_docs db_schema sqlc test server mock proto evans
redis:
docker run --name redis -p 6379:6379 -d redis:7.2.3-alpine

.PHONY: postgres createdb dropdb migrateup migratedown migrateup1 migratedown1 db_docs db_schema sqlc test server mock proto evans redis
3 changes: 2 additions & 1 deletion app.env
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ HTTP_SERVER_ADDRESS=0.0.0.0:8080
GRPC_SERVER_ADDRESS=0.0.0.0:9090
TOKEN_SYMMETRIC_KEY=01234567890123456789012345678901
ACCESS_TOKEN_DURATION=15m
REFRESH_TOKEN_DURATION=24h
REFRESH_TOKEN_DURATION=24h
REDIS_ADDRESS=0.0.0.0:6379
15 changes: 15 additions & 0 deletions db/mock/store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 1 addition & 84 deletions db/sqlc/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
type Store interface {
Querier
TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error)
CreateUserTx(ctx context.Context, arg CreateUserTxParams) (CreateUserTxResult, error)
}

// SQLStore provides all functions to execute SQL queries and transactions
Expand Down Expand Up @@ -42,87 +43,3 @@ func (store *SQLStore) execTx(ctx context.Context, fn func(*Queries) error) erro

return tx.Commit()
}

// TransferTxParams contains all input parameters of the transfer transaction
type TransferTxParams struct {
FromAccountID int64 `json:from_account_id`
ToAccountID int64 `json:to_account_id`
Amount int64 `json:amount`
}

// TransferTxResult is the result of the transfer transaction
type TransferTxResult struct {
Transfer Transfer `json:transfer`
FromAccount Account `json:from_account`
ToAccount Account `json:to_account`
FromEntry Entry `json:from_entry`
ToEntry Entry `json:to_entry`
}

// TransferTx performs a money transfer from one account to the other
// It creates a transfer record, add account entries, and update accounts' balance within a single DB transaction
func (store *SQLStore) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
var result TransferTxResult

err := store.execTx(ctx, func(q *Queries) error {
var err error

result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams{
FromAccountID: arg.FromAccountID,
ToAccountID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}

result.FromEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.FromAccountID,
Amount: -arg.Amount,
})
if err != nil {
return err
}

result.ToEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}

if arg.FromAccountID < arg.ToAccountID {
result.FromAccount, result.ToAccount, err = addMoney(ctx, q, arg.FromAccountID, -arg.Amount, arg.ToAccountID, arg.Amount)
} else {
result.ToAccount, result.FromAccount, err = addMoney(ctx, q, arg.ToAccountID, arg.Amount, arg.FromAccountID, -arg.Amount)
}

return nil
})

return result, err
}

func addMoney(
ctx context.Context,
q *Queries,
accountID1 int64,
amount1 int64,
accountID2 int64,
amount2 int64,
) (account1 Account, account2 Account, err error) {
account1, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: accountID1,
Amount: amount1,
})
if err != nil {
return
}

account2, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: accountID2,
Amount: amount2,
})
return
}
29 changes: 29 additions & 0 deletions db/sqlc/tx_create_user.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package db

import "context"

type CreateUserTxParams struct {
CreateUserParams
AfterCreate func(user User) error
}

type CreateUserTxResult struct {
User User
}

func (store *SQLStore) CreateUserTx(ctx context.Context, arg CreateUserTxParams) (CreateUserTxResult, error) {
var result CreateUserTxResult

err := store.execTx(ctx, func(q *Queries) error {
var err error

result.User, err = q.CreateUser(ctx, arg.CreateUserParams)
if err != nil {
return err
}

return arg.AfterCreate(result.User)
})

return result, err
}
87 changes: 87 additions & 0 deletions db/sqlc/tx_transfer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package db

import "context"

// TransferTxParams contains all input parameters of the transfer transaction
type TransferTxParams struct {
FromAccountID int64 `json:from_account_id`
ToAccountID int64 `json:to_account_id`
Amount int64 `json:amount`
}

// TransferTxResult is the result of the transfer transaction
type TransferTxResult struct {
Transfer Transfer `json:transfer`
FromAccount Account `json:from_account`
ToAccount Account `json:to_account`
FromEntry Entry `json:from_entry`
ToEntry Entry `json:to_entry`
}

// TransferTx performs a money transfer from one account to the other
// It creates a transfer record, add account entries, and update accounts' balance within a single DB transaction
func (store *SQLStore) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
var result TransferTxResult

err := store.execTx(ctx, func(q *Queries) error {
var err error

result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams{
FromAccountID: arg.FromAccountID,
ToAccountID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}

result.FromEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.FromAccountID,
Amount: -arg.Amount,
})
if err != nil {
return err
}

result.ToEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}

if arg.FromAccountID < arg.ToAccountID {
result.FromAccount, result.ToAccount, err = addMoney(ctx, q, arg.FromAccountID, -arg.Amount, arg.ToAccountID, arg.Amount)
} else {
result.ToAccount, result.FromAccount, err = addMoney(ctx, q, arg.ToAccountID, arg.Amount, arg.FromAccountID, -arg.Amount)
}

return nil
})

return result, err
}

func addMoney(
ctx context.Context,
q *Queries,
accountID1 int64,
amount1 int64,
accountID2 int64,
amount2 int64,
) (account1 Account, account2 Account, err error) {
account1, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: accountID1,
Amount: amount1,
})
if err != nil {
return
}

account2, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: accountID2,
Amount: amount2,
})
return
}
32 changes: 25 additions & 7 deletions gapi/rpc_create_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package gapi

import (
"context"
"time"

"github.com/hibiken/asynq"
db "github.com/hyeonjungko/bankofasia/db/sqlc"
"github.com/hyeonjungko/bankofasia/pb"
"github.com/hyeonjungko/bankofasia/util"
"github.com/hyeonjungko/bankofasia/val"
"github.com/hyeonjungko/bankofasia/worker"
"github.com/lib/pq"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
Expand All @@ -25,14 +28,29 @@ func (server *Server) CreateUser(ctx context.Context, req *pb.CreateUserRequest)
return nil, status.Errorf(codes.Internal, "failed to hash password: %s", err)
}

arg := db.CreateUserParams{
Username: req.GetUsername(),
HashedPassword: hashedPassword,
FullName: req.GetFullName(),
Email: req.GetEmail(),
arg := db.CreateUserTxParams{
CreateUserParams: db.CreateUserParams{
Username: req.GetUsername(),
HashedPassword: hashedPassword,
FullName: req.GetFullName(),
Email: req.GetEmail(),
},
AfterCreate: func(user db.User) error {
taskPayload := &worker.PayloadSendVerifyEmail{
Username: user.Username,
}

opts := []asynq.Option{
asynq.MaxRetry(10),
asynq.ProcessIn(10 * time.Second),
asynq.Queue(worker.QueueCritical),
}

return server.taskDistributor.DistributeTaskSendVerifyEmail(ctx, taskPayload, opts...)
},
}

user, err := server.store.CreateUser(ctx, arg)
txResult, err := server.store.CreateUserTx(ctx, arg)
if err != nil {
if pqErr, ok := err.(*pq.Error); ok {
switch pqErr.Code.Name() {
Expand All @@ -44,7 +62,7 @@ func (server *Server) CreateUser(ctx context.Context, req *pb.CreateUserRequest)
}

rsp := &pb.CreateUserResponse{
User: convertUser(user),
User: convertUser(txResult.User),
}

return rsp, nil
Expand Down
17 changes: 10 additions & 7 deletions gapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,30 @@ import (
"github.com/hyeonjungko/bankofasia/pb"
"github.com/hyeonjungko/bankofasia/token"
"github.com/hyeonjungko/bankofasia/util"
"github.com/hyeonjungko/bankofasia/worker"
)

// Server serves gRPC requests
type Server struct {
pb.UnimplementedBankOfAsiaServer
config util.Config
store db.Store
tokenMaker token.Maker
config util.Config
store db.Store
tokenMaker token.Maker
taskDistributor worker.TaskDistributor
}

// NewServer creates new gRPC server
func NewServer(config util.Config, store db.Store) (*Server, error) {
func NewServer(config util.Config, store db.Store, taskDistributor worker.TaskDistributor) (*Server, error) {
tokenMaker, err := token.NewPasetoMaker(config.TokenSymmetricKey)
if err != nil {
return nil, fmt.Errorf("cannot create token maker: %w", err)
}

server := &Server{
config: config,
store: store,
tokenMaker: tokenMaker,
config: config,
store: store,
tokenMaker: tokenMaker,
taskDistributor: taskDistributor,
}

return server, nil
Expand Down
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ require (
github.com/golang-jwt/jwt/v5 v5.0.0
github.com/golang-migrate/migrate/v4 v4.16.2
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/google/uuid v1.4.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0
github.com/hibiken/asynq v0.24.1
github.com/lib/pq v1.10.9
github.com/o1egl/paseto v1.0.0
github.com/rs/zerolog v1.31.0
Expand All @@ -28,8 +29,10 @@ require (
github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da // indirect
github.com/aead/poly1305 v0.0.0-20180717145839-3fee0db0b635 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
Expand All @@ -53,6 +56,8 @@ require (
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/redis/go-redis/v9 v9.3.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
Expand All @@ -65,6 +70,7 @@ require (
golang.org/x/net v0.15.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.4.0 // indirect
google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading

0 comments on commit d147720

Please sign in to comment.