Skip to content

Commit

Permalink
Add MutexMap Atomic/RAtomic and fix crash
Browse files Browse the repository at this point in the history
Fix formatting
  • Loading branch information
PabloMK7 committed Apr 27, 2024
1 parent 5f38bf3 commit 4cb78b1
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 49 deletions.
156 changes: 116 additions & 40 deletions mutex_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,42 @@ package nex

import "sync"

type MapInterface[K comparable, V any] struct {
real map[K]V
}

// MutexMap implements a map type with go routine safe accessors through mutex locks. Embeds sync.RWMutex
type MutexMap[K comparable, V any] struct {
*sync.RWMutex
real map[K]V
mutex *sync.RWMutex
mapInterface MapInterface[K,V]
}

// Set sets a key to a given value
func (m *MutexMap[K, V]) Set(key K, value V) {
m.Lock()
defer m.Unlock()

func (m *MapInterface[K, V]) Set(key K, value V) {
m.real[key] = value
}

// Get returns the given key value and a bool if found
func (m *MutexMap[K, V]) Get(key K) (V, bool) {
m.RLock()
defer m.RUnlock()

func (m *MapInterface[K, V]) Get(key K) (V, bool) {
value, ok := m.real[key]

return value, ok
}

// Has checks if a key exists in the map
func (m *MutexMap[K, V]) Has(key K) bool {
m.RLock()
defer m.RUnlock()

func (m *MapInterface[K, V]) Has(key K) bool {
_, ok := m.real[key]
return ok
}

// Delete removes a key from the internal map
func (m *MutexMap[K, V]) Delete(key K) {
m.Lock()
defer m.Unlock()

func (m *MapInterface[K, V]) Delete(key K) {
delete(m.real, key)
}

// DeleteIf deletes every element if the predicate returns true.
// Returns the amount of elements deleted.
func (m *MutexMap[K, V]) DeleteIf(predicate func(key K, value V) bool) int {
m.Lock()
defer m.Unlock()

func (m *MapInterface[K, V]) DeleteIf(predicate func(key K, value V) bool) int {
amount := 0
for key, value := range m.real {
if predicate(key, value) {
Expand All @@ -61,31 +50,22 @@ func (m *MutexMap[K, V]) DeleteIf(predicate func(key K, value V) bool) int {
}

// RunAndDelete runs a callback and removes the key afterwards
func (m *MutexMap[K, V]) RunAndDelete(key K, callback func(key K, value V)) {
m.Lock()
defer m.Unlock()

func (m *MapInterface[K, V]) RunAndDelete(key K, callback func(key K, value V)) {
if value, ok := m.real[key]; ok {
callback(key, value)
delete(m.real, key)
}
}

// Size returns the length of the internal map
func (m *MutexMap[K, V]) Size() int {
m.RLock()
defer m.RUnlock()

func (m *MapInterface[K, V]) Size() int {
return len(m.real)
}

// Each runs a callback function for every item in the map
// The map should not be modified inside the callback function
// Returns true if the loop was terminated early
func (m *MutexMap[K, V]) Each(callback func(key K, value V) bool) bool {
m.RLock()
defer m.RUnlock()

func (m *MapInterface[K, V]) Each(callback func(key K, value V) bool) bool {
for key, value := range m.real {
if callback(key, value) {
return true
Expand All @@ -97,10 +77,7 @@ func (m *MutexMap[K, V]) Each(callback func(key K, value V) bool) bool {

// Clear removes all items from the `real` map
// Accepts an optional callback function ran for every item before it is deleted
func (m *MutexMap[K, V]) Clear(callback func(key K, value V)) {
m.Lock()
defer m.Unlock()

func (m *MapInterface[K, V]) Clear(callback func(key K, value V)) {
for key, value := range m.real {
if callback != nil {
callback(key, value)
Expand All @@ -109,10 +86,109 @@ func (m *MutexMap[K, V]) Clear(callback func(key K, value V)) {
}
}

// Set sets a key to a given value
func (m *MutexMap[K, V]) Set(key K, value V) {
m.mutex.Lock()
defer m.mutex.Unlock()

m.mapInterface.Set(key, value)
}

// Get returns the given key value and a bool if found
func (m *MutexMap[K, V]) Get(key K) (V, bool) {
m.mutex.RLock()
defer m.mutex.RUnlock()

value, ok := m.mapInterface.Get(key)

return value, ok
}

// Has checks if a key exists in the map
func (m *MutexMap[K, V]) Has(key K) bool {
m.mutex.RLock()
defer m.mutex.RUnlock()

return m.mapInterface.Has(key)
}

// Delete removes a key from the internal map
func (m *MutexMap[K, V]) Delete(key K) {
m.mutex.Lock()
defer m.mutex.Unlock()

m.mapInterface.Delete(key)
}

// DeleteIf deletes every element if the predicate returns true.
// Returns the amount of elements deleted.
func (m *MutexMap[K, V]) DeleteIf(predicate func(key K, value V) bool) int {
m.mutex.Lock()
defer m.mutex.Unlock()

return m.mapInterface.DeleteIf(predicate)
}

// RunAndDelete runs a callback and removes the key afterwards
func (m *MutexMap[K, V]) RunAndDelete(key K, callback func(key K, value V)) {
m.mutex.Lock()
defer m.mutex.Unlock()

m.mapInterface.RunAndDelete(key, callback)
}

// Size returns the length of the internal map
func (m *MutexMap[K, V]) Size() int {
m.mutex.RLock()
defer m.mutex.RUnlock()

return m.mapInterface.Size()
}

// Each runs a callback function for every item in the map
// The map should not be modified inside the callback function
// Returns true if the loop was terminated early
func (m *MutexMap[K, V]) Each(callback func(key K, value V) bool) bool {
m.mutex.RLock()
defer m.mutex.RUnlock()

return m.mapInterface.Each(callback)
}

// Clear removes all items from the `real` map
// Accepts an optional callback function ran for every item before it is deleted
func (m *MutexMap[K, V]) Clear(callback func(key K, value V)) {
m.mutex.Lock()
defer m.mutex.Unlock()

m.mapInterface.Clear(callback)
}

// RAtomic read-locks the map and runs the provided callback. All the operations to
// the map interface in the callback will be done atomically. Do not perform write
// operations to the map interface.
func (m *MutexMap[K, V]) RAtomic(callback func(mapInterface *MapInterface[K,V])) {
m.mutex.RLock()
defer m.mutex.RUnlock()

callback(&m.mapInterface)
}

// Atomic write-locks the map and runs the provided callback. All the operations to
// the map interface in the callback will be done atomically.
func (m *MutexMap[K, V]) Atomic(callback func(mapInterface *MapInterface[K,V])) {
m.mutex.Lock()
defer m.mutex.Unlock()

callback(&m.mapInterface)
}

// NewMutexMap returns a new instance of MutexMap with the provided key/value types
func NewMutexMap[K comparable, V any]() *MutexMap[K, V] {
return &MutexMap[K, V]{
RWMutex: &sync.RWMutex{},
real: make(map[K]V),
mutex: &sync.RWMutex{},
mapInterface: MapInterface[K,V]{
real: make(map[K]V),
},
}
}
21 changes: 12 additions & 9 deletions sliding_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ type SlidingWindow struct {
func (sw *SlidingWindow) Update(packet PRUDPPacketInterface) []PRUDPPacketInterface {
packets := make([]PRUDPPacketInterface, 0)

if packet.SequenceID() >= sw.incomingSequenceIDCounter.Value && !sw.pendingPackets.Has(packet.SequenceID()) {
sw.pendingPackets.Set(packet.SequenceID(), packet)

for sw.pendingPackets.Has(sw.incomingSequenceIDCounter.Value) {
storedPacket, _ := sw.pendingPackets.Get(sw.incomingSequenceIDCounter.Value)
packets = append(packets, storedPacket)
sw.pendingPackets.Delete(sw.incomingSequenceIDCounter.Value)
sw.incomingSequenceIDCounter.Next()
sw.pendingPackets.Atomic(func(mapInterface *MapInterface[uint16, PRUDPPacketInterface]) {
if packet.SequenceID() >= sw.incomingSequenceIDCounter.Value && !mapInterface.Has(packet.SequenceID()) {
mapInterface.Set(packet.SequenceID(), packet)

for mapInterface.Has(sw.incomingSequenceIDCounter.Value) {
if storedPacket, ok := mapInterface.Get(sw.incomingSequenceIDCounter.Value); ok {
packets = append(packets, storedPacket)
}
mapInterface.Delete(sw.incomingSequenceIDCounter.Value)
sw.incomingSequenceIDCounter.Next()
}
}
}
})

return packets
}
Expand Down

0 comments on commit 4cb78b1

Please sign in to comment.