Skip to content

Commit

Permalink
Remove FragmentHandler's error response
Browse files Browse the repository at this point in the history
  • Loading branch information
steven-stern committed Nov 30, 2022
1 parent 8e9ec82 commit 5bd9f4b
Show file tree
Hide file tree
Showing 29 changed files with 218 additions and 324 deletions.
7 changes: 3 additions & 4 deletions aeron/fragmentassembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ func (f *FragmentAssembler) OnFragment(
buffer *atomic.Buffer,
offset int32,
length int32,
header *logbuffer.Header) error {
header *logbuffer.Header) {
flags := header.Flags()
if (flags & unfragmented) == unfragmented {
return f.delegate(buffer, offset, length, header)
f.delegate(buffer, offset, length, header)
} else {
if (flags & beginFrag) == beginFrag {
builder, ok := f.builderBySessionIdMap[header.SessionId()]
Expand All @@ -86,7 +86,7 @@ func (f *FragmentAssembler) OnFragment(
buffer.WriteBytes(builder, offset, length)
if (flags & endFrag) == endFrag {
msgLength := builder.Len()
return f.delegate(
f.delegate(
atomic.MakeBuffer(builder.Bytes(), msgLength),
int32(0),
int32(msgLength),
Expand All @@ -96,5 +96,4 @@ func (f *FragmentAssembler) OnFragment(
}
}
}
return nil
}
6 changes: 3 additions & 3 deletions aeron/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import "github.com/lirm/aeron-go/aeron/logbuffer/term"
// the many structs that use image, without deviating from the existing function signatures and code structure.
type Image interface {
IsClosed() bool
Poll(handler term.FragmentHandler, fragmentLimit int) (int, error)
BoundedPoll(handler term.FragmentHandler, limitPosition int64, fragmentLimit int) (int, error)
ControlledPoll(handler term.ControlledFragmentHandler, fragmentLimit int) (int, error)
Poll(handler term.FragmentHandler, fragmentLimit int) int
BoundedPoll(handler term.FragmentHandler, limitPosition int64, fragmentLimit int) int
ControlledPoll(handler term.ControlledFragmentHandler, fragmentLimit int) int
Position() int64
IsEndOfStream() bool
SessionID() int32
Expand Down
29 changes: 12 additions & 17 deletions aeron/image_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package aeron

import (
"fmt"
"github.com/lirm/aeron-go/aeron/atomic"
"github.com/lirm/aeron-go/aeron/logbuffer"
"github.com/lirm/aeron-go/aeron/logbuffer/term"
Expand Down Expand Up @@ -77,23 +76,23 @@ func (image *image) IsClosed() bool {
// early, even if fragmentLimit has not been hit.
//
//go:norace
func (image *image) Poll(handler term.FragmentHandler, fragmentLimit int) (int, error) {
func (image *image) Poll(handler term.FragmentHandler, fragmentLimit int) int {
if image.IsClosed() {
return 0, nil
return 0
}

position := image.subscriberPosition.get()
termOffset := int32(position) & image.termLengthMask
index := indexByPosition(position, image.positionBitsToShift)
termBuffer := image.termBuffers[index]

offset, result, err := term.Read(termBuffer, termOffset, handler, fragmentLimit, &image.header)
offset, result := term.Read(termBuffer, termOffset, handler, fragmentLimit, &image.header)

newPosition := position + int64(offset-termOffset)
if newPosition > position {
image.subscriberPosition.set(newPosition)
}
return result, err
return result
}

// BoundedPoll polls for new messages in a stream. If new messages are found
Expand All @@ -108,9 +107,9 @@ func (image *image) BoundedPoll(
handler term.FragmentHandler,
limitPosition int64,
fragmentLimit int,
) (int, error) {
) int {
if image.IsClosed() {
return 0, nil
return 0
}

fragmentsRead := 0
Expand All @@ -129,11 +128,9 @@ func (image *image) BoundedPoll(
header := &image.header
header.Wrap(termBuffer.Ptr(), termBuffer.Capacity())

var err error
for fragmentsRead < fragmentLimit && offset < limitOffset && err == nil {
for fragmentsRead < fragmentLimit && offset < limitOffset {
length := logbuffer.GetFrameLength(termBuffer, offset)
if length <= 0 {
err = fmt.Errorf("invalid frameLength %d", length)
break
}

Expand All @@ -147,14 +144,14 @@ func (image *image) BoundedPoll(
fragmentsRead++
header.SetOffset(frameOffset)

err = handler(termBuffer, frameOffset+logbuffer.DataFrameHeader.Length,
handler(termBuffer, frameOffset+logbuffer.DataFrameHeader.Length,
length-logbuffer.DataFrameHeader.Length, header)
}
resultingPosition := initialPosition + int64(offset-initialOffset)
if resultingPosition > initialPosition {
image.subscriberPosition.set(resultingPosition)
}
return fragmentsRead, err
return fragmentsRead
}

// ControlledPoll polls for new messages in a stream. If new messages are found
Expand All @@ -168,9 +165,9 @@ func (image *image) BoundedPoll(
func (image *image) ControlledPoll(
handler term.ControlledFragmentHandler,
fragmentLimit int,
) (int, error) {
) int {
if image.IsClosed() {
return 0, nil
return 0
}

fragmentsRead := 0
Expand All @@ -185,11 +182,9 @@ func (image *image) ControlledPoll(
header := &image.header
header.Wrap(termBuffer.Ptr(), termBuffer.Capacity())

var err error
for fragmentsRead < fragmentLimit && offset < capacity {
length := logbuffer.GetFrameLength(termBuffer, offset)
if length <= 0 {
err = fmt.Errorf("invalid frameLength %d", length)
break
}

Expand Down Expand Up @@ -223,7 +218,7 @@ func (image *image) ControlledPoll(
if resultingPosition > initialPosition {
image.subscriberPosition.set(resultingPosition)
}
return fragmentsRead, err
return fragmentsRead
}

// Position returns the position this image has been consumed to by the subscriber.
Expand Down
2 changes: 1 addition & 1 deletion aeron/logbuffer/term/fragmenthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ import (
)

// FragmentHandler is the main callback interface for received data
type FragmentHandler func(buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header) error
type FragmentHandler func(buffer *atomic.Buffer, offset int32, length int32, header *logbuffer.Header)
13 changes: 2 additions & 11 deletions aeron/logbuffer/term/mock_fragmenthandler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 13 additions & 18 deletions aeron/logbuffer/term/reader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/*
Copyright 2016 Stanislav Liberman
Copyright 2022 Steven Stern
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -18,26 +17,22 @@ limitations under the License.
package term

import (
"fmt"
"github.com/lirm/aeron-go/aeron/atomic"
"github.com/lirm/aeron-go/aeron/logbuffer"
"github.com/lirm/aeron-go/aeron/util"
)

// Read will attempt to read the next frame from the term and invoke the callback if successful.
// Method will return a tuple of new term offset, number of fragments read, and an error reading
// fragments. An error will interrupt reading before fragmentLimit, but any fragments read prior
// to the error will still be processed, and that number will be returned with the error.
// Method will return a tuple of new term offset and number of fragments read
//
//go:norace
func Read(termBuffer *atomic.Buffer, termOffset int32, handler FragmentHandler, fragmentLimit int,
header *logbuffer.Header) (int32, int, error) {
header *logbuffer.Header) (int32, int) {

capacity := termBuffer.Capacity()

var fragmentsRead int
var err error
for fragmentsRead < fragmentLimit && termOffset < capacity && err == nil {
for fragmentsRead < fragmentLimit && termOffset < capacity {
frameLength := logbuffer.GetFrameLength(termBuffer, termOffset)
if frameLength <= 0 {
break
Expand All @@ -48,27 +43,26 @@ func Read(termBuffer *atomic.Buffer, termOffset int32, handler FragmentHandler,

if !logbuffer.IsPaddingFrame(termBuffer, fragmentOffset) {
header.Wrap(termBuffer.Ptr(), termBuffer.Capacity())
fragmentsRead++
header.SetOffset(fragmentOffset)
err = handler(termBuffer, fragmentOffset+logbuffer.DataFrameHeader.Length,
handler(termBuffer, fragmentOffset+logbuffer.DataFrameHeader.Length,
frameLength-logbuffer.DataFrameHeader.Length, header)

fragmentsRead++
}
}

return termOffset, fragmentsRead, err
return termOffset, fragmentsRead
}

// BoundedRead will attempt to read frames from the term up to the specified offsetLimit.
// Method will return a tuple of new term offset and number of fragments read
func BoundedRead(termBuffer *atomic.Buffer, termOffset int32, offsetLimit int32, handler FragmentHandler,
fragmentLimit int, header *logbuffer.Header) (int32, int, error) {
fragmentLimit int, header *logbuffer.Header) (int32, int) {

var fragmentsRead int
var err error
for fragmentsRead < fragmentLimit && termOffset < offsetLimit && err == nil {
for fragmentsRead < fragmentLimit && termOffset < offsetLimit {
frameLength := logbuffer.GetFrameLength(termBuffer, termOffset)
if frameLength <= 0 {
err = fmt.Errorf("invalid frameLength %d", frameLength)
break
}

Expand All @@ -77,12 +71,13 @@ func BoundedRead(termBuffer *atomic.Buffer, termOffset int32, offsetLimit int32,

if !logbuffer.IsPaddingFrame(termBuffer, fragmentOffset) {
header.Wrap(termBuffer.Ptr(), termBuffer.Capacity())
fragmentsRead++
header.SetOffset(fragmentOffset)
err = handler(termBuffer, fragmentOffset+logbuffer.DataFrameHeader.Length,
handler(termBuffer, fragmentOffset+logbuffer.DataFrameHeader.Length,
frameLength-logbuffer.DataFrameHeader.Length, header)

fragmentsRead++
}
}

return termOffset, fragmentsRead, err
return termOffset, fragmentsRead
}
33 changes: 6 additions & 27 deletions aeron/mock_image.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 5bd9f4b

Please sign in to comment.