-
Notifications
You must be signed in to change notification settings - Fork 0
/
collect.go
83 lines (65 loc) · 2.9 KB
/
collect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package gostreams
import "context"
// DuplicateKeyError is the error used to short-circuit a stream by canceling its context to indicate that
// a key couldn't be added to a map because it already exists.
type DuplicateKeyError[T any, K comparable] struct {
// Element is the upstream producer's element that caused the error.
Element T
// Key is the key that was already in the map.
Key K
}
// CollectSlice returns a collector that collects elements into a slice.
func CollectSlice[T any]() CollectorFunc[T, []T] {
var result []T
return func(_ context.Context, _ context.CancelCauseFunc, elem T, _ uint64) []T {
result = append(result, elem)
return result
}
}
// CollectMap returns a collector that collects elements into a map, using key to map elements to keys,
// and value to map elements to values. If a key is already in the map, it overwrites the map entry.
func CollectMap[T any, K comparable, V any](key MapperFunc[T, K], value MapperFunc[T, V]) CollectorFunc[T, map[K]V] {
return collectMap[T, K, V](key, value, true)
}
// CollectMapNoDuplicateKeys returns a collector that collects elements into a map, using key to map
// elements to keys, and value to map elements to values. If a key is already in the map, it cancels the stream's context
// with a DuplicateKeyError.
func CollectMapNoDuplicateKeys[T any, K comparable, V any](key MapperFunc[T, K], value MapperFunc[T, V]) CollectorFunc[T, map[K]V] {
return collectMap[T, K, V](key, value, false)
}
func collectMap[T any, K comparable, V any](key MapperFunc[T, K], value MapperFunc[T, V], overwriteEntries bool) CollectorFunc[T, map[K]V] {
result := map[K]V{}
return func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64) map[K]V {
key := key(ctx, cancel, elem, index)
if !overwriteEntries {
if _, ok := result[key]; ok {
cancel(&DuplicateKeyError[T, K]{
Element: elem,
Key: key,
})
return result
}
}
result[key] = value(ctx, cancel, elem, index)
return result
}
}
// CollectGroup returns a collector that collects elements into a group map, according to key.
// It uses value to map elements to values.
func CollectGroup[T any, K comparable, V any](key MapperFunc[T, K], value MapperFunc[T, V]) CollectorFunc[T, map[K][]V] {
result := map[K][]V{}
return func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64) map[K][]V {
key := key(ctx, cancel, elem, index)
result[key] = append(result[key], value(ctx, cancel, elem, index))
return result
}
}
// CollectPartition returns a collector that collects elements into a partition map, according to pred.
// It uses value to map elements to values.
func CollectPartition[T any, V any](pred PredicateFunc[T], value MapperFunc[T, V]) CollectorFunc[T, map[bool][]V] {
return CollectGroup(MapperFunc[T, bool](pred), value)
}
// Error implements error.
func (e *DuplicateKeyError[T, K]) Error() string {
return "duplicate key"
}