Skip to content

Commit

Permalink
fix(state): fix sql where on tag assign (#245)
Browse files Browse the repository at this point in the history
Add parenthesis so that all tagged tasks are not considered "unassigned"
  • Loading branch information
hannahhoward authored Jun 24, 2021
1 parent 38a758d commit 1c77415
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 2 deletions.
4 changes: 2 additions & 2 deletions controller/state/statedb_dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ const (
oldestAvailableTaskWithTagsSQL = `
SELECT uuid, data FROM tasks
WHERE worked_by IS NULL
AND tag is NULL OR tag = ANY($1)
AND (tag is NULL OR tag = ANY($1))
ORDER BY created
LIMIT 1
`

oldestAvailableTaskWithTagsSQLsqlite = `
SELECT uuid, data FROM tasks
WHERE worked_by IS NULL
AND tag is NULL OR tag IN (%s)
AND (tag is NULL OR tag IN (%s))
ORDER BY created
LIMIT 1
`
Expand Down
62 changes: 62 additions & 0 deletions controller/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,68 @@ func TestAssignTaskWithTag(t *testing.T) {
require.NotNil(t, task, "Did not get untagged task")
})
}
func TestAssignConcurrentTaskWithTag(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
withState(ctx, t, func(state *stateDB) {
taskCount := 4

for i := 0; i < taskCount; i++ {
tasktag := "testtag"
rt := tasks.Type.RetrievalTask.Of("t01000", "bafk2bzacedli6qxp43sf54feczjd26jgeyfxv4ucwylujd3xo5s6cohcqbg36", false, tasktag)
task := tasks.Type.Task.New(rt, nil)
err := state.saveTask(ctx, task, tasktag)
require.NoError(t, err)

tasktag = "sometag"
rt = tasks.Type.RetrievalTask.Of("f0127896", "bafk2bzacedli6qxp43sf54feczjd26jgeyfxv4ucwylujd3xo5s6cohcqbg36", false, tasktag)
task = tasks.Type.Task.New(rt, nil)
err = state.saveTask(ctx, task, tasktag)
require.NoError(t, err)

}

release := make(chan struct{})
assigned := make([]tasks.Task, taskCount)
errChan := make(chan error)
t.Log("concurrently assigning", taskCount, "tasks")
for i := 0; i < taskCount; i++ {
go func(n int) {
worker := fmt.Sprintf("worker-%d", n)
<-release
req := tasks.Type.PopTask.Of(worker, tasks.InProgress, "testtag")

task, err := state.AssignTask(ctx, req)
if err != nil {
errChan <- err
return
}
assigned[n] = task
errChan <- nil
}(i)
}

close(release)
for i := 0; i < taskCount; i++ {
err := <-errChan
require.NoError(t, err)
}

for i := 0; i < taskCount; i++ {
task := assigned[i]
if task == nil {
t.Log("did not find task to assign")
continue
}
history, err := state.TaskHistory(ctx, task.UUID.String())
require.NoError(t, err)

assert.Len(t, history, 2)
assert.Equal(t, tasks.Available, history[0].Status, "wrong status for 1st history")
assert.Equal(t, tasks.InProgress, history[1].Status, "wrong status for 2nd history")
}
})
}

func TestUpdateTasks(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit 1c77415

Please sign in to comment.