Skip to content

Commit

Permalink
dslx: post-Func.Apply(Maybe[T])-introduction cleanups (#1384)
Browse files Browse the repository at this point in the history
This diff contains cleanups in dslx after we modified Func.Apply to take
a Maybe[T] as input:

- we don't need anymore to record the failed operation because we can
compute that inline, given that now we can write post-operation filters
that observe whether the operation failed or succeeded;

- likewise, we don't need to use counters anymore;

- likewise, we can remove functions to inspect the first error, because
we can compute that inline.

The reference issue is ooni/probe#2613.
  • Loading branch information
bassosimone authored Oct 25, 2023
1 parent 227bea1 commit e701ddb
Show file tree
Hide file tree
Showing 10 changed files with 0 additions and 192 deletions.
3 changes: 0 additions & 3 deletions internal/dslx/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/ooni/probe-cli/v3/internal/logx"
"github.com/ooni/probe-cli/v3/internal/netxlite"
)

// DomainName is a domain name to resolve.
Expand Down Expand Up @@ -107,7 +106,6 @@ func DNSLookupGetaddrinfo(rt Runtime) Func[*DomainToResolve, *ResolvedAddresses]
return &Maybe[*ResolvedAddresses]{
Error: err,
Observations: maybeTraceToObservations(trace),
Operation: netxlite.ResolveOperation,
State: state,
}
})
Expand Down Expand Up @@ -156,7 +154,6 @@ func DNSLookupUDP(rt Runtime, endpoint string) Func[*DomainToResolve, *ResolvedA
return &Maybe[*ResolvedAddresses]{
Error: err,
Observations: maybeTraceToObservations(trace),
Operation: netxlite.ResolveOperation,
State: state,
}
})
Expand Down
72 changes: 0 additions & 72 deletions internal/dslx/fxcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ package dslx

import (
"context"
"sync/atomic"

"github.com/ooni/probe-cli/v3/internal/netxlite"
"github.com/ooni/probe-cli/v3/internal/runtimex"
)

Expand All @@ -26,7 +24,6 @@ func (op Operation[A, B]) Apply(ctx context.Context, a *Maybe[A]) *Maybe[B] {
return &Maybe[B]{
Error: a.Error,
Observations: a.Observations,
Operation: a.Operation,
State: *new(B), // zero value
}
}
Expand All @@ -42,9 +39,6 @@ type Maybe[State any] struct {
// Observations contains the collected observations.
Observations []*Observations

// Operation contains the name of this operation.
Operation string

// State contains state passed between function calls. You should
// only access State when Error is nil and Skipped is false.
State State
Expand All @@ -55,7 +49,6 @@ func NewMaybeWithValue[State any](value State) *Maybe[State] {
return &Maybe[State]{
Error: nil,
Observations: []*Observations{},
Operation: "",
State: value,
}
}
Expand Down Expand Up @@ -83,81 +76,16 @@ func (h *compose2Func[A, B, C]) Apply(ctx context.Context, a *Maybe[A]) *Maybe[C
return &Maybe[C]{
Error: mb.Error,
Observations: mb.Observations,
Operation: mb.Operation,
State: *new(C), // zero value
}
}

mc := h.g.Apply(ctx, mb)
runtimex.Assert(mc != nil, "h.g.Apply returned a nil pointer")

op := mc.Operation
if op == "" { // propagate the previous operation name, if this operation has none
op = mb.Operation
}
return &Maybe[C]{
Error: mc.Error,
Observations: append(mb.Observations, mc.Observations...), // merge observations
Operation: op,
State: mc.State,
}
}

// NewCounter generates an instance of *Counter
func NewCounter[T any]() *Counter[T] {
return &Counter[T]{}
}

// Counter allows to count how many times
// a Func[T, *Maybe[T]] is invoked.
type Counter[T any] struct {
n atomic.Int64
}

// Value returns the counter's value.
func (c *Counter[T]) Value() int64 {
return c.n.Load()
}

