Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OrderedConsumer, OptStartTime overrides DeliverPolicy #1734

Closed
bredtape opened this issue Nov 5, 2024 · 1 comment · Fixed by #1742
Closed

OrderedConsumer, OptStartTime overrides DeliverPolicy #1734

bredtape opened this issue Nov 5, 2024 · 1 comment · Fixed by #1742
Assignees
Labels
defect Suspected defect such as a bug or regression

Comments

@bredtape
Copy link

bredtape commented Nov 5, 2024

Observed behavior

With the following config for OrderedConsumer:

jetstream.OrderedConsumerConfig{
		DeliverPolicy: jetstream.DeliverByStartSequencePolicy,
		OptStartSeq:   2,
		OptStartTime:  &time.Time{}}

All messages are delivered, not starting from sequence 2.

The nats server log have the following:

[TRC] 172.21.0.1:54642 - cid:197 - <<- [PUB $JS.API.CONSUMER.CREATE.c415e8ead2704943a258f30bc180004c_jsconn_test.u4WPoF0Hd2Bst2DxUgjmZ1_2 _INBOX.u4WPoF0Hd2Bst2DxUgjmWK.Ew8O36VG 308]
[TRC] 172.21.0.1:54642 - cid:197 - <<- MSG_PAYLOAD: ["{\"stream_name\":\"c415e8ead2704943a258f30bc180004c_jsconn_test\",\"config\":{\"name\":\"u4WPoF0Hd2Bst2DxUgjmZ1_2\",\"deliver_policy\":\"by_start_time\",\"opt_start_time\":\"0001-01-01T00:00:00Z\",\"ack_policy\":\"none\",\"replay_policy\":\"instant\",\"inactive_threshold\":300000000000,\"num_replicas\":1,\"mem_storage\":true},\"action\":\"\"}"]

Note that the deliver_policy is set to by_start_time.

Expected behavior

Messages starting from sequence 2 should be delivered.

Server and client version

golang v1.23.2
nats-server v2.10.22
github.com/nats-io/nats.go v1.37.0

Host environment

No response

Steps to reproduce

package sync

import (
	"context"
	"testing"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func TestOrderedConsumerOptStartTimeOverride(t *testing.T) {
	ctx := context.Background()

	urls := "nats://localhost:4222"
	nc, err := nats.Connect(urls)
	if err != nil {
		t.Fatal(err)
	}

	js, err := jetstream.New(nc)
	if err != nil {
		t.Fatal(err)
	}

	streamConfig := jetstream.StreamConfig{
		Name:        "test",
		Description: "desc",
		Subjects:    []string{"test.*"}}

	_, err = js.CreateStream(ctx, streamConfig)
	if err != nil {
		t.Fatalf("failed to create stream: %v", err)
	}

	_, err = js.Publish(ctx, "test.1", []byte("hello"))
	if err != nil {
		t.Fatalf("failed to publish message: %v", err)
	}
	_, err = js.Publish(ctx, "test.2", []byte("hello"))
	if err != nil {
		t.Fatalf("failed to publish message: %v", err)
	}

	cfg := jetstream.OrderedConsumerConfig{
		DeliverPolicy: jetstream.DeliverByStartSequencePolicy,
		OptStartSeq:   2,
		OptStartTime:  &time.Time{}}
	consumer, err := js.OrderedConsumer(ctx, "test", cfg)
	if err != nil {
		t.Fatalf("failed to create consumer: %v", err)
	}

	msg, err := consumer.Next()
	if err != nil {
		t.Fatalf("failed to get message: %v", err)
	}

	if msg.Subject() != "test.2" {
		t.Fatalf("expected subject to be test.2, got %s", msg.Subject())
	}
}

Results in:

=== RUN   TestOrderedConsumerOptStartTimeOverride
    min_test.go:60: expected subject to be test.2, got test.1
--- FAIL: TestOrderedConsumerOptStartTimeOverride (0.00s)
@bredtape bredtape added the defect Suspected defect such as a bug or regression label Nov 5, 2024
@piotrpio piotrpio self-assigned this Nov 6, 2024
@piotrpio
Copy link
Collaborator

Hello @bredtape, thank you for creating the issue! I created a PR which should fix the problem.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
defect Suspected defect such as a bug or regression
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants