Skip to content

Commit

Permalink
Add norace directives to run under the race detector
Browse files Browse the repository at this point in the history
  • Loading branch information
ethanf authored and peterTalos committed Aug 12, 2022
1 parent 3326ef3 commit b09b9e2
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 0 deletions.
27 changes: 27 additions & 0 deletions aeron/atomic/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,24 +85,28 @@ func MakeBuffer(args ...interface{}) *Buffer {
}

// Wrap raw memory with this buffer instance
//go:norace
func (buf *Buffer) Wrap(buffer unsafe.Pointer, length int32) *Buffer {
buf.bufferPtr = buffer
buf.length = length
return buf
}

// Ptr will return the raw memory pointer for the underlying buffer
//go:norace
func (buf *Buffer) Ptr() unsafe.Pointer {
return buf.bufferPtr
}

// Capacity of the buffer, which is used for bound checking
//go:norace
func (buf *Buffer) Capacity() int32 {
return buf.length
}

// Fill the buffer with the value of the argument byte. Generally used for initialization,
// since it's somewhat expensive.
//go:norace
func (buf *Buffer) Fill(b uint8) {
if buf.length == 0 {
return
Expand All @@ -113,6 +117,7 @@ func (buf *Buffer) Fill(b uint8) {
}
}

//go:norace
func (buf *Buffer) GetUInt8(offset int32) uint8 {
BoundsCheck(offset, 1, buf.length)

Expand All @@ -121,6 +126,7 @@ func (buf *Buffer) GetUInt8(offset int32) uint8 {
return *(*uint8)(uptr)
}

//go:norace
func (buf *Buffer) GetUInt16(offset int32) uint16 {
BoundsCheck(offset, 2, buf.length)

Expand All @@ -129,6 +135,7 @@ func (buf *Buffer) GetUInt16(offset int32) uint16 {
return *(*uint16)(uptr)
}

//go:norace
func (buf *Buffer) GetInt32(offset int32) int32 {
BoundsCheck(offset, 4, buf.length)

Expand All @@ -137,6 +144,7 @@ func (buf *Buffer) GetInt32(offset int32) int32 {
return *(*int32)(uptr)
}

//go:norace
func (buf *Buffer) GetInt64(offset int32) int64 {
BoundsCheck(offset, 8, buf.length)

Expand All @@ -145,6 +153,7 @@ func (buf *Buffer) GetInt64(offset int32) int64 {
return *(*int64)(uptr)
}

//go:norace
func (buf *Buffer) PutUInt8(offset int32, value uint8) {
BoundsCheck(offset, 1, buf.length)

Expand All @@ -153,6 +162,7 @@ func (buf *Buffer) PutUInt8(offset int32, value uint8) {
*(*uint8)(uptr) = value
}

//go:norace
func (buf *Buffer) PutInt8(offset int32, value int8) {
BoundsCheck(offset, 1, buf.length)

Expand All @@ -161,6 +171,7 @@ func (buf *Buffer) PutInt8(offset int32, value int8) {
*(*int8)(uptr) = value
}

//go:norace
func (buf *Buffer) PutUInt16(offset int32, value uint16) {
BoundsCheck(offset, 2, buf.length)

Expand All @@ -169,6 +180,7 @@ func (buf *Buffer) PutUInt16(offset int32, value uint16) {
*(*uint16)(uptr) = value
}

//go:norace
func (buf *Buffer) PutInt32(offset int32, value int32) {
BoundsCheck(offset, 4, buf.length)

Expand All @@ -177,6 +189,7 @@ func (buf *Buffer) PutInt32(offset int32, value int32) {
*(*int32)(uptr) = value
}

//go:norace
func (buf *Buffer) PutInt64(offset int32, value int64) {
BoundsCheck(offset, 8, buf.length)

Expand All @@ -185,6 +198,7 @@ func (buf *Buffer) PutInt64(offset int32, value int64) {
*(*int64)(uptr) = value
}

//go:norace
func (buf *Buffer) GetAndAddInt64(offset int32, delta int64) int64 {
BoundsCheck(offset, 8, buf.length)

Expand All @@ -194,6 +208,7 @@ func (buf *Buffer) GetAndAddInt64(offset int32, delta int64) int64 {
return int64(newVal) - delta
}

//go:norace
func (buf *Buffer) GetInt32Volatile(offset int32) int32 {
BoundsCheck(offset, 4, buf.length)

Expand All @@ -203,6 +218,7 @@ func (buf *Buffer) GetInt32Volatile(offset int32) int32 {
return int32(cur)
}

//go:norace
func (buf *Buffer) GetInt64Volatile(offset int32) int64 {
BoundsCheck(offset, 8, buf.length)

Expand All @@ -212,48 +228,55 @@ func (buf *Buffer) GetInt64Volatile(offset int32) int64 {
return int64(cur)
}

//go:norace
func (buf *Buffer) PutInt64Ordered(offset int32, value int64) {
BoundsCheck(offset, 8, buf.length)

uptr := unsafe.Pointer(uintptr(buf.bufferPtr) + uintptr(offset))
atomic.StoreInt64((*int64)(uptr), value)
}

//go:norace
func (buf *Buffer) PutInt32Ordered(offset int32, value int32) {
BoundsCheck(offset, 4, buf.length)

uptr := unsafe.Pointer(uintptr(buf.bufferPtr) + uintptr(offset))
atomic.StoreInt32((*int32)(uptr), value)
}

//go:norace
func (buf *Buffer) PutIntOrdered(offset int32, value int) {
BoundsCheck(offset, 4, buf.length)

uptr := unsafe.Pointer(uintptr(buf.bufferPtr) + uintptr(offset))
atomic.StoreInt32((*int32)(uptr), int32(value))
}

//go:norace
func (buf *Buffer) CompareAndSetInt64(offset int32, expectedValue, updateValue int64) bool {
BoundsCheck(offset, 8, buf.length)

uptr := unsafe.Pointer(uintptr(buf.bufferPtr) + uintptr(offset))
return atomic.CompareAndSwapInt64((*int64)(uptr), expectedValue, updateValue)
}

//go:norace
func (buf *Buffer) CompareAndSetInt32(offset int32, expectedValue, updateValue int32) bool {
BoundsCheck(offset, 4, buf.length)

uptr := unsafe.Pointer(uintptr(buf.bufferPtr) + uintptr(offset))
return atomic.CompareAndSwapInt32((*int32)(uptr), expectedValue, updateValue)
}

//go:norace
func (buf *Buffer) PutBytes(index int32, srcBuffer *Buffer, srcint32 int32, length int32) {
BoundsCheck(index, length, buf.length)
BoundsCheck(srcint32, length, srcBuffer.length)

util.Memcpy(uintptr(buf.bufferPtr)+uintptr(index), uintptr(srcBuffer.bufferPtr)+uintptr(srcint32), length)
}

//go:norace
func (buf *Buffer) GetBytesArray(offset int32, length int32) []byte {
BoundsCheck(offset, length, buf.length)

Expand All @@ -266,6 +289,7 @@ func (buf *Buffer) GetBytesArray(offset int32, length int32) []byte {
return bArr
}

//go:norace
func (buf *Buffer) GetBytes(offset int32, b []byte) {
length := len(b)
BoundsCheck(offset, int32(length), buf.length)
Expand All @@ -278,6 +302,7 @@ func (buf *Buffer) GetBytes(offset int32, b []byte) {

// WriteBytes writes data from offset and length to the given dest buffer. This will
// grow the buffer as needed.
//go:norace
func (buf *Buffer) WriteBytes(dest *bytes.Buffer, offset int32, length int32) {
BoundsCheck(offset, length, buf.length)
// grow the buffer all at once to prevent additional allocations.
Expand All @@ -288,6 +313,7 @@ func (buf *Buffer) WriteBytes(dest *bytes.Buffer, offset int32, length int32) {
}
}

//go:norace
func (buf *Buffer) PutBytesArray(index int32, arr *[]byte, srcint32 int32, length int32) {
BoundsCheck(index, length, buf.length)
BoundsCheck(srcint32, length, int32(len(*arr)))
Expand All @@ -299,6 +325,7 @@ func (buf *Buffer) PutBytesArray(index int32, arr *[]byte, srcint32 int32, lengt

// BoundsCheck is helper function to make sure buffer writes and reads to
// not go out of bounds on stated buffer capacity
//go:norace
func BoundsCheck(index int32, length int32, myLength int32) {
if (index + length) > myLength {
log.Fatal(fmt.Sprintf("Out of Bounds. int32: %d + %d Capacity: %d", index, length, myLength))
Expand Down
1 change: 1 addition & 0 deletions aeron/clientconductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ func (cc *ClientConductor) OnSubscriptionReady(correlationID int64, channelStatu

}

//go:norace
func (cc *ClientConductor) OnAvailableImage(streamID int32, sessionID int32, logFilename string, sourceIdentity string,
subscriberPositionID int32, subsRegID int64, corrID int64) {
logger.Debugf("OnAvailableImage: streamId=%d, sessionId=%d, logFilename=%s, sourceIdentity=%s, subsRegID=%d, corrID=%d",
Expand Down
1 change: 1 addition & 0 deletions aeron/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (image *Image) IsClosed() bool {
return image.isClosed.Get()
}

//go:norace
func (image *Image) Poll(handler term.FragmentHandler, fragmentLimit int) int {
if image.IsClosed() {
return 0
Expand Down
1 change: 1 addition & 0 deletions aeron/logbuffer/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Header struct {
positionBitsToShift int32
}

//go:norace
func (hdr *Header) Wrap(ptr unsafe.Pointer, length int32) *Header {
hdr.buffer.Wrap(ptr, length)
return hdr
Expand Down
2 changes: 2 additions & 0 deletions aeron/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ func NewPosition(buffer *atomic.Buffer, id int32) Position {
return pos
}

//go:norace
func (pos *Position) get() int64 {
p := pos.buffer.GetInt64(pos.offset)
return p
}

//go:norace
func (pos *Position) set(value int64) {
pos.buffer.PutInt64(pos.offset, value)
}
3 changes: 3 additions & 0 deletions aeron/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (sub *Subscription) ControlledPoll(handler term.ControlledFragmentHandler,
return fragmentsRead
}

//go:norace
func (sub *Subscription) hasImage(sessionID int32) bool {
img := sub.images.Get()
for _, image := range img {
Expand All @@ -195,6 +196,7 @@ func (sub *Subscription) hasImage(sessionID int32) bool {
return false
}

//go:norace
func (sub *Subscription) addImage(image *Image) *[]Image {

images := sub.images.Get()
Expand All @@ -204,6 +206,7 @@ func (sub *Subscription) addImage(image *Image) *[]Image {
return &images
}

//go:norace
func (sub *Subscription) removeImage(correlationID int64) *Image {

img := sub.images.Get()
Expand Down
1 change: 1 addition & 0 deletions aeron/util/bits.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func IsPowerOfTwo(value int64) bool {
}

// Memcpy will copy length bytes from pointer src to dest
//go:nocheckptr
func Memcpy(dest uintptr, src uintptr, length int32) {
var i int32

Expand Down

0 comments on commit b09b9e2

Please sign in to comment.