Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

rodeos reliable amqp connection - develop #9336

Closed
wants to merge 55 commits into from

Conversation

heifner
Copy link
Contributor

@heifner heifner commented Jul 23, 2020

Change Description

Pulling over AMQP work now used in rodeos to develop.

===

As EOSIO gains more support for AMQP it gains the need in some use cases to reliably publish messages to AMQP exchanges. reliable_amqp_publisher is a small class for publishing messages to an exchange that:

  • Reconnects to the AMQP server on connection failure
  • Reconnects to the AMQP server on channel failure (such as when publishing to a non-existing exchange)
  • Publishes messages in an AMQP transaction for positive confirmation the broker received them
  • Queues unconfirmed messages while not connected
  • Saves unconfirmed messages on disk on exit, and restores on relaunch

reliable_amqp_queue expects that the exchange it is configured to publish to will exist. It will not attempt to create it.

I am expecting and fully accept grief due to the lack of integration tests. I would like to explore with the team how we can best add integration tests for AMQP.

===

Getting a bit tired of copy pasting around some amqp connection management stuff in a few places. retrying_amqp_connection tries to solve the typical use case of needing a single channel connected to an AMQP server that is retried on failure. Compared to the code that was in reliable_amqp_publisher previously, a few functional differences are:

  • Doesn’t use AMQP::LibBoostAsioHandler; AMQP-CPP upstream is very clear that this isn’t supported
  • Doesn’t rely on AMQP’s TCP module; that would break a win32 build
  • Retries the connection/channel every second instead of a back off on consecutive failures
  • Doesn’t print the AMQP password to logs (whoops)

But there will likely be future improvements easier to add this way too, like TLS support with peer auth for example.

===

  • Add reliable_amqp_publisher to rodeos streamer to provide reliable publishing of messages.
  • Added ability to make the reliable publishing by block optional so that messages are published immediately as the filter is processed.

Change Type

Select ONE

  • Documentation
  • Stability bug fix
  • Other
  • Other - special case

Consensus Changes

  • Consensus Changes

API Changes

  • API Changes

Documentation Additions

  • Documentation Additions

New options:
Command line option: --stream-delete-unsent
"Delete unsent AMQP stream data retained from previous connections"

Config option: stream-rabbits-immediately=true
"Stream to RabbitMQ immediately instead of batching per block. Disables reliable message delivery."

spoonincode and others added 30 commits July 22, 2020 15:38
it won't be possible to shutdown cleanly if it's stopped
& just support passing an io_context for now. Move most the impl to a .cpp since it's not templated any longer
})
.onFinalize([this]() {
in_flight = 0;
//unfortuately we don't know if an error is due to something recoverable or if an error is due
Copy link
Contributor

@kimjh2005 kimjh2005 Aug 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#1. If commitTransaction() fails, we may need to log it. We can get useful information such as how often it happens, the message_id, the number of messages in the envelop, the size of envelop, etc.

#2. Can the size of AMQP::Envelop be too big for rabbitmq if the the number or message with the same message_id is too many and too big?

#3. Can pump_queue() be called continually and consuming all the cpu if it doesn't cause channel_failed and still commitTransaction fails?

Copy link
Contributor

@spoonincode spoonincode Aug 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. The default max message size in rabbitmq is 128MB; rabbitmq's max configurable is 512MB; amqp's max is 2GB. These limits were no where close to what I was expecting this to be used for originally. @heifner is rodeos usage a lot of data?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I think any error is going to bubble up as a channel error or connection error, both of which have a 1s retry timer on them. You can see this by doing actions like publishing to a non-exsisting exchange, or publishing to an exchange that is bound to a queue that is full (and rejecting publishing due to that). Both of those result in a channel error and a 1s retry before attempting again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. No way they will be anywhere near that size.

boost::filesystem::ofstream o(data_file_path);
FC_ASSERT(o.good(), "Failed to create unconfirmed AMQP message file at ${f}", ("f", (fc::path)data_file_path));
}
boost::filesystem::remove(data_file_path, ec);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If preserving unsent messages is important, Instead of removing file here can it be renamed as a backup file or deleted later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe our current plan is to always persist to disk as we go along to support hard-failure cases like kill -9

Copy link
Contributor

@kimjh2005 kimjh2005 Aug 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just think of a case that the file has some unconfirmed messages because of the previous problem. And nodeos restarts and reads the file and crashes or killed by -9 before writing to a data file. Since we deleted the data file already, the next rerun will not have a valid data file.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(the reason it works this way) was IMO if nodeos crashes I feel like you're going to have bigger problems. You're going to have to start anew from a snapshot or reply/resync anyways. So you can pick a block to restart from that covers the area affected.

But as Kevin explains, there is a desire to harden usage of file further.

Copy link
Contributor

@kimjh2005 kimjh2005 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks good.

Copy link
Contributor

@jeffreyssmith2nd jeffreyssmith2nd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hold for product review

@heifner
Copy link
Contributor Author

heifner commented Mar 30, 2021

Way out of date.

@heifner heifner closed this Mar 30, 2021
@heifner heifner deleted the retrying_amqp_connection-develop branch March 30, 2021 19:53
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants