Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve BufferedIO performance #14

Merged
merged 1 commit into from
Aug 31, 2022
Merged

Commits on Aug 20, 2022

  1. Improve BufferedIO performance

    `BufferedIO` is a bit inefficient for reading large responses because
    it use the classic `buffer.slice!` technique which cause a lot of
    unnecessary string copying.
    
    This is particularly visible on line based protocol when reading
    line by line.
    
    Instead of repeatedly shifting the string, we can keep track of
    which offset we're at, to know how many bytes are left in the buffer.
    
    This change also open the door to further optimization by increasing
    the buffer size, as previously `slice!` would get slower the larger
    the buffer is.
    
    Benchmark results:
    
    ```
    === 1k ===
    Warming up --------------------------------------
                      1k     1.234k i/100ms
                  1k opt     1.283k i/100ms
    Calculating -------------------------------------
                      1k     12.615k (± 0.9%) i/s -     64.168k in   5.086995s
                  1k opt     12.856k (± 0.9%) i/s -     65.433k in   5.090051s
    
    Comparison:
                      1k:    12615.2 i/s
                  1k opt:    12856.0 i/s - 1.02x  (± 0.00) faster
    
    === 10k ===
    Warming up --------------------------------------
                     10k     1.165k i/100ms
                 10k opt     1.269k i/100ms
    Calculating -------------------------------------
                     10k     11.550k (± 2.4%) i/s -     58.250k in   5.046378s
                 10k opt     12.736k (± 1.0%) i/s -     64.719k in   5.081969s
    
    Comparison:
                     10k:    11550.3 i/s
                 10k opt:    12736.3 i/s - 1.10x  (± 0.00) faster
    
    === 100k ===
    Warming up --------------------------------------
                    100k   809.000  i/100ms
                100k opt   926.000  i/100ms
    Calculating -------------------------------------
                    100k      8.054k (± 3.0%) i/s -     40.450k in   5.028299s
                100k opt      9.286k (± 2.2%) i/s -     47.226k in   5.088841s
    
    Comparison:
                    100k:     8053.6 i/s
                100k opt:     9285.5 i/s - 1.15x  (± 0.00) faster
    
    === 1M ===
    Warming up --------------------------------------
                      1M   249.000  i/100ms
                  1M opt   315.000  i/100ms
    Calculating -------------------------------------
                      1M      2.448k (± 2.5%) i/s -     12.450k in   5.089744s
                  1M opt      3.119k (± 2.6%) i/s -     15.750k in   5.053772s
    
    Comparison:
                      1M:     2447.8 i/s
                  1M opt:     3118.8 i/s - 1.27x  (± 0.00) faster
    ```
    
    Profiling before (1MB responses):
    
    ```
    ==================================
      Mode: wall(1000)
      Samples: 5276 (0.00% miss rate)
      GC: 394 (7.47%)
    ==================================
         TOTAL    (pct)     SAMPLES    (pct)     FRAME
          1622  (30.7%)        1622  (30.7%)     IO#wait_readable
           777  (14.7%)         777  (14.7%)     IO#read_nonblock
           365   (6.9%)         365   (6.9%)     (sweeping)
          2705  (51.3%)         364   (6.9%)     Net::BufferedIO#rbuf_fill
           264   (5.0%)         264   (5.0%)     String#index
           223   (4.2%)         223   (4.2%)     String#sub
           221   (4.2%)         221   (4.2%)     String#slice!
           185   (3.5%)         185   (3.5%)     String#split
           108   (2.0%)         108   (2.0%)     IO#write_nonblock
           101   (1.9%)         101   (1.9%)     String#downcase
            66   (1.3%)          66   (1.3%)     Net::BufferedIO#LOG
            57   (1.1%)          57   (1.1%)     String#count
            51   (1.0%)          51   (1.0%)     String#to_s
           391   (7.4%)          50   (0.9%)     Net::HTTPGenericRequest#write_header
            50   (0.9%)          50   (0.9%)     String#capitalize
            49   (0.9%)          49   (0.9%)     Array#join
            47   (0.9%)          47   (0.9%)     String#b
           106   (2.0%)          36   (0.7%)     Net::HTTPHeader#set_field
            34   (0.6%)          34   (0.6%)     Module#===
            33   (0.6%)          33   (0.6%)     String#[]
           140   (2.7%)          29   (0.5%)     Net::BufferedIO#write0
            29   (0.5%)          29   (0.5%)     (marking)
           281   (5.3%)          27   (0.5%)     Net::BufferedIO#rbuf_consume
          1195  (22.6%)          25   (0.5%)     Net::HTTPResponse#read_body
          1024  (19.4%)          25   (0.5%)     Net::HTTPResponse.each_response_header
            86   (1.6%)          24   (0.5%)     Net::HTTPHeader#set_field
            23   (0.4%)          23   (0.4%)     Net::HTTP#proxy_uri
            51   (1.0%)          23   (0.4%)     Net::HTTPHeader#initialize_http_header
          2225  (42.2%)          22   (0.4%)     Net::BufferedIO#readuntil
            20   (0.4%)          20   (0.4%)     Regexp#===
    ```
    
    Profiling after (1MB responses):
    
    ```
    ==================================
      Mode: wall(1000)
      Samples: 15180 (0.00% miss rate)
      GC: 1688 (11.12%)
    ==================================
         TOTAL    (pct)     SAMPLES    (pct)     FRAME
          4534  (29.9%)        4534  (29.9%)     IO#read_nonblock
         10650  (70.2%)        3944  (26.0%)     Net::HTTPOpt::BufferedIOOpt#rbuf_fill
          2101  (13.8%)        2101  (13.8%)     IO#wait_readable
          1442   (9.5%)        1442   (9.5%)     (sweeping)
           360   (2.4%)         360   (2.4%)     String#sub
           312   (2.1%)         312   (2.1%)     String#split
           265   (1.7%)         265   (1.7%)     String#bytesize
           246   (1.6%)         246   (1.6%)     (marking)
           151   (1.0%)         151   (1.0%)     IO#write_nonblock
           125   (0.8%)         125   (0.8%)     String#downcase
           116   (0.8%)         116   (0.8%)     String#index
           113   (0.7%)         113   (0.7%)     Module#===
           162   (1.1%)          89   (0.6%)     Net::HTTPOpt::BufferedIOOpt#rbuf_consume_all_shareable!
           158   (1.0%)          65   (0.4%)     Net::HTTPHeader#set_field
            63   (0.4%)          63   (0.4%)     String#capitalize
            63   (0.4%)          63   (0.4%)     BasicObject#equal?
            58   (0.4%)          58   (0.4%)     Regexp#match
            58   (0.4%)          58   (0.4%)     String#[]
           449   (3.0%)          56   (0.4%)     Net::HTTPGenericRequest#write_header
            53   (0.3%)          53   (0.3%)     String#to_s
            52   (0.3%)          52   (0.3%)     Net::HTTPOpt::BufferedIOOpt#LOG
            52   (0.3%)          52   (0.3%)     String#count
            44   (0.3%)          44   (0.3%)     String#byteslice
            44   (0.3%)          44   (0.3%)     Array#join
          1096   (7.2%)          42   (0.3%)     Net::HTTPResponse.each_response_header
          2617  (17.2%)          40   (0.3%)     Net::HTTPOpt::BufferedIOOpt#readuntil
           132   (0.9%)          30   (0.2%)     Net::HTTPOpt::BufferedIOOpt#rbuf_consume
            28   (0.2%)          28   (0.2%)     Regexp#===
            27   (0.2%)          27   (0.2%)     Net::HTTP#proxy_uri
          8862  (58.4%)          27   (0.2%)     Net::HTTPResponse#read_body
    ````
    
    Benchmark code:
    
    ```ruby
    
    require "fileutils"
    DIR = "/tmp/www"
    FileUtils.mkdir_p(DIR)
    HOST = "127.0.0.1"
    PORT = 8080
    CONF = <<~EOS
    daemon            off;
    worker_processes  2;
    
    events {
        worker_connections  128;
    }
    
    http {
        server_tokens off;
        charset       utf-8;
    
        server {
            server_name   localhost;
            listen        #{HOST}:#{PORT};
    
            keepalive_requests 10000000;
            keepalive_timeout 3600s;
    
            error_page    500 502 503 504  /50x.html;
    
            location      / {
                root      #{DIR};
            }
    
        }
    
    }
    EOS
    
    File.write(File.join(DIR, "1k.txt"), 'a' * 1024)
    File.write(File.join(DIR, "10k.txt"), 'a' * 1024 * 10)
    File.write(File.join(DIR, "100k.txt"), 'a' * 1024 * 100)
    File.write(File.join(DIR, "1M.txt"), 'a' * 1024 * 1024)
    
    File.write(File.join(DIR, "nginx.conf"), CONF)
    
    require "benchmark/ips"
    require "net/http"
    
    nginx_pid = Process.spawn('nginx', '-c', File.join(DIR, "nginx.conf"))
    
    module Net
      class HTTPOpt < HTTP
    
        class BufferedIOOpt < ::Net::BufferedIO  #:nodoc: internal use only
          def initialize(io, read_timeout: 60, write_timeout: 60, continue_timeout: nil, debug_output: nil)
            @io = io
            @read_timeout = read_timeout
            @write_timeout = write_timeout
            @continue_timeout = continue_timeout
            @debug_output = debug_output
            @rbuf = ''.b
            @rbuf_offset = 0
          end
    
          attr_reader :io
          attr_accessor :read_timeout
          attr_accessor :write_timeout
          attr_accessor :continue_timeout
          attr_accessor :debug_output
    
          def inspect
            "#<#{self.class} io=#{@io}>"
          end
    
          def eof?
            @io.eof?
          end
    
          def closed?
            @io.closed?
          end
    
          def close
            @io.close
          end
    
          #
          # Read
          #
    
          public
    
          def read(len, dest = ''.b, ignore_eof = false)
            LOG "reading #{len} bytes..."
            read_bytes = 0
            begin
              while read_bytes + rbuf_size < len
                if s = rbuf_consume_all_shareable!
                  read_bytes += s.bytesize
                  dest << s
                end
                rbuf_fill
              end
              s = rbuf_consume(len - read_bytes)
              read_bytes += s.bytesize
              dest << s
            rescue EOFError
              raise unless ignore_eof
            end
            LOG "read #{read_bytes} bytes"
            dest
          end
    
          def read_all(dest = ''.b)
            LOG 'reading all...'
            read_bytes = 0
            begin
              while true
                if s = rbuf_consume_all_shareable!
                  read_bytes += s.bytesize
                  dest << s
                end
                rbuf_fill
              end
            rescue EOFError
              ;
            end
            LOG "read #{read_bytes} bytes"
            dest
          end
    
          def readuntil(terminator, ignore_eof = false)
            offset = @rbuf_offset
            begin
              until idx = @rbuf.index(terminator, offset)
                offset = @rbuf.bytesize
                rbuf_fill
              end
              return rbuf_consume(idx + terminator.bytesize - @rbuf_offset)
            rescue EOFError
              raise unless ignore_eof
              return rbuf_consume
            end
          end
    
          def readline
            readuntil("\n").chop
          end
    
          private
    
          BUFSIZE = 1024 * 16
    
          def rbuf_fill
            tmp = @rbuf_empty ? @rbuf : nil
            case rv = @io.read_nonblock(BUFSIZE, tmp, exception: false)
            when String
              @rbuf_empty = false
              if rv.equal?(tmp)
                @rbuf_offset = 0
              else
                @rbuf << rv
                rv.clear
              end
              return
            when :wait_readable
              (io = @io.to_io).wait_readable(@read_timeout) or raise Net::ReadTimeout.new(io)
              # continue looping
            when :wait_writable
              # OpenSSL::Buffering#read_nonblock may fail with IO::WaitWritable.
              # http://www.openssl.org/support/faq.html#PROG10
              (io = @io.to_io).wait_writable(@read_timeout) or raise Net::ReadTimeout.new(io)
              # continue looping
            when nil
              raise EOFError, 'end of file reached'
            end while true
          end
    
          def rbuf_flush
            if @rbuf_empty
              @rbuf.clear
              @rbuf_offset = 0
            end
            nil
          end
    
          def rbuf_size
            @rbuf.bytesize - @rbuf_offset
          end
    
          # Warning: this method may share the buffer to avoid
          # copying. The caller must no longer use the returned
          # string once rbuf_fill has been called again
          def rbuf_consume_all_shareable!
            @rbuf_empty = true
            buf = if @rbuf_offset == 0
              @rbuf
            else
              @rbuf.byteslice(@rbuf_offset..-1)
            end
            @rbuf_offset = @rbuf.bytesize
            buf
          end
    
          def rbuf_consume(len = nil)
            if @rbuf_offset == 0 && (len.nil? || len == @rbuf.bytesize)
              s = @rbuf
              @rbuf = ''.b
              @rbuf_offset = 0
              @rbuf_empty = true
            elsif len.nil?
              s = @rbuf.byteslice(@rbuf_offset..-1)
              @rbuf = ''.b
              @rbuf_offset = 0
              @rbuf_empty = true
            else
              s = @rbuf.byteslice(@rbuf_offset, len)
              @rbuf_offset += len
              @rbuf_empty = @rbuf_offset == @rbuf.bytesize
              rbuf_flush
            end
    
            @debug_output << %Q[-> #{s.dump}\n] if @debug_output
            s
          end
    
          #
          # Write
          #
    
          public
    
          def write(*strs)
            writing {
              write0(*strs)
            }
          end
    
          alias << write
    
          def writeline(str)
            writing {
              write0 str + "\r\n"
            }
          end
    
          private
    
          def writing
            @written_bytes = 0
            @debug_output << '<- ' if @debug_output
            yield
            @debug_output << "\n" if @debug_output
            bytes = @written_bytes
            @written_bytes = nil
            bytes
          end
    
          def write0(*strs)
            @debug_output << strs.map(&:dump).join if @debug_output
            orig_written_bytes = @written_bytes
            strs.each_with_index do |str, i|
              need_retry = true
              case len = @io.write_nonblock(str, exception: false)
              when Integer
                @written_bytes += len
                len -= str.bytesize
                if len == 0
                  if strs.size == i+1
                    return @written_bytes - orig_written_bytes
                  else
                    need_retry = false
                    # next string
                  end
                elsif len < 0
                  str = str.byteslice(len, -len)
                else # len > 0
                  need_retry = false
                  # next string
                end
                # continue looping
              when :wait_writable
                (io = @io.to_io).wait_writable(@write_timeout) or raise Net::WriteTimeout.new(io)
                # continue looping
              end while need_retry
            end
          end
    
          #
          # Logging
          #
    
          private
    
          def LOG_off
            @save_debug_out = @debug_output
            @debug_output = nil
          end
    
          def LOG_on
            @debug_output = @save_debug_out
          end
    
          def LOG(msg)
            return unless @debug_output
            @debug_output << msg + "\n"
          end
        end
        BufferedIO = BufferedIOOpt
    
        # Unchanged from ruby 3.1.1, only allow to lookup the mofidied BufferedIO
        def connect
          if use_ssl?
            # reference early to load OpenSSL before connecting,
            # as OpenSSL may take time to load.
            @ssl_context = OpenSSL::SSL::SSLContext.new
          end
    
          if proxy? then
            conn_addr = proxy_address
            conn_port = proxy_port
          else
            conn_addr = conn_address
            conn_port = port
          end
    
          D "opening connection to #{conn_addr}:#{conn_port}..."
          begin
            s = Socket.tcp conn_addr, conn_port, @local_host, @local_port, connect_timeout: @open_timeout
          rescue => e
            e = Net::OpenTimeout.new(e) if e.is_a?(Errno::ETIMEDOUT) #for compatibility with previous versions
            raise e, "Failed to open TCP connection to " +
              "#{conn_addr}:#{conn_port} (#{e.message})"
          end
          s.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
          D "opened"
          if use_ssl?
            if proxy?
              plain_sock = BufferedIO.new(s, read_timeout: @read_timeout,
                                          write_timeout: @write_timeout,
                                          continue_timeout: @continue_timeout,
                                          debug_output: @debug_output)
              buf = "CONNECT #{conn_address}:#{@PORT} HTTP/#{HTTPVersion}\r\n"
              buf << "Host: #{@address}:#{@PORT}\r\n"
              if proxy_user
                credential = ["#{proxy_user}:#{proxy_pass}"].pack('m0')
                buf << "Proxy-Authorization: Basic #{credential}\r\n"
              end
              buf << "\r\n"
              plain_sock.write(buf)
              HTTPResponse.read_new(plain_sock).value
              # assuming nothing left in buffers after successful CONNECT response
            end
    
            ssl_parameters = Hash.new
            iv_list = instance_variables
            SSL_IVNAMES.each_with_index do |ivname, i|
              if iv_list.include?(ivname)
                value = instance_variable_get(ivname)
                unless value.nil?
                  ssl_parameters[SSL_ATTRIBUTES[i]] = value
                end
              end
            end
            @ssl_context.set_params(ssl_parameters)
            @ssl_context.session_cache_mode =
              OpenSSL::SSL::SSLContext::SESSION_CACHE_CLIENT |
              OpenSSL::SSL::SSLContext::SESSION_CACHE_NO_INTERNAL_STORE
            @ssl_context.session_new_cb = proc {|sock, sess| @ssl_session = sess }
            D "starting SSL for #{conn_addr}:#{conn_port}..."
            s = OpenSSL::SSL::SSLSocket.new(s, @ssl_context)
            s.sync_close = true
            # Server Name Indication (SNI) RFC 3546
            s.hostname = @address if s.respond_to? :hostname=
            if @ssl_session and
               Process.clock_gettime(Process::CLOCK_REALTIME) < @ssl_session.time.to_f + @ssl_session.timeout
              s.session = @ssl_session
            end
            ssl_socket_connect(s, @open_timeout)
            if (@ssl_context.verify_mode != OpenSSL::SSL::VERIFY_NONE) && @ssl_context.verify_hostname
              s.post_connection_check(@address)
            end
            D "SSL established, protocol: #{s.ssl_version}, cipher: #{s.cipher[0]}"
          end
          @socket = BufferedIO.new(s, read_timeout: @read_timeout,
                                   write_timeout: @write_timeout,
                                   continue_timeout: @continue_timeout,
                                   debug_output: @debug_output)
          @last_communicated = nil
          on_connect
        rescue => exception
          if s
            D "Conn close because of connect error #{exception}"
            s.close
          end
          raise
        end
        private :connect
      end
    end
    
    begin
      sleep 0.2
    
      connection = Net::HTTP.start(HOST, PORT)
      connection.keep_alive_timeout = 3600
      connection_opt = Net::HTTPOpt.start(HOST, PORT)
      connection_opt.keep_alive_timeout = 3600
    
      unless connection.request_get("/100k.txt").body == connection_opt.request_get("/100k.txt").body
        abort("bug?")
      end
    
      if ARGV.first == "profile"
        require 'stackprof'
        require 'json'
    
        StackProf.run(mode: :wall, out: "/tmp/stackprof-net-http.dump", raw: true) do
          40_000.times do
            connection.request_get("/1M.txt").body
          end
        end
        File.write("/tmp/stackprof-net-http.json", JSON.dump(Marshal.load(File.binread("/tmp/stackprof-net-http.dump"))))
        system("stackprof", "/tmp/stackprof-net-http.rb")
    
        StackProf.run(mode: :wall, out: "/tmp/stackprof-net-http-opt.dump", raw: true) do
          40_000.times do
            connection_opt.request_get("/1M.txt").body
          end
        end
        File.write("/tmp/stackprof-net-http-opt.json", JSON.dump(Marshal.load(File.binread("/tmp/stackprof-net-http-opt.dump"))))
        system("stackprof", "/tmp/stackprof-net-http-opt.dump")
    
      else
        %w(1k 10k 100k 1M).each do |size|
          puts "=== #{size} ==="
          Benchmark.ips do |x|
            path = "/#{size}.txt"
            x.report("#{size}") { connection.request_get(path).body }
            x.report("#{size} opt") { connection_opt.request_get(path).body }
            x.compare!(order: :baseline)
          end
          puts
        end
      end
    ensure
      Process.kill('TERM', nginx_pid)
      Process.wait(nginx_pid)
    end
    
    ```
    byroot committed Aug 20, 2022
    Configuration menu
    Copy the full SHA
    781e400 View commit details
    Browse the repository at this point in the history