diff --git a/sdks/go/pkg/beam/core/state/state.go b/sdks/go/pkg/beam/core/state/state.go index 83649b08152d7..2e700250c849f 100644 --- a/sdks/go/pkg/beam/core/state/state.go +++ b/sdks/go/pkg/beam/core/state/state.go @@ -517,129 +517,3 @@ func MakeMapState[K comparable, V any](k string) Map[K, V] { Key: k, } } - -// Map is used to read and write global pipeline state representing a map. -// Key represents the key used to lookup this state (not the key of map entries). -type Map[K comparable, V any] struct { - Key string -} - -// Put is used to write a key/value pair to this instance of global map state. -func (s *Map[K, V]) Put(p Provider, key K, val V) error { - return p.WriteMapState(Transaction{ - Key: s.Key, - Type: TransactionTypeSet, - MapKey: key, - Val: val, - }) -} - -// Keys is used to read the keys of this map state. -// When a value is not found, returns an empty list and false. -func (s *Map[K, V]) Keys(p Provider) ([]K, bool, error) { - // This replays any writes that have happened to this value since we last read - // For more detail, see "State Transactionality" below for buffered transactions - initialValue, bufferedTransactions, err := p.ReadMapStateKeys(s.Key) - if err != nil { - return []K{}, false, err - } - cur := []K{} - for _, v := range initialValue { - cur = append(cur, v.(K)) - } - for _, t := range bufferedTransactions { - switch t.Type { - case TransactionTypeSet: - seen := false - mk := t.MapKey.(K) - for _, k := range cur { - if k == mk { - seen = true - } - } - if !seen { - cur = append(cur, mk) - } - case TransactionTypeClear: - if t.MapKey == nil { - cur = []K{} - } else { - k := t.MapKey.(K) - for idx, v := range cur { - if v == k { - // Remove this key since its been cleared - cur[idx] = cur[len(cur)-1] - cur = cur[:len(cur)-1] - break - } - } - } - } - } - if len(cur) == 0 { - return cur, false, nil - } - return cur, true, nil -} - -// Get is used to read a value given a key. -// When a value is not found, returns the 0 value and false. -func (s *Map[K, V]) Get(p Provider, key K) (V, bool, error) { - // This replays any writes that have happened to this value since we last read - // For more detail, see "State Transactionality" below for buffered transactions - cur, bufferedTransactions, err := p.ReadMapStateValue(s.Key, key) - if err != nil { - var val V - return val, false, err - } - for _, t := range bufferedTransactions { - switch t.Type { - case TransactionTypeSet: - if t.MapKey.(K) == key { - cur = t.Val - } - case TransactionTypeClear: - if t.MapKey == nil || t.MapKey.(K) == key { - cur = nil - } - } - } - if cur == nil { - var val V - return val, false, nil - } - return cur.(V), true, nil -} - -// StateKey returns the key for this pipeline state entry. -func (s Map[K, V]) StateKey() string { - if s.Key == "" { - // TODO(#22736) - infer the state from the member variable name during pipeline construction. - panic("Value state exists on struct but has not been initialized with a key.") - } - return s.Key -} - -// KeyCoderType returns the type of the value state which should be used for a coder for map keys. -func (s Map[K, V]) KeyCoderType() reflect.Type { - var k K - return reflect.TypeOf(k) -} - -// CoderType returns the type of the value state which should be used for a coder for map values. -func (s Map[K, V]) CoderType() reflect.Type { - var v V - return reflect.TypeOf(v) -} - -// StateType returns the type of the state (in this case always Map). -func (s Map[K, V]) StateType() TypeEnum { - return TypeMap -} - -// MakeValueState is a factory function to create an instance of ValueState with the given key. -func MakeMapState[K comparable, V any](k string) Map[K, V] { - return Map[K, V]{ - Key: k, - } -}