Skip to content

Commit

Permalink
Checkpoint. Some renaming.
Browse files Browse the repository at this point in the history
  • Loading branch information
bobg committed Jul 26, 2024
1 parent dbb0b29 commit 5045e79
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 49 deletions.
7 changes: 3 additions & 4 deletions parallel/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ func Values[T any, F ~func(context.Context, int) (T, error)](ctx context.Context
//
// The caller gets an iterator over the values produced
// and a non-nil pointer to an error.
// Once the iterator has been consumed,
// the caller may dereference the error pointer to see if any worker failed.
// There is the risk of a data race if the caller dereferences the error pointer before the iterator is consumed.
// The caller may dereference the error pointer to see if any worker failed,
// but not before the iterator has been fully consumed.
// The error (if there is one) is of type [Error],
// whose N field indicates which worker failed.
func Producers[T any, F ~func(context.Context, int, func(T) error) error](ctx context.Context, n int, f F) (iter.Seq[T], *error) {
Expand Down Expand Up @@ -104,7 +103,7 @@ func Producers[T any, F ~func(context.Context, int, func(T) error) error](ctx co
close(ch)
}()

return seqs.Chan(ch), &err
return seqs.FromChan(ch), &err
}

// Consumers launches n parallel workers each consuming values supplied by the caller.
Expand Down
34 changes: 26 additions & 8 deletions seqs/accum_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@
package seqs

import (
"fmt"
"slices"
"testing"
)

func TestAccum(t *testing.T) {
var (
inp = slices.Values([]int{1, 2, 3, 4})
a = Accum(inp, func(a, b int) int { return a + b })
got = slices.Collect(a)
want = []int{1, 3, 6, 10}
)
if !slices.Equal(got, want) {
t.Errorf("got %v, want [1 3 6 10]", got)
cases := []struct {
inp []int
want []int
}{{}, {
inp: []int{1},
want: []int{1},
}, {
inp: []int{1, 2},
want: []int{1, 3},
}, {
inp: []int{1, 2, 3},
want: []int{1, 3, 6},
}}

for i, tc := range cases {
t.Run(fmt.Sprintf("case_%02d", i+1), func(t *testing.T) {
var (
inp = slices.Values(tc.inp)
a = Accum(inp, func(a, b int) int { return a + b })
got = slices.Collect(a)
)
if !slices.Equal(got, tc.want) {
t.Errorf("got %v, want %v", got, tc.want)
}
})
}
}
12 changes: 7 additions & 5 deletions seqs/chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"iter"
)

// Chan produces an [iter.Seq] over the contents of a channel.
func Chan[T any](inp <-chan T) iter.Seq[T] {
// FromChan produces an [iter.Seq] over the contents of a channel.
func FromChan[T any](inp <-chan T) iter.Seq[T] {
return func(yield func(T) bool) {
for x := range inp {
if !yield(x) {
Expand All @@ -16,13 +16,13 @@ func Chan[T any](inp <-chan T) iter.Seq[T] {
}
}

// ChanContext produces an [iter.Seq] over the contents of a channel.
// FromChanContext produces an [iter.Seq] over the contents of a channel.
// It stops at the end of the channel or when the given context is canceled.
//
// The caller can dereference the returned error pointer to check for errors
// (such as context cancellation),
// but only after iteration is done.
func ChanContext[T any](ctx context.Context, inp <-chan T) (iter.Seq[T], *error) {
func FromChanContext[T any](ctx context.Context, inp <-chan T) (iter.Seq[T], *error) {
var err error

f := func(yield func(T) bool) {
Expand Down Expand Up @@ -76,9 +76,11 @@ func ToChanContext[T any](ctx context.Context, f iter.Seq[T]) (<-chan T, *error)
defer close(ch)

for val := range f {
// This extra check helps to ensure that context cancellation "wins" when both cases in the select can proceed.
if err = ctx.Err(); err != nil {
return
}

select {
case ch <- val:
// OK, do nothing.
Expand Down Expand Up @@ -107,5 +109,5 @@ func Go[T any, F ~func(chan<- T) error](f F) (iter.Seq[T], *error) {
close(ch)
}()

return Chan(ch), &err
return FromChan(ch), &err
}
10 changes: 5 additions & 5 deletions seqs/chan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestToChan(t *testing.T) {
func TestToChanContext(t *testing.T) {
var (
ch1 = make(chan int, 1)
seq1 = Chan(ch1)
seq1 = FromChan(ch1)
ctx = context.Background()
)
ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestToChanContext(t *testing.T) {
}
}

func TestChan(t *testing.T) {
func TestFromChan(t *testing.T) {
ch := make(chan int)
go func() {
for i := 0; i < 3; i++ {
Expand All @@ -73,7 +73,7 @@ func TestChan(t *testing.T) {
}()

var (
seq = Chan(ch)
seq = FromChan(ch)
got = slices.Collect(seq)
want = []int{0, 1, 2}
)
Expand All @@ -82,7 +82,7 @@ func TestChan(t *testing.T) {
}
}

func TestChanContext(t *testing.T) {
func TestFromChanContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -98,7 +98,7 @@ func TestChanContext(t *testing.T) {
}
}()

it, errptr := ChanContext(ctx, ch)
it, errptr := FromChanContext(ctx, ch)
next, stop := iter.Pull(it)
defer stop()
if _, ok := next(); !ok {
Expand Down
90 changes: 78 additions & 12 deletions seqs/seqs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package seqs

import (
"cmp"
"iter"
"slices"
)
Expand All @@ -16,8 +17,8 @@ type Pair[T, U any] struct {
Y U
}

// Seq1 changes an [iter.Seq2] to an [iter.Seq] of [Pair]s.
func Seq1[T, U any](inp iter.Seq2[T, U]) iter.Seq[Pair[T, U]] {
// ToPairs changes an [iter.Seq2] to an [iter.Seq] of [Pair]s.
func ToPairs[T, U any](inp iter.Seq2[T, U]) iter.Seq[Pair[T, U]] {
return func(yield func(Pair[T, U]) bool) {
for x, y := range inp {
if !yield(Pair[T, U]{X: x, Y: y}) {
Expand All @@ -27,6 +28,28 @@ func Seq1[T, U any](inp iter.Seq2[T, U]) iter.Seq[Pair[T, U]] {
}
}

// Left changes an [iter.Seq2] to an [iter.Seq] by dropping the second value.
func Left[T, U any](inp iter.Seq2[T, U]) iter.Seq[T] {
return func(yield func(T) bool) {
for x := range inp {
if !yield(x) {
return
}
}
}
}

// Right changes an [iter.Seq2] to an [iter.Seq] by dropping the first value.
func Right[T, U any](inp iter.Seq2[T, U]) iter.Seq[U] {
return func(yield func(U) bool) {
for _, y := range inp {
if !yield(y) {
return
}
}
}
}

// Enumerate changes an [iter.Seq] to an [iter.Seq2] of (index, val) pairs.
func Enumerate[T any](inp iter.Seq[T]) iter.Seq2[int, T] {
return func(yield func(int, T) bool) {
Expand All @@ -40,8 +63,8 @@ func Enumerate[T any](inp iter.Seq[T]) iter.Seq2[int, T] {
}
}

// Seq2 changes an [iter.Seq] of [Pair]s to an [iter.Seq2].
func Seq2[T, U any](inp iter.Seq[Pair[T, U]]) iter.Seq2[T, U] {
// FromPairs changes an [iter.Seq] of [Pair]s to an [iter.Seq2].
func FromPairs[T, U any](inp iter.Seq[Pair[T, U]]) iter.Seq2[T, U] {
return func(yield func(T, U) bool) {
for val := range inp {
if !yield(val.X, val.Y) {
Expand All @@ -51,20 +74,63 @@ func Seq2[T, U any](inp iter.Seq[Pair[T, U]]) iter.Seq2[T, U] {
}
}

// String produces an [iter.Seq2] over position-rune pairs in a string.
// The position of each rune is measured in bytes from the beginning of the string.
func String(inp string) iter.Seq2[int, rune] {
return func(yield func(int, rune) bool) {
for i, r := range inp {
if !yield(i, r) {
return
}
// Compare performs an elementwise comparison of two sequences.
// It returns the result of [cmp.Compare] on the first pair of unequal elements.
// If a ends before b, Compare returns -1.
// If b ends before a, Compare returns 1.
// If the sequences are equal, Compare returns 0.
func Compare[T cmp.Ordered](a, b iter.Seq[T]) int {
return CompareFunc(a, b, cmp.Compare)
}

// CompareFunc performs an elementwise comparison of two sequences, using a custom comparison function.
// The function should return a negative number if the first argument is less than the second,
// a positive number if the first argument is greater than the second,
// and zero if the arguments are equal.
//
// CompareFunc returns the result of f on the first pair of unequal elements.
// If a ends before b, CompareFunc returns -1.
// If b ends before a, CompareFunc returns 1.
// If the sequences are equal, CompareFunc returns 0.
func CompareFunc[T any](a, b iter.Seq[T], f func(T, T) int) int {
anext, astop := iter.Pull(a)
defer astop()

bnext, bstop := iter.Pull(b)
defer bstop()

aOK, bOK := true, true

var aVal, bVal T

for {
if aOK {
aVal, aOK = anext()
}
if bOK {
bVal, bOK = bnext()
}
if !aOK && !bOK {
return 0
}
if !aOK {
return -1
}
if !bOK {
return 1
}
if cmp := f(aVal, bVal); cmp != 0 {
return cmp
}
}
}

// Empty is an empty sequence that can be used where an [iter.Seq] is expected.
// Usage note: you generally don't want to call this function,
// just refer to it as Empty[typename].
func Empty[T any](func(T) bool) {}

// Empty2 is an empty sequence that can be used where an [iter.Seq2] is expected.
// Usage note: you generally don't want to call this function,
// just refer to it as Empty2[typename1, typename2].
func Empty2[T, U any](func(T, U) bool) {}
Loading

0 comments on commit 5045e79

Please sign in to comment.