diff --git a/message/publisher.go b/message/publisher.go index e98f9f9f..9cc2bc02 100644 --- a/message/publisher.go +++ b/message/publisher.go @@ -71,11 +71,11 @@ func (p *Publisher) ProducerID() ProducerID { return p.producer } // its marshaled content, with a UUID sequenced for immediate consumption. // An error is returned if: // -// * The Message implements Validator, and it returns an error. -// * The MappingFunc returns an error while mapping the Message to a journal. -// * The journal's Framing returns an error while marshaling the Message, -// or an os.PathError occurs while spooling the frame to a temporary file -// (eg, because local disk is full). +// - The Message implements Validator, and it returns an error. +// - The MappingFunc returns an error while mapping the Message to a journal. +// - The journal's Framing returns an error while marshaling the Message, +// or an os.PathError occurs while spooling the frame to a temporary file +// (eg, because local disk is full). // // A particular MappingFunc error to be aware of is ErrEmptyListResponse, // returned by mapping routines of this package when there are no journals @@ -128,97 +128,6 @@ func (p *Publisher) PublishUncommitted(mapping MappingFunc, msg Message) (*clien return aa, nil } -// PendingPublish is returned from DeferPublishUncommitted, and allows appending a single message -// that had previously been sequenced. -// -// **This is a new and unstable API, that is subject to breaking changes.** -type PendingPublish struct { - publisher *Publisher - journal pb.Journal - contentType string - uuid UUID -} - -// Resolve completes a PendingPublish by appending the finalized content of a message that had -// previously been sequenced. See DeferPublishUncommitted docs for more. -// -// **This is a new and unstable API, that is subject to breaking changes.** -func (pf *PendingPublish) Resolve(msg Message) error { - if pf.publisher == nil { - // Sanity check for if Resolve has already been called, or if PendingPublish is zero-valued - // due to SequenceFutureMessage having returned an error. - panic("Pending publish has already been resolved") - } - if v, ok := msg.(Validator); ok { - if err := v.Validate(); err != nil { - return err - } - } - msg.SetUUID(pf.uuid) - - var framing, err = FramingByContentType(pf.contentType) - if err != nil { - return err - } - - var aa = pf.publisher.ajc.StartAppend(pb.AppendRequest{Journal: pf.journal}, nil) - aa.Require(framing.Marshal(msg, aa.Writer())) - err = aa.Release() - pf.publisher = nil // so that we can sanity check that Resolve isn't called twice - return err -} - -// DeferPublishUncommitted is used to sequence a message that will be published at some future -// point, but before the end of the transaction. It returns a PendingPublish, which can be resolved -// by passing it the actual message to be published. This is used in situations where you need to -// transactionally publish a message when you don't have the content of that message until after the -// ack intents are built. This is an advanced, low level api, and care must be taken to use it -// correctly to avoid corruption of journal content. -// -// The journal and contentType must be known up front, and the acknowledgement Message must also be -// provided by the caller. It's up to the caller to ensure that these things are correct and -// consistent. -// -// The returned PendingPublish does not need to ever be resolved, and can be dropped with no harm -// done. If Resolve is called, then it must be called _before_ the acknowledgements are written. -// Otherwise the resolved message will be ignored by ReadCommitted consumers. Also note that the -// PendingPublish is not safe to Resolve concurrently with other uses of a Publisher. -// -// No other messages should be published to the journal using PublishUncommitted or PublishCommitted -// before the PendingPublish is resolved. It it permissible to publish more than one message using -// DeferPublishUncommitted, as long as all PendingPublish instances are resolved in exactly the -// order in which they were created. -// -// **This is a new and unstable API, that is subject to breaking changes.** -func (p *Publisher) DeferPublishUncommitted(journal pb.Journal, contentType string, ack Message) (fut PendingPublish, err error) { - if p.autoUpdate { - p.clock.Update(time.Now()) - } - - var framing Framing - if framing, err = FramingByContentType(contentType); err != nil { - return - } - - var uuid = BuildUUID(p.producer, p.clock.Tick(), Flag_CONTINUE_TXN) - // Is this the first publish to this journal since our last commit? - if _, ok := p.intentIdx[journal]; !ok { - p.intentIdx[journal] = len(p.intents) - p.intents = append(p.intents, AckIntent{ - Journal: journal, - // Call NewAcknowledgement to create the ack, to ensure that each ack message is unique. - msg: ack.NewAcknowledgement(journal), - framing: framing, - }) - } - return PendingPublish{ - publisher: p, - journal: journal, - contentType: contentType, - uuid: uuid, - }, nil -} - // BuildAckIntents returns the []AckIntents which acknowledge all pending // Messages published since its last invocation. It's the caller's job to // actually append the intents to their respective journals, and only *after* diff --git a/message/publisher_test.go b/message/publisher_test.go index 5f518893..b708fb3a 100644 --- a/message/publisher_test.go +++ b/message/publisher_test.go @@ -215,121 +215,6 @@ func TestIntegrationOfPublisherWithSequencerAndReader(t *testing.T) { require.NoError(t, bk.Tasks.Wait()) } -func TestDeferPublishUncommitted(t *testing.T) { - var etcd = etcdtest.TestClient() - defer etcdtest.Cleanup() - - var ( - clock Clock - ctx = context.Background() - spec = newTestMsgSpec("a/journal") - bk = brokertest.NewBroker(t, etcd, "local", "broker") - ajc = client.NewAppendService(ctx, bk.Client()) - ) - brokertest.CreateJournals(t, bk, spec) - - // Start a long-lived RetryReader of |spec|. - var rr = client.NewRetryReader(ctx, bk.Client(), pb.ReadRequest{ - Journal: spec.Name, - Block: true, - }) - var r = NewReadUncommittedIter(rr, newTestMsg) - - var seq = NewSequencer(nil, nil, 5) - - var seqPump = func() (out []testMsg) { - var env, err = r.Next() - require.NoError(t, err) - - if seq.QueueUncommitted(env) == QueueAckCommitReplay { - // The sequencer buffer is large enough that we should never need to replay for this - // test. - panic("unexpected need to replay") - } - for { - if err := seq.Step(); err == io.EOF { - return - } - require.NoError(t, err) - out = append(out, *seq.Dequeued.Message.(*testMsg)) - } - } - - var mapping = func(Mappable) (pb.Journal, string, error) { - return spec.Name, labels.ContentType_JSONLines, nil - } - var pub = NewPublisher(ajc, &clock) - - // Happy path: An uncommitted message can be written before a deferred one, and should get - // sequenced normally with respect to the deferred message, since the deferred publish is - // started after. - var _, err = pub.PublishUncommitted(mapping, &testMsg{Str: "one"}) - require.NoError(t, err) - require.Equal(t, []testMsg(nil), seqPump()) - - fut, err := pub.DeferPublishUncommitted(spec.Name, labels.ContentType_JSONLines, new(testMsg)) - require.NoError(t, err) - - intents, err := pub.BuildAckIntents() - require.NoError(t, err) - - require.NoError(t, fut.Resolve(&testMsg{Str: "two"})) - require.Equal(t, []testMsg(nil), seqPump()) - - writeIntents(t, ajc, intents) - - var actual = seqPump() - require.Equal(t, 3, len(actual)) - require.Equal(t, "one", actual[0].Str) - require.Equal(t, "two", actual[1].Str) - require.Equal(t, "", actual[2].Str) - - // Sad path cases: - // The deferred publish message will not be seen because it sequences before "three" - fut, err = pub.DeferPublishUncommitted(spec.Name, labels.ContentType_JSONLines, new(testMsg)) - require.NoError(t, err) - - _, err = pub.PublishUncommitted(mapping, &testMsg{Str: "three"}) - require.NoError(t, err) - require.Equal(t, []testMsg(nil), seqPump()) - intents, err = pub.BuildAckIntents() - require.NoError(t, err) - require.NoError(t, fut.Resolve(&testMsg{Str: "wont see four"})) - require.Equal(t, []testMsg(nil), seqPump()) - - writeIntents(t, ajc, intents) - actual = seqPump() - require.Equal(t, 2, len(actual)) - require.Equal(t, "three", actual[0].Str) - require.Equal(t, "", actual[1].Str) - - // The deferred publish isn't resolved until after the acks were written, so will not be seen. - _, err = pub.PublishUncommitted(mapping, &testMsg{Str: "five"}) - require.NoError(t, err) - require.Equal(t, []testMsg(nil), seqPump()) - - fut, err = pub.DeferPublishUncommitted(spec.Name, labels.ContentType_JSONLines, new(testMsg)) - require.NoError(t, err) - - intents, err = pub.BuildAckIntents() - require.NoError(t, err) - writeIntents(t, ajc, intents) - - actual = seqPump() - require.Equal(t, 2, len(actual)) - require.Equal(t, "five", actual[0].Str) - require.Equal(t, "", actual[1].Str) - - require.NoError(t, fut.Resolve(&testMsg{Str: "wont see six"})) - require.Equal(t, []testMsg(nil), seqPump()) - - _, err = pub.PublishCommitted(mapping, &testMsg{Str: "seven"}) - require.NoError(t, err) - actual = seqPump() - require.Equal(t, 1, len(actual)) - require.Equal(t, "seven", actual[0].Str) -} - func readAllMsgs(t require.TestingT, bk *brokertest.Broker, spec *pb.JournalSpec) (out []testMsg) { var rr = client.NewRetryReader(context.Background(), bk.Client(), pb.ReadRequest{Journal: spec.Name}) var r = NewReadUncommittedIter(rr, newTestMsg)