Skip to content

Commit 7a74c90

Browse files
authored
feat: support pluggable queue (#125)
* feat: abstract queue * refactor: refine event and queue abstraction * refactor: support lazy init for queue * refactor: support queue config * refactor: refine event abstraction * refactor: refactor configuration * refactor: separate queues * refactor: refine queue * refactor: abstract queue in perf * ci: bench for memory queue * style: fix lint issues * style: add missing license header
1 parent 34316b0 commit 7a74c90

13 files changed

+455
-122
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -104,5 +104,6 @@ dist
104104
.tern-port
105105

106106
.idea
107+
.run
107108
bin/
108109
.DS_Store

Makefile

+16
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,17 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
3+
# You may obtain a copy of the License at
4+
#
5+
# http://www.apache.org/licenses/LICENSE-2.0
6+
#
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
113
.PHONY: build
14+
.PHONY: license
215
build:
316
go build -v -o bin/function-stream ./cmd
417

@@ -25,3 +38,6 @@ gen_rest_client:
2538
--global-property apiDocs,apis,models,supportingFiles
2639
rm -r restclient/go.mod restclient/go.sum restclient/.travis.yml restclient/.openapi-generator-ignore \
2740
restclient/git_push.sh restclient/.openapi-generator restclient/api restclient/test
41+
42+
license:
43+
./license-checker/license-checker.sh

benchmark/bench_test.go

+73-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/apache/pulsar-client-go/pulsaradmin"
2020
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
2121
"github.com/functionstream/functionstream/common"
22+
"github.com/functionstream/functionstream/lib"
2223
"github.com/functionstream/functionstream/perf"
2324
"github.com/functionstream/functionstream/restclient"
2425
"github.com/functionstream/functionstream/server"
@@ -77,9 +78,9 @@ func BenchmarkStressForBasicFunc(b *testing.B) {
7778
createTopic(inputTopic)
7879
createTopic(outputTopic)
7980

80-
pConfig := perf.Config{
81+
pConfig := &perf.Config{
8182
PulsarURL: "pulsar://localhost:6650",
82-
RequestRate: 100000.0,
83+
RequestRate: 200000.0,
8384
Func: &restclient.Function{
8485
Archive: "./bin/example_basic.wasm",
8586
Inputs: []string{inputTopic},
@@ -116,3 +117,73 @@ func BenchmarkStressForBasicFunc(b *testing.B) {
116117
b.Fatal(err)
117118
}
118119
}
120+
121+
func BenchmarkStressForBasicFuncWithMemoryQueue(b *testing.B) {
122+
prepareEnv()
123+
124+
memoryQueueFactory := lib.NewMemoryQueueFactory()
125+
126+
svrConf := &lib.Config{
127+
QueueBuilder: func(ctx context.Context, config *lib.Config) (lib.EventQueueFactory, error) {
128+
return memoryQueueFactory, nil
129+
},
130+
}
131+
132+
fm, err := lib.NewFunctionManager(svrConf)
133+
if err != nil {
134+
b.Fatal(err)
135+
}
136+
s := server.NewWithFM(fm)
137+
go func() {
138+
common.RunProcess(func() (io.Closer, error) {
139+
go s.Run()
140+
return s, nil
141+
})
142+
}()
143+
144+
inputTopic := "test-input-" + strconv.Itoa(rand.Int())
145+
outputTopic := "test-output-" + strconv.Itoa(rand.Int())
146+
147+
replicas := int32(15)
148+
149+
pConfig := &perf.Config{
150+
RequestRate: 200000.0,
151+
Func: &restclient.Function{
152+
Archive: "./bin/example_basic.wasm",
153+
Inputs: []string{inputTopic},
154+
Output: outputTopic,
155+
Replicas: &replicas,
156+
},
157+
QueueBuilder: func(ctx context.Context, c *lib.Config) (lib.EventQueueFactory, error) {
158+
return memoryQueueFactory, nil
159+
},
160+
}
161+
162+
b.ReportAllocs()
163+
164+
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
165+
defer cancel()
166+
167+
profile := "BenchmarkStressForBasicFunc.pprof"
168+
file, err := os.Create(profile)
169+
if err != nil {
170+
b.Fatal(err)
171+
}
172+
defer func() {
173+
_ = file.Close()
174+
}()
175+
176+
err = pprof.StartCPUProfile(file)
177+
if err != nil {
178+
b.Fatal(err)
179+
}
180+
181+
perf.New(pConfig).Run(ctx)
182+
183+
pprof.StopCPUProfile()
184+
185+
err = s.Close()
186+
if err != nil {
187+
b.Fatal(err)
188+
}
189+
}

cmd/perf/cmd.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ var (
3030
Run: exec,
3131
}
3232

33-
config = perf.Config{}
33+
config = &perf.Config{}
3434
)
3535

3636
func init() {

lib/config.go

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package lib
16+
17+
import (
18+
"context"
19+
)
20+
21+
type QueueBuilder func(ctx context.Context, config *Config) (EventQueueFactory, error)
22+
23+
type Config struct {
24+
ListenAddr string
25+
PulsarURL string
26+
QueueBuilder QueueBuilder
27+
}

lib/event_queue.go

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package lib
16+
17+
import (
18+
"context"
19+
)
20+
21+
type Event interface {
22+
GetPayload() []byte
23+
Ack()
24+
}
25+
26+
type SourceQueueConfig struct {
27+
Topics []string
28+
SubName string
29+
}
30+
31+
type SinkQueueConfig struct {
32+
Topic string
33+
}
34+
35+
//type EventQueueFactory func(ctx context.Context, config *QueueConfig, function *model.Function) (EventQueue, error)
36+
37+
type EventQueueFactory interface {
38+
NewSourceChan(ctx context.Context, config *SourceQueueConfig) (<-chan Event, error)
39+
NewSinkChan(ctx context.Context, config *SinkQueueConfig) (chan<- Event, error)
40+
}
41+
42+
type EventQueue interface {
43+
GetSendChan() (chan<- Event, error)
44+
GetRecvChan() (<-chan Event, error)
45+
}
46+
47+
type AckableEvent struct {
48+
payload []byte
49+
ackFunc func()
50+
}
51+
52+
func NewAckableEvent(payload []byte, ackFunc func()) *AckableEvent {
53+
return &AckableEvent{
54+
payload: payload,
55+
ackFunc: ackFunc,
56+
}
57+
}
58+
59+
func (e *AckableEvent) GetPayload() []byte {
60+
return e.payload
61+
}
62+
63+
func (e *AckableEvent) Ack() {
64+
e.ackFunc()
65+
}

lib/instance.go

+28-62
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ package lib
1717
import (
1818
"context"
1919
"fmt"
20-
"github.com/apache/pulsar-client-go/pulsar"
2120
"github.com/functionstream/functionstream/common"
2221
"github.com/functionstream/functionstream/common/model"
2322
"github.com/pkg/errors"
@@ -31,27 +30,27 @@ import (
3130
)
3231

3332
type FunctionInstance struct {
34-
ctx context.Context
35-
cancelFunc context.CancelFunc
36-
definition *model.Function
37-
pc pulsar.Client
38-
readyCh chan error
39-
index int32
33+
ctx context.Context
34+
cancelFunc context.CancelFunc
35+
definition *model.Function
36+
queueFactory EventQueueFactory
37+
readyCh chan error
38+
index int32
4039
}
4140

42-
func NewFunctionInstance(definition *model.Function, pc pulsar.Client, index int32) *FunctionInstance {
41+
func NewFunctionInstance(definition *model.Function, queueFactory EventQueueFactory, index int32) *FunctionInstance {
4342
ctx, cancelFunc := context.WithCancel(context.Background())
4443
ctx.Value(logrus.Fields{
4544
"function-name": definition.Name,
4645
"function-index": index,
4746
})
4847
return &FunctionInstance{
49-
ctx: ctx,
50-
cancelFunc: cancelFunc,
51-
definition: definition,
52-
pc: pc,
53-
readyCh: make(chan error),
54-
index: index,
48+
ctx: ctx,
49+
cancelFunc: cancelFunc,
50+
definition: definition,
51+
queueFactory: queueFactory,
52+
readyCh: make(chan error),
53+
index: index,
5554
}
5655
}
5756

@@ -86,30 +85,6 @@ func (instance *FunctionInstance) Run() {
8685
return
8786
}
8887

89-
consumer, err := instance.pc.Subscribe(pulsar.ConsumerOptions{
90-
Topics: instance.definition.Inputs,
91-
SubscriptionName: fmt.Sprintf("function-stream-%s", instance.definition.Name),
92-
Type: pulsar.Failover,
93-
})
94-
if err != nil {
95-
instance.readyCh <- errors.Wrap(err, "Error creating consumer")
96-
return
97-
}
98-
defer func() {
99-
consumer.Close()
100-
}()
101-
102-
producer, err := instance.pc.CreateProducer(pulsar.ProducerOptions{
103-
Topic: instance.definition.Output,
104-
})
105-
if err != nil {
106-
instance.readyCh <- errors.Wrap(err, "Error creating producer")
107-
return
108-
}
109-
defer func() {
110-
producer.Close()
111-
}()
112-
11388
handleErr := func(ctx context.Context, err error, message string, args ...interface{}) {
11489
if errors.Is(err, context.Canceled) {
11590
slog.InfoContext(instance.ctx, "function instance has been stopped")
@@ -134,40 +109,31 @@ func (instance *FunctionInstance) Run() {
134109
return
135110
}
136111

137-
instance.readyCh <- nil
138-
139-
for {
140-
msg, err := consumer.Receive(instance.ctx)
141-
if err != nil {
142-
handleErr(instance.ctx, err, "Error receiving message")
143-
return
144-
}
145-
stdin.ResetBuffer(msg.Payload())
112+
sourceChan, err := instance.queueFactory.NewSourceChan(instance.ctx, &SourceQueueConfig{Topics: instance.definition.Inputs, SubName: fmt.Sprintf("function-stream-%s", instance.definition.Name)})
113+
if err != nil {
114+
instance.readyCh <- errors.Wrap(err, "Error creating source event queue")
115+
return
116+
}
117+
sinkChan, err := instance.queueFactory.NewSinkChan(instance.ctx, &SinkQueueConfig{Topic: instance.definition.Output})
118+
if err != nil {
119+
instance.readyCh <- errors.Wrap(err, "Error creating sink event queue")
120+
return
121+
}
146122

123+
instance.readyCh <- nil
124+
for e := range sourceChan {
125+
stdin.ResetBuffer(e.GetPayload())
147126
_, err = process.Call(instance.ctx)
148127
if err != nil {
149128
handleErr(instance.ctx, err, "Error calling process function")
150129
return
151130
}
152-
153131
output := stdout.GetAndReset()
154-
producer.SendAsync(instance.ctx, &pulsar.ProducerMessage{
155-
Payload: output,
156-
}, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
157-
if err != nil {
158-
handleErr(instance.ctx, err, "Error sending message", "error", err, "messageId", id)
159-
return
160-
}
161-
err = consumer.Ack(msg)
162-
if err != nil {
163-
handleErr(instance.ctx, err, "Error acknowledging message", "error", err, "messageId", id)
164-
return
165-
}
166-
})
132+
sinkChan <- NewAckableEvent(output, e.Ack)
167133
}
168134
}
169135

170-
func (instance *FunctionInstance) WaitForReady() chan error {
136+
func (instance *FunctionInstance) WaitForReady() <-chan error {
171137
return instance.readyCh
172138
}
173139

0 commit comments

Comments
 (0)