-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* chore: update dependencies * feat: lambda boilerplate for migration * chore: update dependencies * infra: add deployment files * feat: add mongo functions to get collections * feat: add postgres client * feat: get mongo and postgres apps * feat: add bson tag to structs * feat: insert collections * refactor: postgres client name * feat: get both db documents and compare them * refactor: method identifier * feat: add convertions from repository to mongo model * feat: add Collection type * fix: model types * feat: migrate postgres apps to mongo * fix: change default collections to write * feat: add field to write * fix: only write documents if there's any to write * infra: disable vpc lambda creation
- Loading branch information
1 parent
12251e2
commit 0c750b7
Showing
10 changed files
with
560 additions
and
179 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
156 changes: 156 additions & 0 deletions
156
mongo-migration/cmd/migrate-postgres-to-mongo/lambda/main.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"net/http" | ||
|
||
postgresgateway "github.com/Pocket/global-services/mongo-migration/postgres_client" | ||
"github.com/Pocket/global-services/shared/apigateway" | ||
"github.com/Pocket/global-services/shared/database" | ||
"github.com/Pocket/global-services/shared/gateway/models" | ||
logger "github.com/Pocket/global-services/shared/logger" | ||
"github.com/Pocket/global-services/shared/utils" | ||
"github.com/aws/aws-lambda-go/events" | ||
"github.com/aws/aws-lambda-go/lambda" | ||
log "github.com/sirupsen/logrus" | ||
|
||
"github.com/pokt-foundation/portal-api-go/repository" | ||
"github.com/pokt-foundation/utils-go/environment" | ||
) | ||
|
||
var ( | ||
postgresClientUrl = environment.MustGetString("POSTGRES_CLIENT_URL") | ||
authenticationToken = environment.MustGetString("AUTHENTICATION_TOKEN") | ||
mongoConnectionString = environment.MustGetString("MONGO_CONNECTION_STRING") | ||
mongoDatabase = environment.GetString("MONGO_DATABASE", "gateway") | ||
) | ||
|
||
// LambdaHandler manages the process of getting the postgres apps migrated to mongo | ||
func LambdaHandler(ctx context.Context) (events.APIGatewayProxyResponse, error) { | ||
appsCount, lbsCount, err := migrateToMongo(ctx) | ||
|
||
if err != nil { | ||
return *apigateway.NewJSONResponse(http.StatusOK, map[string]any{ | ||
"err": err.Error(), | ||
}), nil | ||
} | ||
return *apigateway.NewJSONResponse(http.StatusOK, map[string]any{ | ||
"newApps": appsCount, | ||
"newLbs": lbsCount, | ||
}), nil | ||
} | ||
|
||
func main() { | ||
lambda.Start(LambdaHandler) | ||
} | ||
|
||
func migrateToMongo(ctx context.Context) (int, int, error) { | ||
mongo, err := database.ClientFromURI(ctx, mongoConnectionString, mongoDatabase) | ||
if err != nil { | ||
return 0, 0, errors.New("error connecting to mongo: " + err.Error()) | ||
} | ||
|
||
postgres := postgresgateway.NewPostgresClient(postgresClientUrl, authenticationToken) | ||
|
||
mongoApps, postgresApps, err := getApplications(ctx, mongo, postgres) | ||
if err != nil { | ||
return 0, 0, err | ||
} | ||
mongoLBs, postgresLBs, err := getLoadBalancers(ctx, mongo, postgres) | ||
if err != nil { | ||
return 0, 0, err | ||
} | ||
|
||
appsToWrite := convertRepositoryToMongo(getItemsNotInMongo(postgresApps, mongoApps), models.RepositoryToModelApp) | ||
lbsToWrite := convertRepositoryToMongo(getItemsNotInMongo(postgresLBs, mongoLBs), models.RepositoryToModelLoadBalancer) | ||
|
||
if len(appsToWrite) > 0 { | ||
err = mongo.InsertMany(ctx, database.ApplicationCollection, appsToWrite) | ||
if err != nil { | ||
return 0, 0, err | ||
} | ||
} | ||
|
||
if len(lbsToWrite) > 0 { | ||
err = mongo.InsertMany(ctx, database.LoadBalancersCollection, lbsToWrite) | ||
if err != nil { | ||
return 0, 0, err | ||
} | ||
} | ||
|
||
return len(appsToWrite), len(lbsToWrite), nil | ||
} | ||
|
||
func getApplications(ctx context.Context, mongo *database.Mongo, postgres *postgresgateway.Client) (map[string]*models.Application, map[string]*repository.Application, error) { | ||
mongoAppsArr, err := mongo.GetApplications(ctx) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
postgresAppsArr, err := postgres.GetAllApplications() | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
mongoApps := utils.SliceToMappedStruct(mongoAppsArr, func(app *models.Application) string { | ||
return app.ID.Hex() | ||
}) | ||
|
||
postgresApps := utils.SliceToMappedStruct(postgresAppsArr, func(app *repository.Application) string { | ||
return app.ID | ||
}) | ||
|
||
return mongoApps, postgresApps, nil | ||
} | ||
|
||
func getLoadBalancers(ctx context.Context, mongo *database.Mongo, postgres *postgresgateway.Client) (map[string]*models.LoadBalancer, map[string]*repository.LoadBalancer, error) { | ||
mongoLBsArr, err := mongo.GetLoadBalancers(ctx) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
postgresLBsArr, err := postgres.GetAllLoadBalancers() | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
mongoLBs := utils.SliceToMappedStruct(mongoLBsArr, func(lb *models.LoadBalancer) string { | ||
return lb.ID.Hex() | ||
}) | ||
|
||
postgresLBs := utils.SliceToMappedStruct(postgresLBsArr, func(lb *repository.LoadBalancer) string { | ||
return lb.ID | ||
}) | ||
|
||
return mongoLBs, postgresLBs, nil | ||
} | ||
|
||
func getItemsNotInMongo[T any, V any](pgItems map[string]*T, mongoItems map[string]*V) []T { | ||
items := make([]T, 0) | ||
for key, item := range pgItems { | ||
_, ok := mongoItems[key] | ||
if ok { | ||
continue | ||
} | ||
items = append(items, *item) | ||
} | ||
return items | ||
} | ||
|
||
func convertRepositoryToMongo[T any, V any](items []T, convertFn func(*T) (V, error)) []any { | ||
convertedItems := make([]any, 0) | ||
for _, item := range items { | ||
convertedItem, err := convertFn(&item) | ||
if err != nil { | ||
logger.Log.WithFields(log.Fields{ | ||
"error": err.Error(), | ||
}).Error("could not convert repository item to a mongo type") | ||
continue | ||
} | ||
|
||
convertedItems = append(convertedItems, convertedItem) | ||
} | ||
|
||
return convertedItems | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package postgresgateway | ||
|
||
import ( | ||
"encoding/json" | ||
"net/http" | ||
"time" | ||
|
||
"github.com/pokt-foundation/portal-api-go/repository" | ||
) | ||
|
||
type GatewayPostgresRoutes string | ||
|
||
const ( | ||
GetAllBlockchains GatewayPostgresRoutes = "/blockchain" | ||
GetAllApplications GatewayPostgresRoutes = "/application" | ||
GetAllLoadBalancers GatewayPostgresRoutes = "/load_balancer" | ||
) | ||
|
||
type Client struct { | ||
client *http.Client | ||
url string | ||
authenticationToken string | ||
} | ||
|
||
func NewPostgresClient(url, authenticationToken string) *Client { | ||
return &Client{ | ||
client: &http.Client{ | ||
Timeout: 20 * time.Second, | ||
}, | ||
url: url, | ||
authenticationToken: authenticationToken, | ||
} | ||
} | ||
|
||
func (cl *Client) GetAllLoadBalancers() ([]*repository.LoadBalancer, error) { | ||
return getAllItems[repository.LoadBalancer](cl.url+string(GetAllLoadBalancers), cl.client, cl.authenticationToken) | ||
} | ||
|
||
func (cl *Client) GetAllApplications() ([]*repository.Application, error) { | ||
return getAllItems[repository.Application](cl.url+string(GetAllApplications), cl.client, cl.authenticationToken) | ||
} | ||
|
||
func (cl *Client) GetAllBlockchains() ([]*repository.Blockchain, error) { | ||
return getAllItems[repository.Blockchain](cl.url+string(GetAllBlockchains), cl.client, cl.authenticationToken) | ||
} | ||
|
||
func getAllItems[T any](path string, client *http.Client, authToken string) ([]*T, error) { | ||
var items []*T | ||
|
||
req, err := http.NewRequest(http.MethodGet, path, nil) | ||
if err != nil { | ||
return nil, err | ||
} | ||
req.Header.Set("Authorization", authToken) | ||
|
||
res, err := client.Do(req) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return items, json.NewDecoder(res.Body).Decode(&items) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.