tcp: allow connection pool callers to store protocol state #4131

Merged 2 commits on Aug 16, 2018
30 changes: 30 additions & 0 deletions include/envoy/tcp/conn_pool.h
@@ -57,6 +57,18 @@ class UpstreamCallbacks : public Network::ConnectionCallbacks {
virtual void onUpstreamData(Buffer::Instance& data, bool end_stream) PURE;
};

/**
* ProtocolState is a base class for protocol-specific state that must be maintained across
Member

nit: This comment is a little confusing in that it talks about state maintained across connections, but then it being destroyed when the connection is closed. Presumably it's up to the user to reattach the state to subsequent connections? Can you clarify? (Though given that we pass the pointer via move below and it's not shared I'm a little confused.)

Member Author

Sorry, it should say "protocol-specific connection state that must be maintained across requests." So: state per connection, available across requests. I'll make it clearer with an example.

My thought was to have the connection pool caller assign a ProtocolState with a std::unique_ptr<T> to make clear that the connection pool owns the data, and allow it to be accessed/modified via a T* later (which might be null if there is no state yet).

Member

Yup makes sense.

Member Author

And actually, ConnectionState is probably a better name for this anyway. It need not actually be related to a protocol.

* connections. The ProtocolState assigned to a connection is automatically destroyed when the
* connection is closed.
*/
class ProtocolState {
public:
virtual ~ProtocolState() {}
};

typedef std::unique_ptr<ProtocolState> ProtocolStatePtr;

/*
* ConnectionData wraps a ClientConnection allocated to a caller. Open ClientConnections are
* released back to the pool for re-use when their containing ConnectionData is destroyed.
@@ -70,13 +82,31 @@ class ConnectionData {
*/
virtual Network::ClientConnection& connection() PURE;

/**
* Sets the ProtocolState for this connection. Any existing ProtocolState is destroyed.
* @param ProtocolStatePtr&& new ProtocolState for this connection.
*/
virtual void setProtocolState(ProtocolStatePtr&& state) PURE;

/**
* @return T* the current ProtocolState or nullptr if no state is set or if the state's type
* is not T.
*/
template <class T> T* protocolStateTyped() { return dynamic_cast<T*>(protocolState()); }

/**
* Sets the ConnectionPool::UpstreamCallbacks for the connection. If no callback is attached,
* data from the upstream will cause the connection to be closed. Callbacks cease when the
* connection is released.
* @param callback the UpstreamCallbacks to invoke for upstream data
*/
virtual void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callback) PURE;

protected:
/**
* @return ProtocolState* pointer to the current ProtocolState or nullptr if not set
*/
virtual ProtocolState* protocolState() PURE;
};

typedef std::unique_ptr<ConnectionData> ConnectionDataPtr;
15 changes: 15 additions & 0 deletions source/common/tcp/conn_pool.h
@@ -39,6 +39,11 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::

Network::ClientConnection& connection();
void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks);
void setProtocolState(ConnectionPool::ProtocolStatePtr&& state) {
parent_.setProtocolState(std::move(state));
};
ConnectionPool::ProtocolState* protocolState() { return parent_.protocolState(); }

void release(bool closed);

void invalidate() { conn_valid_ = false; }
@@ -60,6 +65,10 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override {
wrapper_->addUpstreamCallbacks(callbacks);
};
void setProtocolState(ConnectionPool::ProtocolStatePtr&& state) override {
wrapper_->setProtocolState(std::move(state));
}
ConnectionPool::ProtocolState* protocolState() override { return wrapper_->protocolState(); }

ConnectionWrapperSharedPtr wrapper_;
};
@@ -90,10 +99,16 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
void onAboveWriteBufferHighWatermark() override;
void onBelowWriteBufferLowWatermark() override;

void setProtocolState(ConnectionPool::ProtocolStatePtr&& state) {
conn_state_ = std::move(state);
}
ConnectionPool::ProtocolState* protocolState() { return conn_state_.get(); }

ConnPoolImpl& parent_;
Upstream::HostDescriptionConstSharedPtr real_host_description_;
ConnectionWrapperSharedPtr wrapper_;
Network::ClientConnectionPtr conn_;
ConnectionPool::ProtocolStatePtr conn_state_;
Event::TimerPtr connect_timer_;
Stats::TimespanPtr conn_length_;
uint64_t remaining_requests_;
134 changes: 133 additions & 1 deletion test/common/tcp/conn_pool_test.cc
@@ -29,6 +29,18 @@ using testing::_;