// Func returns a Func[T, *Maybe[T]] that updates the counter.
func (c *Counter[T]) Func() Func[T, T] {
return Operation[T, T](func(ctx context.Context, value T) *Maybe[T] {
c.n.Add(1)
return &Maybe[T]{
Error: nil,
Observations: nil,
Operation: "", // we cannot fail, so no need to store operation name
State: value,
}
})
}

// FirstErrorExcludingBrokenIPv6Errors returns the first error and failed operation in a list of
// *Maybe[T] excluding errors known to be linked with IPv6 issues.
func FirstErrorExcludingBrokenIPv6Errors[T any](entries ...*Maybe[T]) (string, error) {
for _, entry := range entries {
if entry.Error == nil {
continue
}
err := entry.Error
switch err.Error() {
case netxlite.FailureNetworkUnreachable, netxlite.FailureHostUnreachable:
// This class of errors is often times linked with wrongly
// configured IPv6, therefore we skip them.
default:
return entry.Operation, err
}
}
return "", nil
}

// FirstError returns the first error and failed operation in a list of *Maybe[T].
func FirstError[T any](entries ...*Maybe[T]) (string, error) {
for _, entry := range entries {
if entry.Error == nil {
continue
}
return entry.Operation, entry.Error
}
return "", nil
}
109 changes: 0 additions & 109 deletions internal/dslx/fxcore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func (f *fn) Apply(ctx context.Context, i *Maybe[int]) *Maybe[int] {
NetworkEvents: []*model.ArchivalNetworkEvent{{Tags: []string{"apply"}}},
},
},
Operation: f.name,
}
}

Expand All @@ -52,7 +51,6 @@ func TestStageAdapter(t *testing.T) {
input := &Maybe[*DomainToResolve]{
Error: errors.New("mocked error"),
Observations: []*Observations{},
Operation: "",
State: nil,
}

Expand Down Expand Up @@ -92,9 +90,6 @@ func TestCompose2(t *testing.T) {
if r.Error != tt.err {
t.Fatalf("unexpected error")
}
if tt.err != nil && r.Operation != "maybe fail" {
t.Fatalf("unexpected operation string")
}
if len(r.Observations) != tt.numObs {
t.Fatalf("unexpected number of (merged) observations")
}
Expand All @@ -115,9 +110,6 @@ func TestGen(t *testing.T) {
if r.State != 14 {
t.Fatalf("unexpected result state")
}
if r.Operation != "succeed" {
t.Fatal("unexpected operation string")
}
})
}

Expand All @@ -137,104 +129,3 @@ func TestObservations(t *testing.T) {
}
})
}

/*
Test cases:
- Success counter:
- pipeline succeeds
- pipeline fails
*/
func TestCounter(t *testing.T) {
t.Run("Success counter", func(t *testing.T) {
tests := map[string]struct {
err error
expect int64
}{
"pipeline succeeds": {err: nil, expect: 1},
"pipeline fails": {err: errors.New("mocked"), expect: 0},
}
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
fn := getFn(tt.err, "maybe fail")
cnt := NewCounter[int]()
composit := Compose2(fn, cnt.Func())
r := composit.Apply(context.Background(), NewMaybeWithValue(42))
cntVal := cnt.Value()
if cntVal != tt.expect {
t.Fatalf("unexpected counter value")
}
if r.Operation != "maybe fail" {
t.Fatal("unexpected operation string")
}
})
}
})
}

/*
Test cases:
- Extract first error from list of *Maybe:
- without errors
- with errors
- Extract first error excluding broken IPv6 errors:
- without errors
- with errors
*/
func TestFirstError(t *testing.T) {
networkUnreachable := errors.New(netxlite.FailureNetworkUnreachable)
mockErr := errors.New("mocked")
errRes := []*Maybe[string]{
{Error: nil, Operation: "succeeds"},
{Error: networkUnreachable, Operation: "broken IPv6"},
{Error: mockErr, Operation: "mock error"},
}
noErrRes := []*Maybe[int64]{
{Error: nil, Operation: "succeeds"},
{Error: nil, Operation: "succeeds"},
}

t.Run("Extract first error from list of *Maybe", func(t *testing.T) {
t.Run("without errors", func(t *testing.T) {
failedOp, firstErr := FirstError(noErrRes...)
if firstErr != nil {
t.Fatalf("unexpected error: %s", firstErr)
}
if failedOp != "" {
t.Fatalf("unexpected failed operation")
}
})

t.Run("with errors", func(t *testing.T) {
failedOp, firstErr := FirstError(errRes...)
if firstErr != networkUnreachable {
t.Fatalf("unexpected error: %s", firstErr)
}
if failedOp != "broken IPv6" {
t.Fatalf("unexpected failed operation")
}
})
})

t.Run("Extract first error excluding broken IPv6 errors", func(t *testing.T) {
t.Run("without errors", func(t *testing.T) {
failedOp, firstErrExclIPv6 := FirstErrorExcludingBrokenIPv6Errors(noErrRes...)
if firstErrExclIPv6 != nil {
t.Fatalf("unexpected error: %s", firstErrExclIPv6)
}
if failedOp != "" {
t.Fatalf("unexpected failed operation")
}
})

t.Run("with errors", func(t *testing.T) {
failedOp, firstErrExclIPv6 := FirstErrorExcludingBrokenIPv6Errors(errRes...)
if firstErrExclIPv6 != mockErr {
t.Fatalf("unexpected error: %s", firstErrExclIPv6)
}
if failedOp != "mock error" {
t.Fatalf("unexpected failed operation")
}
})
})
}
1 change: 0 additions & 1 deletion internal/dslx/httpcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ func HTTPRequest(rt Runtime, options ...HTTPRequestOption) Func[*HTTPConnection,
return &Maybe[*HTTPResponse]{
Error: err,
Observations: observations,
Operation: netxlite.HTTPRoundTripOperation,
State: state,
}
})
Expand Down
1 change: 0 additions & 1 deletion internal/dslx/httpquic.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ func HTTPConnectionQUIC(rt Runtime) Func[*QUICConnection, *HTTPConnection] {
return &Maybe[*HTTPConnection]{
Error: nil,
Observations: nil,
Operation: "", // we cannot fail, so no need to store operation name
State: state,
}
})
Expand Down
1 change: 0 additions & 1 deletion internal/dslx/httptcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func HTTPConnectionTCP(rt Runtime) Func[*TCPConnection, *HTTPConnection] {
return &Maybe[*HTTPConnection]{
Error: nil,
Observations: nil,
Operation: "", // we cannot fail, so no need to store operation name
State: state,
}
})
Expand Down
1 change: 0 additions & 1 deletion internal/dslx/httptls.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func HTTPConnectionTLS(rt Runtime) Func[*TLSConnection, *HTTPConnection] {
return &Maybe[*HTTPConnection]{
Error: nil,
Observations: nil,
Operation: "", // we cannot fail, so no need to store operation name
State: state,
}
})
Expand Down
1 change: 0 additions & 1 deletion internal/dslx/quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func QUICHandshake(rt Runtime, options ...TLSHandshakeOption) Func[*Endpoint, *Q
return &Maybe[*QUICConnection]{
Error: err,
Observations: maybeTraceToObservations(trace),
Operation: netxlite.QUICHandshakeOperation,
State: state,
}
})
Expand Down
2 changes: 0 additions & 2 deletions internal/dslx/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/ooni/probe-cli/v3/internal/logx"
"github.com/ooni/probe-cli/v3/internal/netxlite"
)

// TCPConnect returns a function that establishes TCP connections.
Expand Down Expand Up @@ -55,7 +54,6 @@ func TCPConnect(rt Runtime) Func[*Endpoint, *TCPConnection] {
return &Maybe[*TCPConnection]{
Error: err,
Observations: maybeTraceToObservations(trace),
Operation: netxlite.ConnectOperation,
State: state,
}
})
Expand Down
1 change: 0 additions & 1 deletion internal/dslx/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func TLSHandshake(rt Runtime, options ...TLSHandshakeOption) Func[*TCPConnection
return &Maybe[*TLSConnection]{
Error: err,
Observations: maybeTraceToObservations(trace),
Operation: netxlite.TLSHandshakeOperation,
State: state,
}
})
Expand Down

0 comments on commit e701ddb

Please sign in to comment.