Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

httpclientx: implement fast recovery and return endpoint index #1586

Merged
merged 82 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from 81 commits
Commits
Show all changes
82 commits
Select commit Hold shift + click to select a range
75ef7fd
refactor: consolidate httpx and httpapi
bassosimone Apr 22, 2024
f9210ec
refactor to make testing the whole package easier
bassosimone Apr 23, 2024
587290c
Merge branch 'master' into issue/2700
bassosimone Apr 23, 2024
af394c2
Merge branch 'master' into issue/2700
bassosimone Apr 23, 2024
c6f2f5a
Merge branch 'master' into issue/2700
bassosimone Apr 23, 2024
68c9779
Merge branch 'issue/2700' of github.com:ooni/probe-cli into issue/2700
bassosimone Apr 23, 2024
57e29da
Merge branch 'master' into issue/2700
bassosimone Apr 23, 2024
5c953f0
x
bassosimone Apr 23, 2024
e03e810
x
bassosimone Apr 23, 2024
a6046fd
x
bassosimone Apr 23, 2024
341fcf2
x
bassosimone Apr 23, 2024
8c34524
x
bassosimone Apr 23, 2024
4b464ff
try to entirely remove httpx usages
bassosimone Apr 23, 2024
6d57184
fix: make sure there is nil safety
bassosimone Apr 23, 2024
9c2a226
oxford comma: yes/no?
bassosimone Apr 23, 2024
1123b4e
x
bassosimone Apr 23, 2024
d421d24
fix: unit test needs to be adapted
bassosimone Apr 24, 2024
67e0a10
chore: improve testing for cloudflare IP lookup
bassosimone Apr 24, 2024
a69d981
chore: improve the ubuntu IP lookup tests
bassosimone Apr 24, 2024
cd25c56
Merge branch 'master' into issue/2700
bassosimone Apr 24, 2024
642ae5c
x
bassosimone Apr 24, 2024
548e6bc
doc: document oonirun/v2_test.go tests
bassosimone Apr 24, 2024
40db0e5
Merge branch 'master' into issue/2700
bassosimone Apr 24, 2024
4cf3566
start improving probeservices tests
bassosimone Apr 24, 2024
e736e42
Merge branch 'master' into issue/2700
bassosimone Apr 26, 2024
e8471c4
x
bassosimone Apr 26, 2024
aa1c836
Merge branch 'master' into issue/2700
bassosimone Apr 26, 2024
08e81a9
x
bassosimone Apr 26, 2024
fa74b48
x
bassosimone Apr 26, 2024
a7e748f
x
bassosimone Apr 26, 2024
87146cc
x
bassosimone Apr 26, 2024
dac7b8f
x
bassosimone Apr 26, 2024
04b0071
Merge branch 'master' into issue/2700
bassosimone Apr 26, 2024
79d1fee
Merge branch 'master' into issue/2700
bassosimone Apr 29, 2024
88b399d
Merge branch 'master' into issue/2700
bassosimone Apr 29, 2024
de23e7d
x
bassosimone Apr 29, 2024
9d87673
Merge branch 'master' into issue/2700
bassosimone Apr 29, 2024
a436f1e
x
bassosimone Apr 29, 2024
08f8ca9
Merge branch 'master' into issue/2700
bassosimone Apr 29, 2024
25140f3
x
bassosimone Apr 29, 2024
1bbe0b7
chore: write tests for oonicollector.go
bassosimone Apr 30, 2024
6707d61
Merge branch 'master' into issue/2700
bassosimone Apr 30, 2024
4ddd507
Merge branch 'master' into issue/2700
bassosimone Apr 30, 2024
c453ee2
x
bassosimone Apr 30, 2024
ad3d84f
Merge branch 'master' into issue/2700
bassosimone May 2, 2024
28d64f1
feat(probeservices): use httpclientx for check-in
bassosimone May 2, 2024
2107750
cleanup: remove check-in from ooapi
bassosimone May 2, 2024
c2c8ebf
Merge branch 'master' into issue/2700
bassosimone May 2, 2024
36610a8
feat: start moving TH call into engine/session.go
bassosimone May 2, 2024
b7ccf2f
Merge branch 'master' into issue/2700
bassosimone May 2, 2024
b94a8b8
x
bassosimone May 2, 2024
5f1994c
x
bassosimone May 2, 2024
6e16369
Merge branch 'master' into issue/2700
bassosimone May 2, 2024
8400bde
x
bassosimone May 2, 2024
117fcc2
Merge branch 'master' into issue/2700
bassosimone May 2, 2024
17f9b83
x
bassosimone May 2, 2024
8ca93f0
x
bassosimone May 2, 2024
854da9a
Merge branch 'master' into issue/2700
bassosimone May 2, 2024
f78e32d
x
bassosimone May 2, 2024
5a32450
fix(httpclientx): fast fallback on immediate failure
bassosimone May 2, 2024
eb47e28
x
bassosimone May 2, 2024
dbb7d6e
x
bassosimone May 2, 2024
c63e68d
x
bassosimone May 2, 2024
1c55710
try to write algorithm without deadlocks
bassosimone May 2, 2024
b461aa4
x
bassosimone May 2, 2024
421f179
x
bassosimone May 2, 2024
2a44cd5
x
bassosimone May 2, 2024
13777ba
x
bassosimone May 2, 2024
a4fb49e
x
bassosimone May 2, 2024
cdd0e72
x
bassosimone May 3, 2024
0594703
x
bassosimone May 3, 2024
0b693ce
x
bassosimone May 3, 2024
bf35e09
x
bassosimone May 3, 2024
9c94348
x
bassosimone May 3, 2024
8de9e6b
x
bassosimone May 3, 2024
857ff1f
x
bassosimone May 3, 2024
08691b3
x
bassosimone May 3, 2024
b5ef086
x
bassosimone May 3, 2024
9c87724
x
bassosimone May 3, 2024
c857375
x
bassosimone May 3, 2024
9051155
x
bassosimone May 3, 2024
43c0e8b
Update internal/httpclientx/overlapped.go
bassosimone May 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/httpclientx/getjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
//
// This function either returns an error or a valid Output.
func GetJSON[Output any](ctx context.Context, epnt *Endpoint, config *Config) (Output, error) {
return NewOverlappedGetJSON[Output](config).Run(ctx, epnt)
return OverlappedIgnoreIndex(NewOverlappedGetJSON[Output](config).Run(ctx, epnt))
}

func getJSON[Output any](ctx context.Context, epnt *Endpoint, config *Config) (Output, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/httpclientx/getraw.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
//
// This function either returns an error or a valid Output.
func GetRaw(ctx context.Context, epnt *Endpoint, config *Config) ([]byte, error) {
return NewOverlappedGetRaw(config).Run(ctx, epnt)
return OverlappedIgnoreIndex(NewOverlappedGetRaw(config).Run(ctx, epnt))
}

func getRaw(ctx context.Context, epnt *Endpoint, config *Config) ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/httpclientx/getxml.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
//
// This function either returns an error or a valid Output.
func GetXML[Output any](ctx context.Context, epnt *Endpoint, config *Config) (Output, error) {
return NewOverlappedGetXML[Output](config).Run(ctx, epnt)
return OverlappedIgnoreIndex(NewOverlappedGetXML[Output](config).Run(ctx, epnt))
}

func getXML[Output any](ctx context.Context, epnt *Endpoint, config *Config) (Output, error) {
Expand Down
186 changes: 133 additions & 53 deletions internal/httpclientx/overlapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ import (
"context"
"errors"
"time"

"github.com/ooni/probe-cli/v3/internal/erroror"
)

// OverlappedDefaultScheduleInterval is the default schedule interval. After this interval
// has elapsed for a URL without seeing a success, we will schedule the next URL.
const OverlappedDefaultScheduleInterval = 15 * time.Second

// OverlappedDefaultWatchdogTimeout is the timeout after which we assume all the API calls
// have gone rogue and forcibly interrupt all of them.
const OverlappedDefaultWatchdogTimeout = 5 * time.Minute

// Overlapped represents the possibility of overlapping HTTP calls for a set of
// functionally equivalent URLs, such that we start a new call if the previous one
// has failed to produce a result within the configured ScheduleInterval.
Expand All @@ -24,7 +26,7 @@ const OverlappedDefaultScheduleInterval = 15 * time.Second
//
// Under very bad networking conditions, [*Overlapped] would cause a new network
// call to start while the previous one is still in progress and very slowly downloading
// a response. A future implementation SHOULD probably account for this possibility.
// a response. A future implementation MIGHT want to account for this possibility.
type Overlapped[Output any] struct {
// RunFunc is the MANDATORY function that fetches the given [*Endpoint].
//
Expand All @@ -42,12 +44,22 @@ type Overlapped[Output any] struct {
//
// If you set it manually, you MUST modify it before calling [*Overlapped.Run].
ScheduleInterval time.Duration

// WatchdogTimeout is the MANDATORY timeout after which the code assumes
// that all API calls must be aborted and give up.
//
// This field is typically initialized by [NewOverlappedGetJSON], [NewOverlappedGetRaw],
// [NewOverlappedGetXML], or [NewOverlappedPostJSON] to be [OverlappedDefaultWatchdogTimeout].
//
// If you set it manually, you MUST modify it before calling [*Overlapped.Run].
WatchdogTimeout time.Duration
}

func newOverlappedWithFunc[Output any](fx func(context.Context, *Endpoint) (Output, error)) *Overlapped[Output] {
return &Overlapped[Output]{
RunFunc: fx,
ScheduleInterval: OverlappedDefaultScheduleInterval,
WatchdogTimeout: OverlappedDefaultWatchdogTimeout,
}
}

Expand Down Expand Up @@ -83,81 +95,149 @@ func NewOverlappedPostJSON[Input, Output any](input Input, config *Config) *Over
var ErrGenericOverlappedFailure = errors.New("overlapped: generic failure")

// Run runs the overlapped operations, returning the result of the first operation
// that succeeds and otherwise returning an error describing what happened.
// that succeeds and its endpoint index, or the error that occurred.
func (ovx *Overlapped[Output]) Run(ctx context.Context, epnts ...*Endpoint) (Output, int, error) {
return OverlappedReduce[Output](ovx.Map(ctx, epnts...))
}

// OverlappedErrorOr combines error information, result information and the endpoint index.
type OverlappedErrorOr[Output any] struct {
// Err is the error or nil.
Err error

// Index is the endpoint index.
Index int

// Value is the result.
Value Output
}

// Map applies the [*Overlapped.RunFunc] function to each epnts entry, thus producing
// a result for each entry. This function will cancel subsequent operations until there
// is a success: subsequent results will be [context.Canceled] errors.
//
// # Limitations
// Note that you SHOULD use [*Overlapped.Run] unless you want to observe the result
// of each operation, which is mostly useful when running unit tests.
//
// This implementation creates a new goroutine for each provided URL under the assumption that
// the overall number of URLs is small. A future revision would address this issue.
func (ovx *Overlapped[Output]) Run(ctx context.Context, epnts ...*Endpoint) (Output, error) {
// create cancellable context for early cancellation
ctx, cancel := context.WithCancel(ctx)
// Note that this function will return a zero length slice if epnts lenth is also zero.
func (ovx *Overlapped[Output]) Map(ctx context.Context, epnts ...*Endpoint) []*OverlappedErrorOr[Output] {
// create cancellable context for early cancellation and also apply the
// watchdog timeout so that eventually this code returns.
//
// we are going to cancel this context as soon as we have a successful response so
// that we do not waste network resources by performing other attempts.
ctx, cancel := context.WithTimeout(ctx, ovx.WatchdogTimeout)
defer cancel()

// construct channel for collecting the results
output := make(chan *erroror.Value[Output])
//
// we're using this channel to communicate results back from goroutines running
// in the background and performing the real API call
output := make(chan *OverlappedErrorOr[Output])

// schedule a measuring goroutine per URL.
for idx := 0; idx < len(epnts); idx++ {
go ovx.transact(ctx, idx, epnts[idx], output)
}
// create ticker for scheduling subsequent attempts
//
// the ticker is going to tick at every schedule interval to start another
// attempt, if the previous attempt has not produced a result in time
ticker := time.NewTicker(ovx.ScheduleInterval)
defer ticker.Stop()

// we expect to see exactly a response for each goroutine
var (
firstOutput *Output
errorv []error
)
for idx := 0; idx < len(epnts); idx++ {
// get a result from one of the goroutines
result := <-output

// handle the error case
if result.Err != nil {
errorv = append(errorv, result.Err)
continue
}
// create index for the next endpoint to try
idx := 0

// possibly record the first success
if firstOutput == nil {
firstOutput = &result.Value
// create vector for collecting results
//
// for simplicity, we're going to collect results from every goroutine
// including the ones cancelled by context after the previous success and
// then we're going to filter the results and produce a final result
results := []*OverlappedErrorOr[Output]{}

// keep looping until we have results for each endpoints
for len(results) < len(epnts) {

// if there are more endpoints to try, spawn a goroutine to try,
// and, otherwise, we can safely stop ticking
if idx < len(epnts) {
go ovx.transact(ctx, idx, epnts[idx], output)
idx++
} else {
ticker.Stop()
}

// make sure we interrupt all the other goroutines
cancel()
select {
// this event means that a child goroutine completed
// so we store the result; on success interrupt all the
// background goroutines and stop ticking
//
// note that we MUST continue reading until we have
// exactly `len(epnts)` results because the inner
// goroutine performs blocking writes on the channel
case res := <-output:
results = append(results, res)
if res.Err == nil {
ticker.Stop()
cancel()
}

// this means the ticker ticked, so we should loop again and
// attempt another endpoint because it's time to do that
case <-ticker.C:
}
}

// handle the case of success
if firstOutput != nil {
return *firstOutput, nil
// just send the results vector back to the caller
return results
}

// OverlappedReduce takes the results of [*Overlapped.Map] and returns either an Output or an error.
//
// Note that you SHOULD use [*Overlapped.Run] unless you want to observe the result
// of each operation, which is mostly useful when running unit tests.
//
// The return value is (output, index, nil) on success and (zero, zero, error) on failure.
func OverlappedReduce[Output any](results []*OverlappedErrorOr[Output]) (Output, int, error) {
// postprocess the results to check for success and
// aggregate all the errors that occurred
errorv := []error{}
for _, res := range results {
if res.Err == nil {
return res.Value, res.Index, nil
}
errorv = append(errorv, res.Err)
}

// handle the case where there's no error
//
// this happens if the user provided no endpoints to measure
if len(errorv) <= 0 {
errorv = append(errorv, ErrGenericOverlappedFailure)
}

// return zero value and errors list
return *new(Output), errors.Join(errorv...)
//
// note thay errors.Join returns nil if all the errors are nil or the
bassosimone marked this conversation as resolved.
Show resolved Hide resolved
// list is nil, which is why we handle the corner case above
return *new(Output), 0, errors.Join(errorv...)
}

// transact performs an HTTP transaction with the given URL and writes results to the output channel.
func (ovx *Overlapped[Output]) transact(ctx context.Context, idx int, epnt *Endpoint, output chan<- *erroror.Value[Output]) {
// wait for our time to start
//
// add one nanosecond to make sure the delay is always positive
timer := time.NewTimer(time.Duration(idx)*ovx.ScheduleInterval + time.Nanosecond)
defer timer.Stop()
select {
case <-ctx.Done():
output <- &erroror.Value[Output]{Err: ctx.Err()}
return
case <-timer.C:
// fallthrough
}

func (ovx *Overlapped[Output]) transact(
ctx context.Context, idx int, epnt *Endpoint, output chan<- *OverlappedErrorOr[Output]) {
// obtain the results
value, err := ovx.RunFunc(ctx, epnt)

// emit the results
output <- &erroror.Value[Output]{Err: err, Value: value}
//
// note that this unconditional channel write REQUIRES that we keep reading from
// the results channel in Run until we have a result per input endpoint
output <- &OverlappedErrorOr[Output]{
Err: err,
Index: idx,
Value: value,
}
}

// OverlappedIgnoreIndex is a filter that removes the index from [*Overlapped.Run] results.
func OverlappedIgnoreIndex[Output any](value Output, _ int, err error) (Output, error) {
return value, err
}
Loading
Loading