Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ For more details see the <a href="https://docs.vllm.ai/en/stable/getting_started
- `tokenizers-cache-dir`: the directory for caching tokenizers
- `hash-seed`: seed for hash generation (if not set, is read from PYTHONHASHSEED environment variable)
- `zmq-endpoint`: ZMQ address to publish events
- `zmq-max-connect-attempts`: the maximum number of ZMQ connection attempts. defaults to 0. maximum: 10
- `event-batch-size`: the maximum number of kv-cache events to be sent together, defaults to 16
- `fake-metrics`: represents a predefined set of metrics to be sent to Prometheus as a substitute for the actual data. When specified, only these fake metrics will be reported — real metrics and fake metrics will never be reported simultaneously. The set should include values for
- `running-requests`
Expand Down
9 changes: 9 additions & 0 deletions manifests/invalid-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
port: 8001
model: "Qwen/Qwen2-0.5B"
max-num-seqs: 5
mode: "random"
time-to-first-token: 2000
inter-token-latency: 1000
kv-cache-transfer-latency: 100
seed: 100100100
zmq-max-connect-attempts: -111
6 changes: 6 additions & 0 deletions pkg/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ type Configuration struct {

// ZMQEndpoint is the ZMQ address to publish events, the default value is tcp://localhost:5557
ZMQEndpoint string `yaml:"zmq-endpoint"`
// ZMQMaxConnectAttempts defines the maximum number (10) of retries when ZMQ connection fails
ZMQMaxConnectAttempts uint `yaml:"zmq-max-connect-attempts"`
// EventBatchSize is the maximum number of kv-cache events to be sent together, defaults to 16
EventBatchSize int `yaml:"event-batch-size"`

Expand Down Expand Up @@ -354,6 +356,9 @@ func (c *Configuration) validate() error {
if c.EventBatchSize < 1 {
return errors.New("event batch size cannot less than 1")
}
if c.ZMQMaxConnectAttempts > 10 {
return errors.New("zmq retries times cannot be more than 10")
}

if c.FakeMetrics != nil {
if c.FakeMetrics.RunningRequests < 0 || c.FakeMetrics.WaitingRequests < 0 {
Expand Down Expand Up @@ -415,6 +420,7 @@ func ParseCommandParamsAndLoadConfig() (*Configuration, error) {
f.StringVar(&config.TokenizersCacheDir, "tokenizers-cache-dir", config.TokenizersCacheDir, "Directory for caching tokenizers")
f.StringVar(&config.HashSeed, "hash-seed", config.HashSeed, "Seed for hash generation (if not set, is read from PYTHONHASHSEED environment variable)")
f.StringVar(&config.ZMQEndpoint, "zmq-endpoint", config.ZMQEndpoint, "ZMQ address to publish events")
f.UintVar(&config.ZMQMaxConnectAttempts, "zmq-max-connect-attempts", config.ZMQMaxConnectAttempts, "Maximum number of times to retry ZMQ requests")
f.IntVar(&config.EventBatchSize, "event-batch-size", config.EventBatchSize, "Maximum number of kv-cache events to be sent together")

// These values were manually parsed above in getParamValueFromArgs, we leave this in order to get these flags in --help
Expand Down
11 changes: 11 additions & 0 deletions pkg/common/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,14 @@ var _ = Describe("Simulator configuration", func() {
"{\"name\":\"lora4\",\"path\":\"/path/to/lora4\"}",
}
c.EventBatchSize = 5
c.ZMQMaxConnectAttempts = 1
test = testCase{
name: "config file with command line args",
args: []string{"cmd", "--model", model, "--config", "../../manifests/config.yaml", "--port", "8002",
"--served-model-name", "alias1", "alias2", "--seed", "100",
"--lora-modules", "{\"name\":\"lora3\",\"path\":\"/path/to/lora3\"}", "{\"name\":\"lora4\",\"path\":\"/path/to/lora4\"}",
"--event-batch-size", "5",
"--zmq-max-connect-attempts", "1",
},
expectedConfig: c,
}
Expand All @@ -121,6 +123,7 @@ var _ = Describe("Simulator configuration", func() {
c.LoraModulesString = []string{
"{\"name\":\"lora3\",\"path\":\"/path/to/lora3\"}",
}
c.ZMQMaxConnectAttempts = 0
test = testCase{
name: "config file with command line args with different format",
args: []string{"cmd", "--model", model, "--config", "../../manifests/config.yaml", "--port", "8002",
Expand Down Expand Up @@ -377,6 +380,14 @@ var _ = Describe("Simulator configuration", func() {
args: []string{"cmd", "--fake-metrics", "{\"running-requests\":10,\"waiting-requests\":30,\"kv-cache-usage\":40}",
"--config", "../../manifests/config.yaml"},
},
{
name: "invalid (negative) zmq-max-connect-attempts for argument",
args: []string{"cmd", "zmq-max-connect-attempts", "-1", "--config", "../../manifests/config.yaml"},
},
{
name: "invalid (negative) zmq-max-connect-attempts for config file",
args: []string{"cmd", "--config", "../../manifests/invalid-config.yaml"},
},
}

for _, test := range invalidTests {
Expand Down
33 changes: 22 additions & 11 deletions pkg/common/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"sync/atomic"
"time"

zmq "github.com/pebbe/zmq4"
"github.com/vmihailenco/msgpack/v5"
Expand All @@ -38,24 +39,34 @@ type Publisher struct {

// NewPublisher creates a new ZMQ publisher.
// endpoint is the ZMQ address to bind to (e.g., "tcp://*:5557").
func NewPublisher(endpoint string) (*Publisher, error) {
// retries is the maximum number of connection attempts.
func NewPublisher(endpoint string, retries uint) (*Publisher, error) {
socket, err := zmq.NewSocket(zmq.PUB)
if err != nil {
return nil, fmt.Errorf("failed to create ZMQ PUB socket: %w", err)
}

if err := socket.Connect(endpoint); err != nil {
errClose := socket.Close()
return nil, errors.Join(
fmt.Errorf("failed to connect to %s: %w", endpoint, err),
errClose,
)
// Retry connection with specified retry times and intervals
for i := uint(0); i <= retries; i++ {
err = socket.Connect(endpoint)
if err == nil {
return &Publisher{
socket: socket,
endpoint: endpoint,
}, nil
}

// If not the last attempt, wait before retrying
if i < retries {
time.Sleep(1 * time.Second)
}
}

return &Publisher{
socket: socket,
endpoint: endpoint,
}, nil
errClose := socket.Close()
return nil, errors.Join(
fmt.Errorf("failed to connect to %s after %d retries: %w", endpoint, retries+1, err),
errClose,
)
}

// PublishEvent publishes a KV cache event batch to the ZMQ topic.
Expand Down
39 changes: 38 additions & 1 deletion pkg/common/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
subEndpoint = "tcp://*:5557"
pubEndpoint = "tcp://localhost:5557"
data = "Hello"
retries = 0
)

var _ = Describe("Publisher", func() {
Expand All @@ -50,7 +51,7 @@ var _ = Describe("Publisher", func() {

time.Sleep(100 * time.Millisecond)

pub, err := NewPublisher(pubEndpoint)
pub, err := NewPublisher(pubEndpoint, retries)
Expect(err).NotTo(HaveOccurred())

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -78,4 +79,40 @@ var _ = Describe("Publisher", func() {
Expect(err).NotTo(HaveOccurred())
Expect(payload).To(Equal(data))
})
It("should fail when connection attempts exceed maximum retries", func() {
// Use invalid address format, which will cause connection to fail
invalidEndpoint := "invalid-address-format"

pub, err := NewPublisher(invalidEndpoint, 2)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("failed to connect"))
Expect(err.Error()).To(ContainSubstring("after 3 retries")) // 2 retries = 3 total attempts

if pub != nil {
//nolint
pub.Close()
}
})
It("should retry connection successfully", func() {
// Step 1: Try to connect to a temporarily non-existent service
// This will trigger the retry mechanism
go func() {
// Delay starting the server to simulate service recovery
time.Sleep(2 * time.Second)

// Start subscriber as server
sub, err := zmq.NewSocket(zmq.SUB)
Expect(err).NotTo(HaveOccurred())
//nolint
defer sub.Close()
err = sub.Bind(subEndpoint)
Expect(err).NotTo(HaveOccurred())
}()

// Step 2: Publisher will retry connection and eventually succeed
pub, err := NewPublisher(pubEndpoint, 5) // 5 retries
Expect(err).NotTo(HaveOccurred()) // Should eventually succeed
//nolint
defer pub.Close()
})
})
2 changes: 1 addition & 1 deletion pkg/kv-cache/block_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func newBlockCache(config *common.Configuration, logger logr.Logger) (*blockCach
// TODO read size of channel from config
eChan := make(chan EventData, 10000)

publisher, err := common.NewPublisher(config.ZMQEndpoint)
publisher, err := common.NewPublisher(config.ZMQEndpoint, config.ZMQMaxConnectAttempts)
if err != nil {
return nil, err
}
Expand Down
29 changes: 16 additions & 13 deletions pkg/kv-cache/kv_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,12 @@ var _ = Describe("KV cache", Ordered, func() {
time.Sleep(300 * time.Millisecond)

config := &common.Configuration{
Port: 1234,
Model: "model",
KVCacheSize: test.cacheSize,
ZMQEndpoint: pubEndpoint,
EventBatchSize: 1,
Port: 1234,
Model: "model",
KVCacheSize: test.cacheSize,
ZMQEndpoint: pubEndpoint,
ZMQMaxConnectAttempts: 3,
EventBatchSize: 1,
}

sub, topic := createSub(config)
Expand Down Expand Up @@ -304,10 +305,11 @@ var _ = Describe("KV cache", Ordered, func() {

It("should send events correctly", func() {
config := &common.Configuration{
Port: 1234,
Model: "model",
KVCacheSize: 4,
ZMQEndpoint: pubEndpoint,
Port: 1234,
Model: "model",
KVCacheSize: 4,
ZMQEndpoint: pubEndpoint,
ZMQMaxConnectAttempts: 3,
}

sub, topic := createSub(config)
Expand Down Expand Up @@ -413,10 +415,11 @@ var _ = Describe("KV cache", Ordered, func() {
for _, testCase := range testCases {
It(testCase.name, func() {
config := common.Configuration{
Port: 1234,
Model: "model",
KVCacheSize: testCase.cacheSize,
ZMQEndpoint: pubEndpoint,
Port: 1234,
Model: "model",
KVCacheSize: testCase.cacheSize,
ZMQEndpoint: pubEndpoint,
ZMQMaxConnectAttempts: 3,
}
blockCache, err := newBlockCache(&config, GinkgoLogr)
Expect(err).NotTo(HaveOccurred())
Expand Down
Loading