-
Notifications
You must be signed in to change notification settings - Fork 129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(lib/runtime): Implement ext_offchain_http_request_start_version_1
host function
#1947
Changes from 25 commits
2899a29
8f7a581
4d6167f
faa5752
e08300e
70e6351
ad8c0bb
5606599
f2ca718
81c0fb9
6da5ad8
fb21f11
881fc51
7444590
8896ae5
ee372c4
78bfd0b
42ff50e
13d540a
60a1f11
4cd379f
f1130ad
5f980ab
c8913d2
fcb5da9
dd89472
8624e09
427c183
750caa9
1598499
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,103 @@ | ||||||||
package offchain | ||||||||
|
||||||||
import ( | ||||||||
"errors" | ||||||||
"net/http" | ||||||||
"sync" | ||||||||
) | ||||||||
|
||||||||
const maxConcurrentRequests = 1000 | ||||||||
|
||||||||
var ( | ||||||||
errIntBufferEmpty = errors.New("int buffer exhausted") | ||||||||
errIntBufferFull = errors.New("int buffer is full") | ||||||||
errRequestIDNotAvailable = errors.New("request id not available") | ||||||||
) | ||||||||
|
||||||||
// requestIDBuffer created to control the amount of available non-duplicated ids | ||||||||
type requestIDBuffer chan int16 | ||||||||
|
||||||||
// newIntBuffer creates the request id buffer starting from 1 till @buffSize (by default @buffSize is 1000) | ||||||||
func newIntBuffer(buffSize int16) *requestIDBuffer { | ||||||||
b := make(chan int16, buffSize) | ||||||||
for i := int16(1); i <= buffSize; i++ { | ||||||||
b <- i | ||||||||
} | ||||||||
|
||||||||
intb := requestIDBuffer(b) | ||||||||
return &intb | ||||||||
} | ||||||||
|
||||||||
func (b *requestIDBuffer) get() (int16, error) { | ||||||||
select { | ||||||||
case v := <-*b: | ||||||||
return v, nil | ||||||||
default: | ||||||||
return 0, errIntBufferEmpty | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
func (b *requestIDBuffer) put(i int16) error { | ||||||||
select { | ||||||||
case *b <- i: | ||||||||
return nil | ||||||||
default: | ||||||||
return errIntBufferFull | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
// HTTPSet holds a pool of concurrent http request calls | ||||||||
type HTTPSet struct { | ||||||||
mtx *sync.Mutex | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can embed this, so you don't need to call
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done! |
||||||||
reqs map[int16]*http.Request | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is there a chance this map will be accesses concurrently? I think potentially There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes! There is mutexes locks in |
||||||||
idBuff *requestIDBuffer | ||||||||
} | ||||||||
|
||||||||
// NewHTTPSet creates a offchain http set that can be used | ||||||||
// by runtime as HTTP clients, the max concurrent requests is 1000 | ||||||||
func NewHTTPSet() *HTTPSet { | ||||||||
return &HTTPSet{ | ||||||||
mtx: new(sync.Mutex), | ||||||||
reqs: make(map[int16]*http.Request), | ||||||||
idBuff: newIntBuffer(maxConcurrentRequests), | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
// StartRequest create a new request using the method and the uri, adds the request into the list | ||||||||
// and then return the position of the request inside the list | ||||||||
func (p *HTTPSet) StartRequest(method, uri string) (int16, error) { | ||||||||
p.mtx.Lock() | ||||||||
defer p.mtx.Unlock() | ||||||||
|
||||||||
id, err := p.idBuff.get() | ||||||||
if err != nil { | ||||||||
return 0, err | ||||||||
} | ||||||||
|
||||||||
if _, ok := p.reqs[id]; ok { | ||||||||
return 0, errRequestIDNotAvailable | ||||||||
} | ||||||||
|
||||||||
req, err := http.NewRequest(method, uri, nil) | ||||||||
if err != nil { | ||||||||
return 0, err | ||||||||
} | ||||||||
|
||||||||
p.reqs[id] = req | ||||||||
return id, nil | ||||||||
} | ||||||||
|
||||||||
// Remove just remove a expecific request from reqs | ||||||||
func (p *HTTPSet) Remove(id int16) error { | ||||||||
p.mtx.Lock() | ||||||||
defer p.mtx.Unlock() | ||||||||
|
||||||||
delete(p.reqs, id) | ||||||||
|
||||||||
return p.idBuff.put(id) | ||||||||
} | ||||||||
|
||||||||
// Get returns a request or nil if request not found | ||||||||
func (p *HTTPSet) Get(id int16) *http.Request { | ||||||||
return p.reqs[id] | ||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package offchain | ||
|
||
import ( | ||
"net/http" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
const defaultTestURI = "http://example.url" | ||
|
||
func TestHTTPSetLimit(t *testing.T) { | ||
t.Parallel() | ||
|
||
set := NewHTTPSet() | ||
var err error | ||
for i := 0; i < maxConcurrentRequests+1; i++ { | ||
_, err = set.StartRequest(http.MethodGet, defaultTestURI) | ||
} | ||
|
||
require.ErrorIs(t, errIntBufferEmpty, err) | ||
} | ||
|
||
func TestHTTPSet_StartRequest_NotAvailableID(t *testing.T) { | ||
t.Parallel() | ||
|
||
set := NewHTTPSet() | ||
set.reqs[0] = &http.Request{} | ||
|
||
_, err := set.StartRequest(http.MethodGet, defaultTestURI) | ||
require.ErrorIs(t, errRequestIDNotAvailable, err) | ||
} | ||
|
||
func TestHTTPSetGet(t *testing.T) { | ||
t.Parallel() | ||
|
||
set := NewHTTPSet() | ||
|
||
id, err := set.StartRequest(http.MethodGet, defaultTestURI) | ||
require.NoError(t, err) | ||
|
||
req := set.Get(id) | ||
require.NotNil(t, req) | ||
|
||
require.Equal(t, http.MethodGet, req.Method) | ||
require.Equal(t, defaultTestURI, req.URL.String()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this need to return a pointer? you can change the receiver methods below to be
(b requestIDBuffer)
, then you don't need to dereference it in the methodsThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!