Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update MongoDb Activity #328

Merged
merged 6 commits into from
Apr 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 30 additions & 37 deletions activity/mongodb/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/mongodb/mongo-go-driver/bson"
"github.com/mongodb/mongo-go-driver/mongo"
"github.com/TIBCOSoftware/flogo-lib/core/activity"
"github.com/TIBCOSoftware/flogo-lib/logger"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

// ActivityLog is the default logger for the Log Activity
Expand All @@ -31,7 +33,7 @@ const (
ivData = "data"

ovOutput = "output"
ovCount = "count"
ovCount = "count"
)

func init() {
Expand Down Expand Up @@ -72,24 +74,26 @@ func (a *MongoDbActivity) Eval(ctx activity.Context) (done bool, err error) {
//todo implement shared sessions
// client, err := mongo.NewClient(connectionURI)
/*
The above function was giving below error;
"data not inserted topology is closed"
The above function was giving below error;
"data not inserted topology is closed"
*/

client, err := mongo.Connect(context.Background(), connectionURI, nil)
bCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
client, err := mongo.Connect(bCtx, options.Client().ApplyURI(connectionURI))

defer cancel()

defer client.Disconnect(context.Background())
if err != nil {
activityLog.Errorf("Connection error: %v", err)
return false, err
}

db := client.Database(dbName)

coll := db.Collection(collectionName)

switch strings.ToUpper(method) {
case methodGet:
result := coll.FindOne(context.Background(), bson.NewDocument(bson.EC.String(keyName, keyValue)))
result := coll.FindOne(bCtx, bson.M{keyName: keyValue})
val := make(map[string]interface{})
err := result.Decode(val)
if err != nil {
Expand All @@ -100,12 +104,7 @@ func (a *MongoDbActivity) Eval(ctx activity.Context) (done bool, err error) {

ctx.SetOutput(ovOutput, val)
case methodDelete:
result, err := coll.DeleteMany(
context.Background(),
bson.NewDocument(
bson.EC.String(keyName, keyValue),
),
)
result, err := coll.DeleteOne(bCtx, bson.M{keyName: keyValue}, nil)
if err != nil {
return false, err
}
Expand All @@ -114,24 +113,26 @@ func (a *MongoDbActivity) Eval(ctx activity.Context) (done bool, err error) {

ctx.SetOutput(ovCount, result.DeletedCount)
case methodInsert:
result, err := coll.InsertOne(
context.Background(),
value,
)
if err != nil {
return false, err
if value == nil && keyValue == "" {
// should we throw an error or warn?
activityLog.Warnf("Nothing to insert")
return true, nil
}

var result *mongo.InsertOneResult

if value != nil && keyValue == "" {
result, err = coll.InsertOne(bCtx, value)

} else {
result, err = coll.InsertOne(bCtx, bson.M{keyName: keyValue})
}

activityLog.Debugf("Insert Results $#v", result)

ctx.SetOutput(ovOutput, result.InsertedID)
case methodReplace:
result, err := coll.ReplaceOne(
context.Background(),
bson.NewDocument(
bson.EC.String(keyName, keyValue),
),
value,
)
result, err := coll.ReplaceOne(bCtx, bson.M{keyName: keyValue}, value)
if err != nil {
return false, err
}
Expand All @@ -141,15 +142,7 @@ func (a *MongoDbActivity) Eval(ctx activity.Context) (done bool, err error) {
ctx.SetOutput(ovCount, result.ModifiedCount)

case methodUpdate:
result, err := coll.UpdateOne(
context.Background(),
bson.NewDocument(
bson.EC.String(keyName, keyValue),
),
bson.NewDocument(
bson.EC.Interface("$set", value),
),
)
result, err := coll.UpdateOne(bCtx, bson.M{keyName: keyValue}, bson.M{"$set": value})
if err != nil {
return false, err
}
Expand Down
19 changes: 12 additions & 7 deletions activity/mongodb/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ package mongodb

import (
"context"
"fmt"
"io/ioutil"
"math/rand"
"reflect"
"testing"
"time"

"github.com/TIBCOSoftware/flogo-contrib/action/flow/test"
"github.com/TIBCOSoftware/flogo-lib/core/activity"
"github.com/mongodb/mongo-go-driver/bson"
"github.com/mongodb/mongo-go-driver/mongo"
"github.com/stretchr/testify/assert"
"github.com/TIBCOSoftware/flogo-lib/logger"
"github.com/mongodb/mongo-go-driver/bson/objectid"
"github.com/stretchr/testify/assert"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

const (
Expand All @@ -31,7 +34,7 @@ func init() {
//To remove below error:
// data not inserted topology is closed

client, err := mongo.Connect(context.Background(), TEST_URI, nil)
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(TEST_URI))
if err != nil {
// warn and skip tests
}
Expand Down Expand Up @@ -82,8 +85,8 @@ func insert(dataVal interface{}) (interface{}, error) {
}

func delete(id interface{}) {
oid := id.(objectid.ObjectID)
_, err := coll.DeleteOne(context.Background(), bson.NewDocument(bson.EC.ObjectID("_id", oid)))
oid := id.(primitive.ObjectID)
_, err := coll.DeleteOne(context.Background(), bson.M{"_id": oid})
if err != nil {
logger.Debugf("Error Deleting [%s] : %s", id, err.Error())
return
Expand Down Expand Up @@ -133,6 +136,8 @@ func TestInsert(t *testing.T) {
name := randomString(5)
val := map[string]interface{}{"name": name, "value1": "foo", "value2": "foo2"}
tc.SetInput(ivData, val)
//tc.SetInput(ivKeyName, "key")
//tc.SetInput(ivKeyValue, "value")

_, insertErr := act.Eval(tc)
if insertErr != nil {
Expand Down