Skip to content

Commit b3cc9ce

Browse files
committed
Add support for client/server CONNECT proxy.
1 parent 2aae3d3 commit b3cc9ce

File tree

10 files changed

+418
-23
lines changed

10 files changed

+418
-23
lines changed

async-http.gemspec

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ Gem::Specification.new do |spec|
1717
spec.require_paths = ["lib"]
1818

1919
spec.add_dependency("async", "~> 1.19")
20-
spec.add_dependency("async-io", "~> 1.24")
20+
spec.add_dependency("async-io", "~> 1.25")
2121

2222
spec.add_dependency("protocol-http", "~> 0.12.0")
23-
spec.add_dependency("protocol-http1", "~> 0.8.0")
23+
spec.add_dependency("protocol-http1", "~> 0.9.0")
2424
spec.add_dependency("protocol-http2", "~> 0.9.0")
2525

2626
# spec.add_dependency("openssl")

lib/async/http/body/pipe.rb

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
# frozen_string_literal: true
2+
#
3+
# Copyright, 2019, by Samuel G. D. Williams. <http://www.codeotaku.com>
4+
#
5+
# Permission is hereby granted, free of charge, to any person obtaining a copy
6+
# of this software and associated documentation files (the "Software"), to deal
7+
# in the Software without restriction, including without limitation the rights
8+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
# copies of the Software, and to permit persons to whom the Software is
10+
# furnished to do so, subject to the following conditions:
11+
#
12+
# The above copyright notice and this permission notice shall be included in
13+
# all copies or substantial portions of the Software.
14+
#
15+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21+
# THE SOFTWARE.
22+
23+
require_relative 'writable'
24+
25+
require 'forwardable'
26+
27+
module Async
28+
module HTTP
29+
module Body
30+
class Pipe
31+
extend Forwardable
32+
33+
def initialize(input, output = Writable.new, task: Task.current)
34+
@input = input
35+
@output = output
36+
37+
head, tail = IO::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM)
38+
39+
@head = Async::IO::Stream.new(head)
40+
@tail = tail
41+
42+
@reader = nil
43+
@writer = nil
44+
45+
task.async(&self.method(:reader))
46+
task.async(&self.method(:writer))
47+
end
48+
49+
def to_io
50+
@tail
51+
end
52+
53+
def close
54+
@reader&.stop
55+
@writer&.stop
56+
57+
@tail.close
58+
end
59+
60+
private
61+
62+
# Read from the @input stream and write to the head of the pipe.
63+
def reader(task)
64+
@reader = task
65+
66+
task.annotate "pipe reader"
67+
68+
while chunk = @input.read
69+
@head.write(chunk)
70+
@head.flush
71+
end
72+
73+
@head.close_write
74+
ensure
75+
@reader = nil
76+
@input.close($!)
77+
78+
@head.close if @writer.nil?
79+
end
80+
81+
# Read from the head of the pipe and write to the @output stream.
82+
# If the @tail is closed, this will cause chunk to be nil, which in turn will call `@output.close` and `@head.close`
83+
def writer(task)
84+
@writer = task
85+
86+
task.annotate "pipe writer"
87+
88+
while chunk = @head.read_partial
89+
@output.write(chunk)
90+
end
91+
ensure
92+
@writer = nil
93+
@output.close($!)
94+
95+
@head.close if @reader.nil?
96+
end
97+
end
98+
end
99+
end
100+
end

lib/async/http/body/stream.rb

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,31 @@ def read(length = nil, buffer = nil)
7474
return buffer
7575
end
7676

77+
# Read at most `size` bytes from the stream. Will avoid reading from the underlying stream if possible.
78+
def read_partial(size = nil)
79+
if @buffer
80+
buffer = @buffer
81+
@buffer = nil
82+
else
83+
buffer = read_next
84+
end
85+
86+
if buffer and size
87+
if buffer.bytesize > size
88+
@buffer = buffer.byteslice(size, buffer.bytesize)
89+
buffer = buffer.byteslice(0, size)
90+
end
91+
end
92+
93+
return buffer
94+
end
95+
7796
def read_nonblock(length, buffer = nil)
7897
@buffer ||= read_next
7998
chunk = nil
8099

100+
return nil if @buffer.nil?
101+
81102
if @buffer.bytesize > length
82103
chunk = @buffer.byteslice(0, length)
83104
@buffer = @buffer.byteslice(length, @buffer.bytesize)

lib/async/http/endpoint.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ module Async
2929
module HTTP
3030
# Represents a way to connect to a remote HTTP server.
3131
class Endpoint < Async::IO::Endpoint
32-
def self.parse(string, **options)
32+
def self.parse(string, endpoint = nil, **options)
3333
url = URI.parse(string).normalize
3434

35-
return self.new(url, nil, **options)
35+
return self.new(url, endpoint, **options)
3636
end
3737

3838
# @option scheme [String] the scheme to use, overrides the URL scheme.
@@ -46,7 +46,8 @@ def initialize(url, endpoint = nil, **options)
4646
raise ArgumentError, "URL must be absolute (include scheme, host): #{url}" unless url.absolute?
4747

4848
@url = url
49-
@endpoint = endpoint
49+
50+
@endpoint = self.build_endpoint(endpoint)
5051
end
5152

5253
def to_url

lib/async/http/pool.rb

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,14 @@ def initialize(limit = nil, &block)
4343
@available = Async::Notification.new
4444

4545
@limit = limit
46-
@active = 0
4746

4847
@constructor = block
4948
end
5049

5150
# The number of allocated resources.
52-
attr :active
51+
def active
52+
@resources.count
53+
end
5354

5455
# Whether there are resources which are currently in use.
5556
def busy?
@@ -60,6 +61,11 @@ def busy?
6061
return false
6162
end
6263

64+
# Wait until a pool resource has been freed.
65+
def wait
66+
@available.wait
67+
end
68+
6369
# All allocated resources.
6470
attr :resources
6571

@@ -92,8 +98,6 @@ def release(resource)
9298
def close
9399
@resources.each_key(&:close)
94100
@resources.clear
95-
96-
@active = 0
97101
end
98102

99103
def to_s
@@ -121,8 +125,6 @@ def retire(resource)
121125

122126
@resources.delete(resource)
123127

124-
@active -= 1
125-
126128
resource.close
127129

128130
@available.signal
@@ -163,11 +165,9 @@ def available_resource
163165
end
164166
end
165167

166-
if !@limit or @active < @limit
168+
if !@limit or self.active < @limit
167169
Async.logger.debug(self) {"No resources resources, allocating new one..."}
168170

169-
@active += 1
170-
171171
return create
172172
end
173173

lib/async/http/protocol/http1/client.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ def call(request, task: Task.current)
4848
# If this fails, this connection will be closed.
4949
write_upgrade_body(protocol, body)
5050
end
51+
elsif request.connect?
52+
task.async do |subtask|
53+
subtask.annotate("Tunnelling body.")
54+
55+
write_tunnel_body(@version, body)
56+
end
5157
else
5258
task.async do |subtask|
5359
subtask.annotate("Streaming body.")

lib/async/http/protocol/http1/server.rb

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def next_request
3737
# Read an incoming request:
3838
return unless request = Request.read(self)
3939

40-
unless persistent?(request.version, request.headers)
40+
unless persistent?(request.version, request.method, request.headers)
4141
@persistent = false
4242
end
4343

@@ -58,14 +58,6 @@ def each(task: Task.current)
5858
return if @stream.nil? or @stream.closed?
5959

6060
if response
61-
# Try to avoid holding on to request, to minimse GC overhead:
62-
head = request.head?
63-
64-
unless request.body?
65-
# If there is no body, #finish is a no-op.
66-
request = nil
67-
end
68-
6961
write_response(@version, response.status, response.headers)
7062

7163
body = response.body
@@ -79,8 +71,21 @@ def each(task: Task.current)
7971
# We also don't want to hold on to the response object:
8072
response = nil
8173

74+
body.call(stream)
75+
elsif body and request.connect?
76+
stream = write_tunnel_body(request.version)
77+
78+
# Same as above:
79+
request = nil
80+
response = nil
81+
8282
body.call(stream)
8383
else
84+
head = request.head?
85+
86+
request = nil unless body
87+
response = nil
88+
8489
write_body(@version, body, head)
8590
end
8691
else

lib/async/http/proxy.rb

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# Copyright, 2019, by Samuel G. D. Williams. <http://www.codeotaku.com>
2+
#
3+
# Permission is hereby granted, free of charge, to any person obtaining a copy
4+
# of this software and associated documentation files (the "Software"), to deal
5+
# in the Software without restriction, including without limitation the rights
6+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
# copies of the Software, and to permit persons to whom the Software is
8+
# furnished to do so, subject to the following conditions:
9+
#
10+
# The above copyright notice and this permission notice shall be included in
11+
# all copies or substantial portions of the Software.
12+
#
13+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
# THE SOFTWARE.
20+
21+
require_relative 'client'
22+
require_relative 'endpoint'
23+
24+
require_relative 'body/pipe'
25+
26+
module Async
27+
module HTTP
28+
class Proxy
29+
def self.tcp(client, host, port, headers = [])
30+
self.new(client, "#{host}:#{port}", headers)
31+
end
32+
33+
def initialize(client, address, headers = [])
34+
@client = client
35+
@address = address
36+
@headers = headers
37+
end
38+
39+
attr :client
40+
41+
def close
42+
while @client.pool.busy?
43+
@client.pool.wait
44+
end
45+
46+
@client.close
47+
end
48+
49+
def connect(&block)
50+
input = Body::Writable.new
51+
52+
response = @client.connect(@address.to_s, @headers, input)
53+
54+
pipe = Body::Pipe.new(response.body, input)
55+
56+
return pipe.to_io unless block_given?
57+
58+
begin
59+
yield pipe.to_io
60+
ensure
61+
pipe.close
62+
end
63+
end
64+
65+
def endpoint(url, **options)
66+
Endpoint.parse(url, self, **options)
67+
end
68+
end
69+
end
70+
end

0 commit comments

Comments
 (0)