Skip to content

Not reading the entire response body leaks the connection on the client side #1353

Closed
@sfackler

Description

@sfackler

This program continually creates new connections without ever closing old ones:

extern crate hyper;
extern crate tokio_core;
extern crate tokio_io;
extern crate futures;
extern crate env_logger;

use tokio_core::reactor::{Core, Timeout};
use tokio_core::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite};
use hyper::{Client, Uri};
use hyper::client::HttpConnector;
use hyper::server::Service;
use std::time::Duration;
use std::cell::Cell;
use futures::{Future, Stream, stream};
use std::io::{self, Read, Write};

/// Reproduction driver: issues GET requests in an endless loop, never reads
/// the response body, and waits one second between requests.
///
/// Every connection the client opens goes through `DebugConnector`, so each
/// new connection is printed as it is requested.
fn main() {
    env_logger::init().unwrap();

    let mut core = Core::new().unwrap();
    let handle = core.handle();

    // NOTE(review): the `1` passed to `HttpConnector::new` is presumably the
    // DNS worker-thread count — confirm against the hyper version in use.
    let connector = DebugConnector(HttpConnector::new(1, &handle), Cell::new(0));
    let client = Client::configure()
        .connector(connector)
        .keep_alive_timeout(Some(Duration::from_secs(5)))
        .build(&handle);

    let request_loop = stream::repeat(()).for_each(|()| {
        let uri = "http://www.google.com".parse().unwrap();
        client
            .get(uri)
            // The response is deliberately discarded without touching its body.
            .and_then(|_| {
                println!("ignoring body");
                Timeout::new(Duration::from_secs(1), &handle)
                    .unwrap()
                    .map_err(|e| e.into())
            })
            .map_err(|e| println!("error {}", e))
    });

    core.run(request_loop).unwrap();
}

/// Connector wrapper that numbers and logs every connection it is asked for.
struct DebugConnector(
    /// Inner connector that performs the actual connect.
    HttpConnector,
    /// Id handed to the next connection; incremented on every `call`.
    Cell<u32>,
);

impl Service for DebugConnector {
    type Request = Uri;
    type Response = DebugStream;
    type Error = io::Error;
    type Future = Box<Future<Item = DebugStream, Error = io::Error>>;

    fn call(&self, uri: Uri) -> Self::Future {
        let id = self.1.get();
        self.1.set(id + 1);
        println!("new connection {}", id);
        Box::new(self.0.call(uri).map(move |s| DebugStream(s, id)))
    }
}

/// Socket wrapper that prints a message when it is dropped.
struct DebugStream(
    /// The wrapped socket; all I/O is forwarded to it.
    TcpStream,
    /// Connection id assigned by `DebugConnector`.
    u32,
);

impl Drop for DebugStream {
    /// Prints the connection id so the point at which the socket is released
    /// shows up in the program output.
    fn drop(&mut self) {
        let conn_id = self.1;
        println!("socket {} dropping", conn_id);
    }
}

impl Write for DebugStream {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0.flush()
    }
}

impl AsyncWrite for DebugStream {
    fn shutdown(&mut self) -> futures::Poll<(), io::Error> {
        AsyncWrite::shutdown(&mut self.0)
    }
}

impl Read for DebugStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.read(buf)
    }
}

// Marks `DebugStream` as an async reader; the empty body means every
// `AsyncRead` method comes from the trait's default implementations.
impl AsyncRead for DebugStream {}
Output (a new connection is opened for every request, and no "socket N dropping" line appears):

new connection 0
ignoring body
new connection 1
ignoring body
new connection 2
ignoring body
new connection 3
ignoring body
new connection 4
ignoring body
new connection 5
ignoring body
new connection 6
ignoring body
new connection 7
ignoring body
new connection 8
ignoring body
new connection 9
ignoring body
new connection 10
ignoring body
new connection 11
ignoring body
new connection 12
ignoring body
new connection 13
ignoring body
new connection 14
ignoring body
new connection 15
ignoring body
new connection 16
ignoring body
new connection 17
ignoring body
new connection 18
ignoring body
new connection 19
ignoring body
new connection 20
ignoring body
new connection 21
ignoring body
new connection 22
ignoring body
new connection 23
ignoring body
new connection 24
ignoring body
new connection 25
...

Metadata

Metadata

Assignees

No one assigned

    Labels

    A-client (Area: client.), C-bug (Category: bug. Something is wrong. This is bad!)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions