Skip to content

Commit

Permalink
fix bad merge
Browse files Browse the repository at this point in the history
  • Loading branch information
damccorm committed Aug 26, 2022
1 parent 4780c57 commit f121679
Showing 1 changed file with 0 additions and 126 deletions.
126 changes: 0 additions & 126 deletions sdks/go/pkg/beam/core/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,129 +517,3 @@ func MakeMapState[K comparable, V any](k string) Map[K, V] {
Key: k,
}
}

// Map is used to read and write global pipeline state representing a map.
// Key represents the key used to lookup this state (not the key of map entries).
type Map[K comparable, V any] struct {
Key string
}

// Put is used to write a key/value pair to this instance of global map state.
func (s *Map[K, V]) Put(p Provider, key K, val V) error {
return p.WriteMapState(Transaction{
Key: s.Key,
Type: TransactionTypeSet,
MapKey: key,
Val: val,
})
}

// Keys is used to read the keys of this map state.
// When a value is not found, returns an empty list and false.
func (s *Map[K, V]) Keys(p Provider) ([]K, bool, error) {
// This replays any writes that have happened to this value since we last read
// For more detail, see "State Transactionality" below for buffered transactions
initialValue, bufferedTransactions, err := p.ReadMapStateKeys(s.Key)
if err != nil {
return []K{}, false, err
}
cur := []K{}
for _, v := range initialValue {
cur = append(cur, v.(K))
}
for _, t := range bufferedTransactions {
switch t.Type {
case TransactionTypeSet:
seen := false
mk := t.MapKey.(K)
for _, k := range cur {
if k == mk {
seen = true
}
}
if !seen {
cur = append(cur, mk)
}
case TransactionTypeClear:
if t.MapKey == nil {
cur = []K{}
} else {
k := t.MapKey.(K)
for idx, v := range cur {
if v == k {
// Remove this key since its been cleared
cur[idx] = cur[len(cur)-1]
cur = cur[:len(cur)-1]
break
}
}
}
}
}
if len(cur) == 0 {
return cur, false, nil
}
return cur, true, nil
}

// Get is used to read a value given a key.
// When a value is not found, returns the 0 value and false.
func (s *Map[K, V]) Get(p Provider, key K) (V, bool, error) {
// This replays any writes that have happened to this value since we last read
// For more detail, see "State Transactionality" below for buffered transactions
cur, bufferedTransactions, err := p.ReadMapStateValue(s.Key, key)
if err != nil {
var val V
return val, false, err
}
for _, t := range bufferedTransactions {
switch t.Type {
case TransactionTypeSet:
if t.MapKey.(K) == key {
cur = t.Val
}
case TransactionTypeClear:
if t.MapKey == nil || t.MapKey.(K) == key {
cur = nil
}
}
}
if cur == nil {
var val V
return val, false, nil
}
return cur.(V), true, nil
}

// StateKey returns the key for this pipeline state entry.
func (s Map[K, V]) StateKey() string {
if s.Key == "" {
// TODO(#22736) - infer the state from the member variable name during pipeline construction.
panic("Value state exists on struct but has not been initialized with a key.")
}
return s.Key
}

// KeyCoderType returns the type of the value state which should be used for a coder for map keys.
func (s Map[K, V]) KeyCoderType() reflect.Type {
var k K
return reflect.TypeOf(k)
}

// CoderType returns the type of the value state which should be used for a coder for map values.
func (s Map[K, V]) CoderType() reflect.Type {
var v V
return reflect.TypeOf(v)
}

// StateType returns the type of the state (in this case always Map).
func (s Map[K, V]) StateType() TypeEnum {
return TypeMap
}

// MakeValueState is a factory function to create an instance of ValueState with the given key.
func MakeMapState[K comparable, V any](k string) Map[K, V] {
return Map[K, V]{
Key: k,
}
}

0 comments on commit f121679

Please sign in to comment.