Skip to content

Commit

Permalink
Add queue binding type
Browse files Browse the repository at this point in the history
  • Loading branch information
jbw1991 committed Jan 13, 2023
1 parent 4ec0043 commit f2c7757
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 0 deletions.
36 changes: 36 additions & 0 deletions workers_bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (
WorkerR2BucketBindingType WorkerBindingType = "r2_bucket"
// WorkerAnalyticsEngineBindingType is the type for Analytics Engine dataset bindings.
WorkerAnalyticsEngineBindingType WorkerBindingType = "analytics_engine"
// WorkerQueueBindingType is the type for queue bindings.
WorkerQueueBindingType WorkerBindingType = "queue"
)

type ListWorkerBindingsParams struct {
Expand Down Expand Up @@ -309,6 +311,34 @@ func (b WorkerAnalyticsEngineBinding) serialize(bindingName string) (workerBindi
}, nil, nil
}

// WorkerQueueBinding is a binding to a Workers Queue.
//
// https://developers.cloudflare.com/workers/platform/bindings/#queue-bindings
type WorkerQueueBinding struct {
Binding string
Queue string
}

// Type returns the type of the binding.
func (b WorkerQueueBinding) Type() WorkerBindingType {
return WorkerQueueBindingType
}

func (b WorkerQueueBinding) serialize(bindingName string) (workerBindingMeta, workerBindingBodyWriter, error) {
if b.Binding == "" {
return nil, nil, fmt.Errorf(`Binding name for binding "%s" cannot be empty`, bindingName)
}
if b.Queue == "" {
return nil, nil, fmt.Errorf(`Queue name for binding "%s" cannot be empty`, bindingName)
}

return workerBindingMeta{
"type": b.Type(),
"name": b.Binding,
"queue_name": b.Queue,
}, nil, nil
}

// Each binding that adds a part to the multipart form body will need
// a unique part name so we just generate a random 128bit hex string.
func getRandomPartName() string {
Expand Down Expand Up @@ -377,6 +407,12 @@ func (api *API) ListWorkerBindings(ctx context.Context, rc *ResourceContainer, p
bindingListItem.Binding = WorkerKvNamespaceBinding{
NamespaceID: namespaceID,
}
case WorkerQueueBindingType:
queueName := jsonBinding["queue_name"].(string)
bindingListItem.Binding = WorkerQueueBinding{
Binding: name,
Queue: queueName,
}
case WorkerWebAssemblyBindingType:
bindingListItem.Binding = WorkerWebAssemblyBinding{
Module: &bindingContentReader{
Expand Down
37 changes: 37 additions & 0 deletions workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,3 +896,40 @@ func TestUploadWorker_WithCompatibilityFlags(t *testing.T) {
})
assert.NoError(t, err)
}

func TestUploadWorker_WithQueueBinding(t *testing.T) {
setup()
defer teardown()

handler := func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, http.MethodPut, r.Method, "Expected method 'PUT', got %s", r.Method)

mpUpload, err := parseMultipartUpload(r)
assert.NoError(t, err)

expectedBindings := map[string]workerBindingMeta{
"b1": {
"name": "b1",
"type": "queue",
"queue_name": "test-queue",
},
}
assert.Equal(t, workerScript, mpUpload.Script)
assert.Equal(t, expectedBindings, mpUpload.BindingMeta)

w.Header().Set("content-type", "application/json")
fmt.Fprint(w, uploadWorkerResponseData)
}
mux.HandleFunc("/accounts/"+testAccountID+"/workers/scripts/bar", handler)

_, err := client.UploadWorker(context.Background(), AccountIdentifier(testAccountID), CreateWorkerParams{
ScriptName: "bar",
Script: workerScript,
Bindings: map[string]WorkerBinding{
"b1": WorkerQueueBinding{
Binding: "b1",
Queue: "test-queue",
},
}})
assert.NoError(t, err)
}

0 comments on commit f2c7757

Please sign in to comment.