-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathclient_sendrecvfirst_parallel.go
99 lines (84 loc) · 2.68 KB
/
client_sendrecvfirst_parallel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// Copyright 2021-2024 Nokia
// Licensed under the BSD 3-Clause License.
// SPDX-License-Identifier: BSD-3-Clause
package restful
import (
"context"
"errors"
"net"
"net/http"
"net/url"
"sync"
)
// target2URLs resolves the host of target and returns one URL per resolved IP address.
// Each returned URL equals target, except that its host is replaced by the IP address;
// the original port is kept, if there was any.
//
// If target is empty or starts with '/', then it is treated as a path relative to c.rootURL.
// An error is returned if the resulting URL cannot be parsed or its host cannot be resolved.
func (c *Client) target2URLs(target string) ([]string, error) {
	if len(target) == 0 || target[0] == '/' {
		target = c.rootURL + target
	}

	commonURL, err := url.Parse(target)
	if err != nil {
		return nil, err
	}

	ips, err := net.LookupIP(commonURL.Hostname())
	if err != nil {
		return nil, err
	}

	// Read the port once from the untouched URL; the per-IP URLs below are copies.
	port := commonURL.Port()

	targets := make([]string, len(ips))
	for i, ip := range ips {
		// Copy the URL by value, so that the parsed original is never modified.
		targetURL := *commonURL
		host := ip.String()
		switch {
		case port != "":
			// JoinHostPort puts IPv6 addresses in square brackets.
			targetURL.Host = net.JoinHostPort(host, port)
		case ip.To4() == nil:
			// IPv6 literal without port still has to be bracketed in a URL.
			targetURL.Host = "[" + host + "]"
		default:
			targetURL.Host = host
		}
		targets[i] = targetURL.String()
	}
	return targets, nil
}
// SendRecvListFirst2xxParallel acts similarly to SendRecv2xx, but broadcasts the request to all targets defined.
// The first positive response (no error and status code below 300) is processed, the rest are cancelled.
// If none of the targets gives a positive response, then error is returned.
// If ctx is done before a positive response is received, then ctx.Err() is returned.
//
// The request body is created once from reqData and shared by all the requests.
// On success the response body is processed into respData by GetResponseData, and its error is returned.
// Bodies of the responses which are not returned to the caller are closed.
func (c *Client) SendRecvListFirst2xxParallel(ctx context.Context, method string, targets []string, headers http.Header, reqData, respData any) (*http.Response, error) {
	body, err := c.makeBodyBytes(reqData)
	if err != nil {
		return nil, err
	}

	// Cancelled on return, which aborts the requests still in flight.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	closeBody := func(resp *http.Response) {
		if resp != nil && resp.Body != nil {
			_ = resp.Body.Close()
		}
	}

	// Buffered for every target, so that a sender never blocks, even if nobody receives any more.
	respChan := make(chan *http.Response, len(targets))

	var wg sync.WaitGroup
	wg.Add(len(targets))
	for _, target := range targets {
		go func(target string) {
			defer wg.Done()
			resp, err := c.sendRequestBytes(ctx, method, target, headers, &body, false)
			if err != nil { // Errors are silently omitted
				return
			}
			if resp.StatusCode >= 300 { // Negative responses are silently omitted, too
				closeBody(resp)
				return
			}
			respChan <- resp
		}(target)
	}

	// Closing the channel signals that all the requests are finished.
	go func() {
		wg.Wait()
		close(respChan)
	}()

	// Positive responses arriving after the winner would never be read by anyone.
	// Close their bodies in the background; the loop ends when respChan is closed.
	defer func() {
		go func() {
			for resp := range respChan {
				closeBody(resp)
			}
		}()
	}()

	select {
	case resp, ok := <-respChan:
		if !ok {
			return nil, errors.New("no positive response")
		}
		// The body is processed before the deferred cancel is executed.
		return resp, GetResponseData(resp, c.maxBytesToParse, respData)
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
// SendRecvResolveFirst2xxParallel resolves target to the list of its server URLs using target2URLs,
// then sends the request to all of them using SendRecvListFirst2xxParallel.
// If the resolution fails, then its error is returned and no request is sent.
// Otherwise the result of SendRecvListFirst2xxParallel is returned unchanged.
func (c *Client) SendRecvResolveFirst2xxParallel(ctx context.Context, method string, target string, headers http.Header, reqData, respData any) (*http.Response, error) {
	resolvedURLs, resolveErr := c.target2URLs(target)
	if resolveErr != nil {
		return nil, resolveErr
	}

	return c.SendRecvListFirst2xxParallel(ctx, method, resolvedURLs, headers, reqData, respData)
}