From 4cb78b1ec861e4b5722203180becfb1f974cdd69 Mon Sep 17 00:00:00 2001 From: PabloMK7 Date: Tue, 23 Apr 2024 10:25:08 +0200 Subject: [PATCH] Add MutexMap Atomic/RAtomic and fix crash Fix formatting --- mutex_map.go | 156 ++++++++++++++++++++++++++++++++++------------ sliding_window.go | 21 ++++--- 2 files changed, 128 insertions(+), 49 deletions(-) diff --git a/mutex_map.go b/mutex_map.go index 967e033d..adf82017 100644 --- a/mutex_map.go +++ b/mutex_map.go @@ -2,53 +2,42 @@ package nex import "sync" +type MapInterface[K comparable, V any] struct { + real map[K]V +} + // MutexMap implements a map type with go routine safe accessors through mutex locks. Embeds sync.RWMutex type MutexMap[K comparable, V any] struct { - *sync.RWMutex - real map[K]V + mutex *sync.RWMutex + mapInterface MapInterface[K,V] } // Set sets a key to a given value -func (m *MutexMap[K, V]) Set(key K, value V) { - m.Lock() - defer m.Unlock() - +func (m *MapInterface[K, V]) Set(key K, value V) { m.real[key] = value } // Get returns the given key value and a bool if found -func (m *MutexMap[K, V]) Get(key K) (V, bool) { - m.RLock() - defer m.RUnlock() - +func (m *MapInterface[K, V]) Get(key K) (V, bool) { value, ok := m.real[key] return value, ok } // Has checks if a key exists in the map -func (m *MutexMap[K, V]) Has(key K) bool { - m.RLock() - defer m.RUnlock() - +func (m *MapInterface[K, V]) Has(key K) bool { _, ok := m.real[key] return ok } // Delete removes a key from the internal map -func (m *MutexMap[K, V]) Delete(key K) { - m.Lock() - defer m.Unlock() - +func (m *MapInterface[K, V]) Delete(key K) { delete(m.real, key) } // DeleteIf deletes every element if the predicate returns true. // Returns the amount of elements deleted. -func (m *MutexMap[K, V]) DeleteIf(predicate func(key K, value V) bool) int { - m.Lock() - defer m.Unlock() - +func (m *MapInterface[K, V]) DeleteIf(predicate func(key K, value V) bool) int { amount := 0 for key, value := range m.real { if predicate(key, value) { @@ -61,10 +50,7 @@ func (m *MutexMap[K, V]) DeleteIf(predicate func(key K, value V) bool) int { } // RunAndDelete runs a callback and removes the key afterwards -func (m *MutexMap[K, V]) RunAndDelete(key K, callback func(key K, value V)) { - m.Lock() - defer m.Unlock() - +func (m *MapInterface[K, V]) RunAndDelete(key K, callback func(key K, value V)) { if value, ok := m.real[key]; ok { callback(key, value) delete(m.real, key) @@ -72,20 +58,14 @@ func (m *MutexMap[K, V]) RunAndDelete(key K, callback func(key K, value V)) { } // Size returns the length of the internal map -func (m *MutexMap[K, V]) Size() int { - m.RLock() - defer m.RUnlock() - +func (m *MapInterface[K, V]) Size() int { return len(m.real) } // Each runs a callback function for every item in the map // The map should not be modified inside the callback function // Returns true if the loop was terminated early -func (m *MutexMap[K, V]) Each(callback func(key K, value V) bool) bool { - m.RLock() - defer m.RUnlock() - +func (m *MapInterface[K, V]) Each(callback func(key K, value V) bool) bool { for key, value := range m.real { if callback(key, value) { return true @@ -97,10 +77,7 @@ func (m *MutexMap[K, V]) Each(callback func(key K, value V) bool) bool { // Clear removes all items from the `real` map // Accepts an optional callback function ran for every item before it is deleted -func (m *MutexMap[K, V]) Clear(callback func(key K, value V)) { - m.Lock() - defer m.Unlock() - +func (m *MapInterface[K, V]) Clear(callback func(key K, value V)) { for key, value := range m.real { if callback != nil { callback(key, value) @@ -109,10 +86,109 @@ func (m *MutexMap[K, V]) Clear(callback func(key K, value V)) { } } +// Set sets a key to a given value +func (m *MutexMap[K, V]) Set(key K, value V) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.mapInterface.Set(key, value) +} + +// Get returns the given key value and a bool if found +func (m *MutexMap[K, V]) Get(key K) (V, bool) { + m.mutex.RLock() + defer m.mutex.RUnlock() + + value, ok := m.mapInterface.Get(key) + + return value, ok +} + +// Has checks if a key exists in the map +func (m *MutexMap[K, V]) Has(key K) bool { + m.mutex.RLock() + defer m.mutex.RUnlock() + + return m.mapInterface.Has(key) +} + +// Delete removes a key from the internal map +func (m *MutexMap[K, V]) Delete(key K) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.mapInterface.Delete(key) +} + +// DeleteIf deletes every element if the predicate returns true. +// Returns the amount of elements deleted. +func (m *MutexMap[K, V]) DeleteIf(predicate func(key K, value V) bool) int { + m.mutex.Lock() + defer m.mutex.Unlock() + + return m.mapInterface.DeleteIf(predicate) +} + +// RunAndDelete runs a callback and removes the key afterwards +func (m *MutexMap[K, V]) RunAndDelete(key K, callback func(key K, value V)) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.mapInterface.RunAndDelete(key, callback) +} + +// Size returns the length of the internal map +func (m *MutexMap[K, V]) Size() int { + m.mutex.RLock() + defer m.mutex.RUnlock() + + return m.mapInterface.Size() +} + +// Each runs a callback function for every item in the map +// The map should not be modified inside the callback function +// Returns true if the loop was terminated early +func (m *MutexMap[K, V]) Each(callback func(key K, value V) bool) bool { + m.mutex.RLock() + defer m.mutex.RUnlock() + + return m.mapInterface.Each(callback) +} + +// Clear removes all items from the `real` map +// Accepts an optional callback function ran for every item before it is deleted +func (m *MutexMap[K, V]) Clear(callback func(key K, value V)) { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.mapInterface.Clear(callback) +} + +// RAtomic read-locks the map and runs the provided callback. All the operations to +// the map interface in the callback will be done atomically. Do not perform write +// operations to the map interface. +func (m *MutexMap[K, V]) RAtomic(callback func(mapInterface *MapInterface[K,V])) { + m.mutex.RLock() + defer m.mutex.RUnlock() + + callback(&m.mapInterface) +} + +// Atomic write-locks the map and runs the provided callback. All the operations to +// the map interface in the callback will be done atomically. +func (m *MutexMap[K, V]) Atomic(callback func(mapInterface *MapInterface[K,V])) { + m.mutex.Lock() + defer m.mutex.Unlock() + + callback(&m.mapInterface) +} + // NewMutexMap returns a new instance of MutexMap with the provided key/value types func NewMutexMap[K comparable, V any]() *MutexMap[K, V] { return &MutexMap[K, V]{ - RWMutex: &sync.RWMutex{}, - real: make(map[K]V), + mutex: &sync.RWMutex{}, + mapInterface: MapInterface[K,V]{ + real: make(map[K]V), + }, } } diff --git a/sliding_window.go b/sliding_window.go index 17835623..e0d793d8 100644 --- a/sliding_window.go +++ b/sliding_window.go @@ -17,16 +17,19 @@ type SlidingWindow struct { func (sw *SlidingWindow) Update(packet PRUDPPacketInterface) []PRUDPPacketInterface { packets := make([]PRUDPPacketInterface, 0) - if packet.SequenceID() >= sw.incomingSequenceIDCounter.Value && !sw.pendingPackets.Has(packet.SequenceID()) { - sw.pendingPackets.Set(packet.SequenceID(), packet) - - for sw.pendingPackets.Has(sw.incomingSequenceIDCounter.Value) { - storedPacket, _ := sw.pendingPackets.Get(sw.incomingSequenceIDCounter.Value) - packets = append(packets, storedPacket) - sw.pendingPackets.Delete(sw.incomingSequenceIDCounter.Value) - sw.incomingSequenceIDCounter.Next() + sw.pendingPackets.Atomic(func(mapInterface *MapInterface[uint16, PRUDPPacketInterface]) { + if packet.SequenceID() >= sw.incomingSequenceIDCounter.Value && !mapInterface.Has(packet.SequenceID()) { + mapInterface.Set(packet.SequenceID(), packet) + + for mapInterface.Has(sw.incomingSequenceIDCounter.Value) { + if storedPacket, ok := mapInterface.Get(sw.incomingSequenceIDCounter.Value); ok { + packets = append(packets, storedPacket) + } + mapInterface.Delete(sw.incomingSequenceIDCounter.Value) + sw.incomingSequenceIDCounter.Next() + } } - } + }) return packets }