Some improvement suggestions on Offset manipulation for consumers #252

@ThomasShih

I was testing and scoping out how to reconnect to a stream and resume from an existing offset, and ran into a few blockers in how the library is used. Below are a few notes that might help improve DX. Thank you!

  • The Consumer type does not expose the locally tracked offset. This is a problem when we want to store that offset on the RabbitMQ server.
  • I can't call client.close() after consumer.close(); it throws a "connection already closed" error.
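
As a rough illustration of the first note, a helper along these lines could push the consumer's locally tracked offset to the server in one call. `OffsetTrackingConsumer` and `storeLocalOffset` are hypothetical names, shaped after the `localOffset.value` and `storeOffset` usage in the test in this issue; they are not the library's actual type definitions.

```typescript
// Hypothetical minimal shape, modeled on how the test in this issue uses the
// consumer. This is an assumption, not the library's real interface.
interface OffsetTrackingConsumer {
  localOffset: { value?: bigint };
  storeOffset(offset: bigint): Promise<void>;
}

// Reads the locally tracked offset and stores it on the server.
// Rejects if the consumer has not tracked any offset yet.
async function storeLocalOffset(
  consumer: OffsetTrackingConsumer,
): Promise<bigint> {
  const offset = consumer.localOffset.value;
  if (offset === undefined) {
    throw new Error("consumer has not tracked any offset yet");
  }
  await consumer.storeOffset(offset);
  return offset;
}
```

With something like this exposed on the consumer itself, the cast to StreamConsumer and the manual undefined check in the test would not be needed.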

For context, these notes came out of the following Jest test:

      const referenceName = faker.word.noun();
      const consumer = await streamsService.client.declareConsumer(
        {
          stream: params.stream,
          offset: Offset.offset(0n),
          consumerRef: referenceName,
        },
        onIncomingMessage,
      );
      const publisher = await streamsService.client.declarePublisher(params);
      await publisher.send(Buffer.from(faker.lorem.sentence()));
      await waitForExpect(async () => {
        expect(onIncomingMessage).toHaveBeenCalled();
      });
      expect(onIncomingMessage).toHaveBeenCalledTimes(1);
      // declareConsumer from the lib should probably promise a
      // StreamConsumer instead of a Consumer so we can access the offset
      const localOffset: bigint | undefined = (consumer as StreamConsumer).localOffset.value;

      if (localOffset === undefined) {
        throw new Error('localOffset is undefined');
      }

      // Perhaps the consumer could store its internally tracked offset on the server directly, instead of us having to read it and pass it back in?
      await consumer.storeOffset(localOffset);
      await consumer.close(false);

      await publisher.send(Buffer.from(faker.lorem.sentence()));
      await publisher.send(Buffer.from(faker.lorem.sentence()));

      const lastMessageOffset = await streamsService.client.queryOffset({
        stream: params.stream,
        reference: referenceName,
      });
      expect(lastMessageOffset).toEqual(0n);

      const resumedOnIncomingMessage = jest.fn();
      const resumedConsumer = await streamsService.client.declareConsumer(
        {
          stream: params.stream,
          offset: Offset.offset(lastMessageOffset + 1n),
          consumerRef: referenceName,
        },
        resumedOnIncomingMessage,
      );
      await waitForExpect(async () => {
        expect(resumedOnIncomingMessage).toHaveBeenCalledTimes(2);
      });
      expect(resumedOnIncomingMessage).toHaveBeenCalledTimes(2);
      expect(
        (resumedConsumer as StreamConsumer).localOffset.value,
      ).toEqual(2n);

      await publisher.close(false);
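
The resume step in the test (query the stored offset, then start at the next one) can be isolated as a small pure function. This is only a sketch: `nextOffsetAfter` is a hypothetical helper, and treating `undefined` as "nothing stored yet" is the caller's convention here, not a statement about what `queryOffset` returns in that case.

```typescript
// Hypothetical helper: given the last offset stored on the server, return the
// offset a resumed consumer should start from. `undefined` is this sketch's
// own convention for "no offset stored", in which case we start at 0.
function nextOffsetAfter(storedOffset: bigint | undefined): bigint {
  return storedOffset === undefined ? 0n : storedOffset + 1n;
}

// The test above stores offset 0 and then resumes at offset 1.
console.log(nextOffsetAfter(0n)); // 1n
```

The result would be passed to `Offset.offset(...)` when declaring the resumed consumer, as the test does with `lastMessageOffset + 1n`.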