namespace Envoy {
namespace Tcp {
namespace {

struct TestProtocolState : public ConnectionPool::ProtocolState {
TestProtocolState(int id, std::function<void()> on_destructor)
: id_(id), on_destructor_(on_destructor) {}
~TestProtocolState() { on_destructor_(); }

int id_;
std::function<void()> on_destructor_;
};

} // namespace

/**
* Mock callbacks used for conn pool testing.
@@ -309,6 +321,9 @@ TEST_F(TcpConnPoolImplTest, VerifyBufferLimits) {
dispatcher_.clearDeferredDeleteList();
}

/**
 * Test that upstream callbacks fire for assigned connections.
*/
TEST_F(TcpConnPoolImplTest, UpstreamCallbacks) {
Buffer::OwnedImpl buffer;

@@ -343,6 +358,9 @@ TEST_F(TcpConnPoolImplTest, UpstreamCallbacks) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that upstream callback close event fires for assigned connections.
*/
TEST_F(TcpConnPoolImplTest, UpstreamCallbacksCloseEvent) {
Buffer::OwnedImpl buffer;

@@ -360,6 +378,9 @@ TEST_F(TcpConnPoolImplTest, UpstreamCallbacksCloseEvent) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that a connection pool functions without upstream callbacks.
*/
TEST_F(TcpConnPoolImplTest, NoUpstreamCallbacks) {
Buffer::OwnedImpl buffer;

@@ -400,6 +421,45 @@ TEST_F(TcpConnPoolImplTest, MultipleRequestAndResponse) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Tests ProtocolState assignment, lookup and destruction.
*/
TEST_F(TcpConnPoolImplTest, ProtocolStateLifecycle) {
InSequence s;

bool state_destroyed = false;

// Request 1 should kick off a new connection.
ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection);

auto* state = new TestProtocolState(1, [&]() -> void { state_destroyed = true; });
c1.callbacks_.conn_data_->setProtocolState(std::unique_ptr<TestProtocolState>(state));

EXPECT_EQ(state, c1.callbacks_.conn_data_->protocolStateTyped<TestProtocolState>());

EXPECT_CALL(conn_pool_, onConnReleasedForTest());
c1.releaseConn();

EXPECT_FALSE(state_destroyed);

// Request 2 should not.
ActiveTestConn c2(*this, 0, ActiveTestConn::Type::Immediate);

EXPECT_EQ(state, c2.callbacks_.conn_data_->protocolStateTyped<TestProtocolState>());

EXPECT_CALL(conn_pool_, onConnReleasedForTest());
c2.releaseConn();

EXPECT_FALSE(state_destroyed);

// Cause the connection to go away.
EXPECT_CALL(conn_pool_, onConnDestroyedForTest());
conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();

EXPECT_TRUE(state_destroyed);
}

/**
* Test when we overflow max pending requests.
*/
@@ -555,6 +615,9 @@ TEST_F(TcpConnPoolImplTest, DisconnectWhileBound) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test upstream disconnection of one request while another is pending.
*/
TEST_F(TcpConnPoolImplTest, DisconnectWhilePending) {
InSequence s;

@@ -664,6 +727,9 @@ TEST_F(TcpConnPoolImplTest, MaxRequestsPerConnection) {
EXPECT_EQ(1U, cluster_->stats_.upstream_cx_max_requests_.value());
}

/*
* Test that multiple connections can be assigned at once.
*/
TEST_F(TcpConnPoolImplTest, ConcurrentConnections) {
InSequence s;

@@ -691,6 +757,61 @@ TEST_F(TcpConnPoolImplTest, ConcurrentConnections) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Tests ProtocolState lifecycle with multiple concurrent connections.
*/
TEST_F(TcpConnPoolImplTest, ProtocolStateWithConcurrentConnections) {
InSequence s;

int state_destroyed = 0;
auto* s1 = new TestProtocolState(1, [&]() -> void { state_destroyed |= 1; });
auto* s2 = new TestProtocolState(2, [&]() -> void { state_destroyed |= 2; });
  auto* s3 = new TestProtocolState(3, [&]() -> void { state_destroyed |= 4; });

cluster_->resource_manager_.reset(
new Upstream::ResourceManagerImpl(runtime_, "fake_key", 2, 1024, 1024, 1));
ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection);
c1.callbacks_.conn_data_->setProtocolState(std::unique_ptr<TestProtocolState>(s1));
ActiveTestConn c2(*this, 1, ActiveTestConn::Type::CreateConnection);
c2.callbacks_.conn_data_->setProtocolState(std::unique_ptr<TestProtocolState>(s2));
ActiveTestConn c3(*this, 0, ActiveTestConn::Type::Pending);

EXPECT_EQ(0, state_destroyed);

// Finish c1, which gets c3 going.
EXPECT_CALL(conn_pool_, onConnReleasedForTest());
conn_pool_.expectEnableUpstreamReady();
c3.expectNewConn();
c1.releaseConn();

conn_pool_.expectAndRunUpstreamReady();

// c3 now has the state set by c1.
EXPECT_EQ(s1, c3.callbacks_.conn_data_->protocolStateTyped<TestProtocolState>());
EXPECT_EQ(s2, c2.callbacks_.conn_data_->protocolStateTyped<TestProtocolState>());

// replace c3's state
c3.callbacks_.conn_data_->setProtocolState(std::unique_ptr<TestProtocolState>(s3));
EXPECT_EQ(1, state_destroyed);

EXPECT_CALL(conn_pool_, onConnReleasedForTest()).Times(2);
c2.releaseConn();
c3.releaseConn();

EXPECT_EQ(1, state_destroyed);

// Disconnect both connections.
EXPECT_CALL(conn_pool_, onConnDestroyedForTest()).Times(2);
conn_pool_.test_conns_[1].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();

EXPECT_EQ(7, state_destroyed);
}

/**
* Tests that the DrainCallback is invoked when the number of connections goes to zero.
*/
TEST_F(TcpConnPoolImplTest, DrainCallback) {
InSequence s;
ReadyWatcher drained;
@@ -711,7 +832,9 @@ TEST_F(TcpConnPoolImplTest, DrainCallback) {
dispatcher_.clearDeferredDeleteList();
}

// Test draining a connection pool that has a pending connection.
/**
* Test draining a connection pool that has a pending connection.
*/
TEST_F(TcpConnPoolImplTest, DrainWhileConnecting) {
InSequence s;
ReadyWatcher drained;
@@ -731,6 +854,9 @@ TEST_F(TcpConnPoolImplTest, DrainWhileConnecting) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that the DrainCallback is invoked when a connection is closed.
*/
TEST_F(TcpConnPoolImplTest, DrainOnClose) {
ReadyWatcher drained;
EXPECT_CALL(drained, ready());
@@ -754,6 +880,9 @@ TEST_F(TcpConnPoolImplTest, DrainOnClose) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that busy connections are closed when the connection pool is destroyed.
*/
TEST_F(TcpConnPoolImplDestructorTest, TestBusyConnectionsAreClosed) {
prepareConn();

@@ -762,6 +891,9 @@ TEST_F(TcpConnPoolImplDestructorTest, TestBusyConnectionsAreClosed) {
conn_pool_.reset();
}

/**
* Test that ready connections are closed when the connection pool is destroyed.
*/
TEST_F(TcpConnPoolImplDestructorTest, TestReadyConnectionsAreClosed) {
prepareConn();

4 changes: 4 additions & 0 deletions test/mocks/tcp/mocks.h
@@ -44,6 +44,10 @@ class MockConnectionData : public ConnectionData {
// Tcp::ConnectionPool::ConnectionData
MOCK_METHOD0(connection, Network::ClientConnection&());
MOCK_METHOD1(addUpstreamCallbacks, void(ConnectionPool::UpstreamCallbacks&));
void setProtocolState(ProtocolStatePtr&& state) override { setProtocolState_(state); }
MOCK_METHOD0(protocolState, ConnectionPool::ProtocolState*());

MOCK_METHOD1(setProtocolState_, void(ConnectionPool::ProtocolStatePtr& state));

// If set, invoked in ~MockConnectionData, which indicates that the connection pool
// caller has released a connection.