Skip to content

Commit

Permalink
Merge pull request cockroachdb#9 from crowdflux/himanshu-github
Browse files Browse the repository at this point in the history
sync all method in feedline repo
  • Loading branch information
himanshu144141 committed Aug 10, 2016
2 parents 1191561 + 00785e0 commit 5d59cc5
Show file tree
Hide file tree
Showing 15 changed files with 433 additions and 71 deletions.
3 changes: 0 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ ehthumbs.db
# Folder config file
Desktop.ini

### Experiments golang files ###
experiments/

# Recycle Bin used on file shares
$RECYCLE.BIN/

Expand Down
92 changes: 92 additions & 0 deletions app/DAL/repositories/feed_line_repo/extra.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package feed_line_repo

import (
"github.com/crowdflux/angel/app/DAL/clients/postgres"
"github.com/crowdflux/angel/app/models"
"github.com/crowdflux/angel/app/models/uuid"
"github.com/crowdflux/angel/app/plog"
"gopkg.in/mgo.v2/bson"
)

func SyncAll() {

inpQ := NewInputQueue()

existingQFlus := []feedLineInputModel{}

err := inpQ.mgo.C("feedline_input").Find(bson.M{}).All(&existingQFlus)
if err != nil {
plog.Error("feedline", err)
return
}

plog.Info("existingQFlus", len(existingQFlus))

fluRepo := fluRepo{
Db: postgres.GetPostgresClient(),
}

flus := []models.FeedLineUnit{}

_, err = fluRepo.Db.Select(&flus, "SELECT * FROM feed_line")
if err != nil {
plog.Error("feedline", err)
return
}

plog.Info("flus", len(flus))

//flusToUpdate := []feedLineInputModel{}
flusToInsert := []interface{}{}

for _, eflu := range existingQFlus {

if _, ok := existsInList(flus, eflu.ID); !ok {

f := eflu.FeedLineUnit

f.Build = models.JsonF{
"fail": true,
}
flusToInsert = append(flusToInsert, &f)
}
}

//for _, f := range flusToInsert {
// plog.Info("flusToInsert", f.(*models.FeedLineUnit).ID)
//
//}

err = fluRepo.Db.Insert(flusToInsert...)
if err != nil {
plog.Error("feedline", err)
return
}

plog.Info("extra", len(flusToInsert), " new flus inserted")

/*
err = inpQ.mgo.C("feedline_input").Insert(flusToInsert...)
if err != nil {
plog.Error("feedline", err)
}
for _, updateFlu := range flusToUpdate {
err = inpQ.mgo.C("feedline_input").UpdateId(updateFlu.ID, updateFlu)
if err != nil {
plog.Error("feedline", err, updateFlu)
}
}*/
}

func existsInList(list []models.FeedLineUnit, toFindId uuid.UUID) (*models.FeedLineUnit, bool) {
for _, elem := range list {
if elem.ID == toFindId {

plog.Info("asd", elem.ID)
return &elem, true
}
}
return nil, false
}
9 changes: 7 additions & 2 deletions app/DAL/repositories/feed_line_repo/input_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,13 @@ func (i *inputQueue) GetQueued() ([]models.FeedLineUnit, error) {
return flus, err
}

func (i *inputQueue) MarkFinished() error {
func (i *inputQueue) MarkFinished(flus []models.FeedLineUnit) error {

_, err := i.mgo.C("feedline_input").UpdateAll(bson.M{"status": queued}, bson.M{"$set": bson.M{"status": success}})
fluIdsString := make([]string, len(flus))
for i, flu := range flus {
fluIdsString[i] = flu.ID.String()
}

_, err := i.mgo.C("feedline_input").UpdateAll(bson.M{"id_string": bson.M{"$in": fluIdsString}}, bson.M{"$set": bson.M{"status": success}})
return err
}
1 change: 1 addition & 0 deletions app/plog/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func Trace(tag string, args ...interface{}) {
fmt.Println(time.Now().Format(logFormat), fn, line, tag, args)
}
}

func IsTraceEnabled() bool {
return levelTrace <= plogLevel
}
4 changes: 2 additions & 2 deletions app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func Start() {
app.Version = "0.0.1"
app.Action = func(c *cli.Context) {

println("Support server started!")
println("Run support-playment -h for help")
println("Angel server started!")
println("Run angel -h for help")
println(
` .__ __
______ | | _____ ___.__. _____ ____ _____/ |_
Expand Down
2 changes: 1 addition & 1 deletion app/services/flu_svc/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ func StartFeedLineSync() {
}
}

c.AddFunc("*/10 * * * * *", syncFeedLine)
c.AddFunc("0 */2 * * * *", syncFeedLine)
c.Start()
}
2 changes: 1 addition & 1 deletion app/services/flu_svc/flu_svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (i *fluService) SyncInputFeedLine() error {
}
}()

err = fluInputQueue.MarkFinished()
err = fluInputQueue.MarkFinished(flus)

if err != nil {
plog.Error("Changing queue status failed", err)
Expand Down
40 changes: 40 additions & 0 deletions app/services/work_flow_svc/feed_line/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package feed_line

import (
"github.com/crowdflux/angel/app/models/uuid"
"sync"
)

type Bf struct {
mtx sync.RWMutex
fluMap map[uuid.UUID]FLU
}

func NewBuffer() Bf {
return Bf{fluMap: make(map[uuid.UUID]FLU)}
}

// RLock is read lock i.e. either multiple reads
// or single write can happen at a time
func (b *Bf) Get(id uuid.UUID) (FLU, bool) {
b.mtx.RLock()
defer b.mtx.RUnlock()

flu, ok := b.fluMap[id]
return flu, ok
}

// Write lock part of the read write lock
func (b *Bf) Save(flu FLU) {
b.mtx.Lock()
defer b.mtx.Unlock()

b.fluMap[flu.ID] = flu
}

func (b *Bf) GetAll() map[uuid.UUID]FLU {
b.mtx.RLock()
defer b.mtx.RUnlock()

return b.fluMap
}
41 changes: 1 addition & 40 deletions app/services/work_flow_svc/feed_line/feed_line.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package feed_line

import (
"sync"

"github.com/crowdflux/angel/app/models"
"github.com/crowdflux/angel/app/models/uuid"
)

// ShortHand for channel of FLUs i.e. FeedLine
Expand All @@ -29,42 +26,6 @@ func NewFixedSize(size int) Fl {

//--------------------------------------------------------------------------------//

type Bf struct {
mtx sync.RWMutex
fluMap map[uuid.UUID]FLU
}

func NewBuffer() Bf {
return Bf{fluMap: make(map[uuid.UUID]FLU)}
}

// RLock is read lock i.e. either multiple reads
// or single write can happen at a time
func (b *Bf) Get(id uuid.UUID) (FLU, bool) {
b.mtx.RLock()
defer b.mtx.RUnlock()

flu, ok := b.fluMap[id]
return flu, ok
}

// Write lock part of the read write lock
func (b *Bf) Save(flu FLU) {
b.mtx.Lock()
defer b.mtx.Unlock()

b.fluMap[flu.ID] = flu
}

func (b *Bf) GetAll() map[uuid.UUID]FLU {
b.mtx.RLock()
defer b.mtx.RUnlock()

return b.fluMap
}

//--------------------------------------------------------------------------------//

type FLU struct {
models.FeedLineUnit

Expand All @@ -80,7 +41,7 @@ type Builder interface {
// Returns the step identifier
Step() uint

// Returns the step pending
// Returns true if the step pending
Pending() bool

// Returns the step success
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func FluUpdateHandler(updates []FluUpdate) error {

if !ok {
// Handle error
plog.Error("Flu Handler", errors.New("Flu Not present in the buffer"), update.FluId)
plog.Error("Flu Handler crowdy", errors.New("Flu Not present in the buffer"), update.FluId)
continue
}

Expand Down
1 change: 0 additions & 1 deletion app/services/work_flow_svc/step_router/router_getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ type routeGetter struct {

func (r *routeGetter) GetStartStep(flu feed_line.FLU) (models.Step, error) {
return r.stepRepo.GetStartStep(flu.ProjectId)

}

func (r *routeGetter) GetNextStep(flu feed_line.FLU) (models.Step, error) {
Expand Down
Loading

0 comments on commit 5d59cc5

Please sign in to comment.