Fast txmeta: implement and profile parallel ingestion #2552

Closed · 5 tasks done
ire-and-curses opened this issue May 5, 2020 · 60 comments

ire-and-curses (Member) commented May 5, 2020

We know that using AWS Batch we can do a complete historical ingestion x2 + verification in ~20 hrs (for horizon-cmp testing). However in this case we don’t write to a shared database. Adding this component also adds negatives to the AWS Batch solution:

  1. Adding the ability to write to a shared database would be a significant change that is probably hard to do effectively using AWS Batch.
  2. We'd like the offline parallel solution to be generic if possible, rather than tied to any one cloud architecture.
  3. We don’t know at what point we will get bottlenecked on DB writing.

Assuming we are not going with an off-the-shelf solution, then the parallel orchestration needs to happen on the Horizon side. We can start simply, without trying to optimise for historical ledger size changes.

  • Discuss any alternatives to the path forward sketched out below
  • Create a new Horizon “--parallel” option which spawns multiple FSC instances for bulk historical catchup
  • Do some experimenting to determine i) what a reasonable batch size should be (100,000 is a first guess); ii) where the bottlenecks might be (Horizon CPU? Postgres DB write throughput? etc.); iii) what the limits of parallelisation are (how many workers can be efficiently supported?)
  • Confirm reliability of the parallel implementation and data integrity under scenarios such as workers disappearing, hanging or crashing
  • Document all of the above, including memory requirements

Meta issue: #2550

@bartekn bartekn added blocked blocked by another issue and removed blocked blocked by another issue labels May 20, 2020
bartekn (Contributor) commented May 20, 2020

In the Horizon team meeting we decided to try it without AWS Batch first. If a single-instance solution is not fast enough, or in case of any issues, we can explore AWS Batch again.

#2322 is in a state that can be run and tested with horizon db reingest range command. Let's see if it actually can reingest ledgers correctly and how fast it is.

I think this can be broken down into the following steps (please feel free to change/reorder):

  1. Research which EC2 instance type we want to use to reingest pubnet history and database instance params. x1.32xlarge was suggested as a Horizon machine but maybe there's something better suited for this job. Remember that captive core requires ~2GB for the latest ledger state and that we aim for <24h total ingestion time. Let's discuss the choice here first before we continue.
  2. Start as many workers as possible; when reingestion is complete, dump the history_* tables (see the script in the verify-range docker image).
  3. Repeat the step above but with a DB backend.
  4. Ensure history_* tables are exactly the same. Compare running times.

Adding this component also adds negatives to the AWS Batch solution

Even if a single machine prototype works well I think AWS Batch offers a few advantages:

  • In case a DB instance still has some free resources we can add more worker machines (it's possible that we will reach a RAM limit for the largest EC2 type but a DB node will still be fine).
  • It makes querying and analyzing the logs much easier in case we want to find the slowest ledgers or compare the DB vs. captive solution on a per-ledger basis.
  • Better allocation of resources. If we split entire pubnet history into equally-long ranges the oldest ranges will be ingested faster (due to a smaller number of operations inside them). In AWS Batch, a free EC2 instance will reingest the next range in the queue.
  • AWS Batch operates using docker images. If we prepare a generic "reingest" image with support for AWS Batch env variables it can be used outside of AWS, e.g. in a private Kubernetes cluster.

@2opremio 2opremio self-assigned this May 26, 2020
2opremio (Contributor) commented Jun 2, 2020

After discarding Argo Workflows as a solution I have given some thought to having horizon orchestrate the parallelization (both locally and/or in Kubernetes), coming up with these two options.

Captive Core Parallelization (1)

In approaches A and B the goal is to re-ingest the range of ledgers [a, b] (i.e. the command horizon db reingest a b) into the Horizon database, parallelizing the use of captive core.

A. In scenario A, we spawn multiple (m) captive core instances, each dedicated to a separate ledger subrange. Horizon collects the metadata from each stellar core instance and ingests it in the database.

B. In scenario B, we spawn multiple (m) pairs of Horizon+Captive Core instances. This would be equivalent to independently running horizon db reingest a a+(b-a)/m, horizon db reingest a+(b-a)/m+1 a+2*(b-a)/m, ..., horizon db reingest b-(b-a)/m+1 b. The Horizon instance of each pair is responsible for ingesting its subrange of ledgers directly. This means that the main Horizon process won't be doing any work other than orchestrating the execution of the Horizon+Core pairs.
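
To make the split concrete, here is a minimal Go sketch of how approach B could divide [a, b] into m subranges and launch one horizon db reingest range run per subrange. The helper names and the example range are illustrative only, not code from the actual implementation:

```go
package main

import (
	"fmt"
	"os/exec"
)

// splitRange divides the inclusive ledger range [a, b] into m contiguous
// subranges of roughly equal length. Naming is illustrative only.
func splitRange(a, b, m uint32) [][2]uint32 {
	size := (b - a + 1) / m
	var out [][2]uint32
	for i := uint32(0); i < m; i++ {
		from := a + i*size
		to := from + size - 1
		if i == m-1 {
			to = b // the last worker absorbs the remainder
		}
		out = append(out, [2]uint32{from, to})
	}
	return out
}

func main() {
	// Hypothetical example: 4 Horizon+Captive Core pairs over an arbitrary range.
	for _, r := range splitRange(29944063, 29954111, 4) {
		// Each pair is an independent horizon + captive-core run.
		cmd := exec.Command("horizon", "db", "reingest", "range",
			fmt.Sprint(r[0]), fmt.Sprint(r[1]))
		fmt.Println(cmd.String()) // or cmd.Run() to actually execute
	}
}
```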

My preference is approach B.

Both A and B can run the parallel jobs locally (through subprocesses/goroutines) or on multiple machines (e.g. in Kubernetes, by running the horizon container as part of a Job). We don't know if we will ever need to do multi-machine parallelization, but it's worth noting that option B is better suited for multiple machines. This is because there is no meaningful communication between the parallel jobs and the parent Horizon instance. All the Horizon<->Core communication happens within the Job's realm. In practice this means that we don't need to worry about setting up a fast communication channel and, more importantly, there is no network delay.

Additionally, B requires no buffering. In approach A (at least without major changes to the ingestion system) the ledgers are processed sequentially by the parent Horizon instance. This means that, if we want to take advantage of parallelization, we need to temporarily store the metadata produced by the captive core instances processing future ledgers. In approach B, the Horizon instance of each pair writes directly to the database, requiring no buffering.

Also, scenario B has the potential advantage of parallelizing the metadata processing (consumer side), which we suspect is going to be the bottleneck. However, if the bottleneck is in the database itself, this won't really help, and it may even be counterproductive if we overload it with too many clients. That said, this approach will make sure we saturate the DB capacity.

I also see a couple of disadvantages of approach B:

  1. It seems like it will make it more complicated to restart ingestion if it's stopped for whatever reason (a crash or a pause), at least without wasting work. This would require recording the state of each Job, which independently writes to the database.
  2. I am not sure approach B can be used (or easily extended) to implement horizon expingest verify-range. Approach A certainly can.

tamirms (Contributor) commented Jun 2, 2020

I prefer approach B as well, for the reasons you have mentioned. My concern about approach A is that it assumes the bottleneck is on the producer side instead of the consumer side. But I suspect Horizon is slower at ingesting than fast stellar core is at producing ledgers.

ire-and-curses (Member Author) commented:

I have some questions:

  1. Is it really true that in case A future ledgers must be buffered? I was under the impression that ledgers could be read in parallel on the Horizon side. I think we've discussed many times that we can ingest ledgers non-sequentially (cc @bartekn). If it is true, how much work are the changes that would be needed to support parallel reading to a single Horizon instance?
  2. It would be great to understand the true bottlenecks better before settling on a choice of architecture.
  3. Does option B require more total memory overall than option A?
  4. It seems to me that option B doesn't allow workers to pick up new ranges to ingest. E.g. imagine you have a pool of 4 workers. For optimal speed, you would like all workers to be equally loaded, so that you finish in the shortest time possible. If you performed a naive division, splitting the full ledger range equally across the workers, workers on earlier ledgers would finish much sooner (because early ledgers are sparse). To mitigate this you must either i) give those workers new work from the remaining list of unprocessed ledgers; ii) determine large non-equal ledger loads for each worker; iii) spawn new workers to handle the remaining list of unprocessed ledgers.

If we have to spawn new workers then that overhead must also be taken into account. If a worker is a horizon+core pair then is that overhead trivial or non-trivial?

2opremio (Contributor) commented Jun 3, 2020

  1. Is it really true that in case A future ledgers must be buffered? I was under the impression that ledgers could be read in parallel on the Horizon side. I think we've discussed many times that we can ingest ledgers non-sequentially (cc @bartekn).

Well, processing the ledgers in parallel on the horizon side would make it into B.

If it is true, how much work are the changes that would be needed to support parallel reading to a single Horizon instance?

Note that this is what I plan to do initially, before going for multiple machines.

  1. It would be great to understand the true bottlenecks better before settling on a choice of architecture.

Good point, I will start measuring today.

  1. Does option B require more total memory overall than option A?

I wouldn't think so, because of the lack of buffering.

  1. It seems to me that option B doesn't allow workers to pick up new ranges to ingest. E.g. imagine you have a pool of 4 workers. For optimal speed, you would like all workers to be equally loaded, so that you finish in the shortest time possible. If you performed a naive division, splitting the full ledger range equally across the workers, workers on earlier ledgers would finish much sooner (because early ledgers are sparse). To mitigate this you must either i) give those workers new work from the remaining list of unprocessed ledgers; ii) determine large non-equal ledger loads for each worker; iii) spawn new workers to handle the remaining list of unprocessed ledgers.

If we have to spawn new workers then that overhead must also be taken into account. If a worker is a horizon+core pair then is that overhead trivial or non-trivial?

I see this as a refinement of B. Processing extra ledgers without respawning workers adds extra complexity though:

  1. You need to set up a more sophisticated communication channel (e.g. a queue), which complicates things when using multiple machines.

  2. You need to modify core to alter which ledgers to process on the fly. Or we can reuse Horizon and accept re-spawning core every time the range changes.

bartekn (Contributor) commented Jun 3, 2020

I also think we should pick option B for the reasons mentioned above. If I understand A correctly, it would require changes to Horizon so that a single instance can parse multiple streams, but also on the core side: it would need to expose the meta stream outside the container somehow (or at least an extra tool to do it).

@2opremio: It seems like it will make it more complicated to restart ingestion if it's stopped for whatever reason (a crash or a pause), at least without wasting work. This would require recording the state of each Job, which independently writes to the database.

I think in case of a crash we simply clear a given range and start from scratch. If we pick ranges that are small it shouldn't really be an issue.

@2opremio: I am not sure approach B can be used (or easily extended) to implement horizon expingest verify-range. Approach A certainly can.

I think I'm missing something. Looks like we currently use an approach similar to B in verify-range in AWS Batch.

@ire-and-curses: Is it really true that in case A future ledgers must be buffered? I was under the impression that ledgers could be read in parallel on the Horizon side. I think we've discussed many times that we can ingest ledgers non-sequentially (cc @bartekn). If it is true, how much work are the changes that would be needed to support parallel reading to a single Horizon instance?

I think parallel has at least 3 meanings in the context of ingestion:

  1. In the online mode: we can have multiple ingestion instances that pick a master instance randomly that ingests the latest ledger. This is possible now.
  2. In the offline mode: we can split a network history into ranges and ingest these ranges in parallel. This is also possible now.
  3. In the offline mode: Horizon reads ledgers from multiple streams and ingests them one by one, non-sequentially. It is not possible right now because something like that wasn't needed for a DB ledger backend (the core DB always stores ledgers in ranges). At the same time, it would be simple to implement if we decide to go with solution A.

Does it make sense?

As for buffering, I don't quite understand. Can you elaborate? The core meta stream doesn't have any internal buffering, so it only writes if there is a reader that reads. If you meant that buffering is needed to build a sequential range (ex. core1: streaming A->B and core2: streaming B+1->C, so Horizon needs to buffer core2 ledgers) then it's not needed either. In such a case you can simply run one core at a time.

@ire-and-curses: Does option B require more total memory overall than option A?

I'd say it's the same because they run the same number of stellar-cores. Horizon memory usage when reingesting is negligible (except in situations when the ledger is huge: we still keep the entire LedgerCloseMeta in memory).

@ire-and-curses: It seems to me that option B doesn't allow workers to pick up new ranges to ingest. E.g. imagine you have a pool of 4 workers. For optimal speed, you would like all workers to be equally loaded, so that you finish in the shortest time possible. If you performed a naive division, splitting the full ledger range equally across the workers, workers on earlier ledgers would finish much sooner (because early ledgers are sparse). To mitigate this you must either i) give those workers new work from the remaining list of unprocessed ledgers; ii) determine large non-equal ledger loads for each worker; iii) spawn new workers to handle the remaining list of unprocessed ledgers.

When running multiple ranges on a single machine I think it's actually trivial and can be done with a simple Go script (possibly even a bash script): you start a goroutine that adds new ranges to a channel, and N goroutines (workers) that read from that channel and run the horizon db reingest command (see the sketch below). I guess it could be a subcommand in Horizon if single-machine ingestion proves to be fast.
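
For illustration, a minimal sketch of that single-machine script, assuming we shell out to horizon db reingest range for each queued range. The worker count, range boundaries and step size are placeholders, not the eventual Horizon subcommand:

```go
package main

import (
	"fmt"
	"log"
	"os/exec"
	"sync"
)

type ledgerRange struct{ from, to uint32 }

func main() {
	const workers = 4
	ranges := make(chan ledgerRange)

	// Producer: enqueue fixed-size ranges (100k ledgers as a first guess).
	go func() {
		const step = 100_000
		for from := uint32(1); from <= 30_000_000; from += step {
			ranges <- ledgerRange{from, from + step - 1}
		}
		close(ranges)
	}()

	// Workers: each drains the channel and runs one reingest command at a time.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for r := range ranges {
				cmd := exec.Command("horizon", "db", "reingest", "range",
					fmt.Sprint(r.from), fmt.Sprint(r.to))
				if err := cmd.Run(); err != nil {
					log.Printf("range %d-%d failed: %v", r.from, r.to, err)
				}
			}
		}()
	}
	wg.Wait()
}
```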

When using multiple machines: yes, we need some queue implementation. Fortunately, there are out-of-the-box solutions: AWS Batch, Kubernetes (first result for "kubernetes queue": "Coarse Parallel Processing Using a Work Queue"). In the end, this could also be a simple script (we really just need to be able to execute two commands: 1) start a new job, 2) check the status of a job, which should be available on all platforms).

ire-and-curses (Member Author) commented:

Ok cool! I'm happy with going ahead with option B and seeing what we find out about overheads. 👍

@bartekn bartekn added this to the Horizon 1.5.0 milestone Jun 4, 2020
2opremio (Contributor) commented Jun 7, 2020

I have finally gotten some numbers from horizon db reingest range with and without captive core.

TL;DR: main conclusions:

  1. As we expected, doing horizon db reingest range with captive core is considerably faster (60% faster) than doing stellar-core catchup (on a local postgres instance) + horizon db reingest range without captive core.
  2. Using a single captive core + horizon instance (i.e. what the code does today) the Horizon postgres instance isn't the bottleneck (see throughput metrics below). This makes it suitable to parallelize multiple instances of Horizon + Captive core (approach B).
  3. Judging by the CPU graphs of Horizon and Captive Core during horizon db reingest range, and contrary to what we thought, it seems Core is a bigger bottleneck than Horizon in the re-ingestion pipeline.
  4. I think Captive Core can be made faster by overlapping the download and verification of checkpoints with processing them (I may be wrong, but it seems that this is done sequentially; @graydon?). This may help with (3).
  5. I think approach (B) should work well, at least for our AWS environment. The DB allows for a lot more IOPS and network bandwidth than a single Horizon + Captive Core instance uses.

Environment

Mimicking the production environment, I created a c5.xlarge EC2 instance in AWS, where I ran Horizon. I used the same machine for running stellar-core catchup and the Core postgres DB (it seemed overkill to have a dedicated machine just for this).

Horizon, Core and Postgres (when run locally) were all run in containers (I couldn't be bothered to use debian packages), connected to the Stellar production network.

As for the Horizon postgres DB, I ran tests both using a local DB and an r4.8xlarge RDS instance.

Tests

The tests consisted of reingesting ~14 hours worth of ledgers (10048 ledgers, with sequence numbers between 29944063 and 29954111, for which I assumed a close time of 5 seconds). Recent ledgers were picked to ensure they were reasonably full.

Without Captive Core, using a local Horizon Postgres instance

  1. Running stellar-core catchup 29954111/10048 took 40 minutes and 6 seconds
  2. Running horizon db reingest range took 16 minutes and 45 seconds

With Captive Core, using a local Horizon Postgres instance

Running horizon db reingest range took 22 minutes and 41 seconds.

Here are the CPU + memory graphs (obtained with psrecord) of:

  • Horizon:
    horizon-local-postgres

  • Captive-core:
    core-local-postgres

  • Horizon local postgres instance:
    local-postgres

Sorry for the noisy graphs, I wrongly assumed psrecord would apply a moving average.

The graphs show a resident memory consumption of ~3GB for Captive Core and ~40MB for Horizon.

It's worth noting that Captive Core operates much closer to 100% CPU than Horizon, probably debunking our assumption that Horizon was the reingestion pipeline bottleneck. A data throughput graph would be more conclusive.

With Captive Core, using a production-like Horizon RDS instance

Running horizon db reingest range took 21 minutes and 30 seconds. Note how the execution time is very similar, indicating that the DB isn't a determining factor for a single Horizon instance (more on this below).

Here are the CPU + memory graphs of:

  • Horizon:
    horizon-remote-postgres

  • Captive-core:
    core-remote-postgres

Here are some graphs from the RDS instance, indicating that it's far from saturating. This means it would accept multiple Horizon + Captive-core pairs running in parallel.

  • IOPS
    IOPS

  • Write throughput (in MB/s)
    Write throughput

  • Network throughput (in Mb/s)
    Network throughput

At peak, the reingestion requires ~400 IOPS and a network throughput of ~500 Mb/s.

RDS supports a baseline of 3000 IOPS and EC2 supports a maximum point-to-point network traffic of 5 Gbps, with a total maximum of 25 Gbps. We would be able to run ~6 Horizon + Captive Core pairs in parallel in order to reach those 3000 IOPS.

Now, the CPU graphs indicate that, at peak, Horizon + Captive-core consume 1.8 vCPUs (100% CPU from core and 80% from Horizon). This means that a single c5.xlarge machine (4 vCPUs) would be able to execute 2 Horizon+core pairs.

We would need to use 3 machines in order to reach the 3000 IOPS baseline (or a c5.4xlarge machine with 16 cores, with 4 leftover cores and causing a point-to-point network traffic of 2.4 Gbps).

And, as a ballpark, using those 3 machines, ingesting 14 hours worth of ledgers would take 21.5 / 6 = 3.6 minutes = 215 seconds

So, reingesting a full year worth of ledgers would take roughly (3600 * 24 * 365) / (14*3600) * 215 = 134375 seconds = ~37 hours.

And, as a pessimistic estimate (initial Stellar ledgers are almost empty), reingesting the full history (~5 years) should take (37*5) / 24.0 = ~8 days.

In theory, I think we may be able to quadruple the core+horizon pairs to 24, taking 2 days for the full reingestion.

This would require 9.6 Gbps of network traffic (still far from the 25Gbps limit) and 3000*4 = 12000 IOPS during that time.
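
As a sanity check, the ballpark arithmetic above can be reproduced mechanically; the constants below are copied from the measurements in this comment, and the result is an estimate, not a benchmark:

```go
package main

import "fmt"

func main() {
	// Figures taken from the measurements above (approximate).
	const (
		pairs          = 6.0  // Horizon+Captive Core pairs (3000 IOPS / ~400 IOPS per pair, rounded down)
		vCPUPerPair    = 1.8  // ~100% of a core for stellar-core + ~80% for Horizon
		vCPUPerMachine = 4.0  // c5.xlarge
		minutesPer14h  = 21.5 // single-pair reingestion time for the ~14h test range
	)

	machines := pairs * vCPUPerPair / vCPUPerMachine // ≈ 2.7, i.e. 3 machines
	minutesParallel := minutesPer14h / pairs         // ≈ 3.6 minutes per 14h slice of ledgers

	// One year of ledgers at the parallel rate, assuming recent-ledger density.
	hoursPerYear := (365 * 24 / 14.0) * minutesParallel / 60.0 // ≈ 37 hours

	fmt.Printf("machines=%.1f, minutes per 14h slice=%.1f, hours per year=%.0f\n",
		machines, minutesParallel, hoursPerYear)
}
```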

2opremio (Contributor) commented Jun 7, 2020

Based on my analysis above, and before I get my hands dirty with the programmatic parallelization, I should be able to run a quick test running multiple instances of horizon db reingest range, splitting a wider range of ledgers.

If you all agree, I will do that next week.

ire-and-curses (Member Author) commented:

Thanks, very interesting.

Captive core is fast but not nearly as fast as I expected based on the numbers in stellar/stellar-core#2226.

Here we did 10k recent ledgers, and got 40 minutes for normal stellar-core, and 23 minutes for captive core.

According to stellar/stellar-core#2226, Graydon got 100k recent ledgers in 11 minutes for captive core. So that's 10 times more ledgers, in half the time.

Would be great to understand the difference. I'm wondering whether there is some configuration on the stellar-core side that we're missing.

ire-and-curses (Member Author) commented:

Also I note that the ledger range indicated is actually 16.5 hours, not 14. It's 10048 ledgers, for reference.

2opremio (Contributor) commented Jun 8, 2020

Here we did 10k recent ledgers, and got 40 minutes for normal stellar-core, and 23 minutes for captive core.

Note that 23 minutes is the time of Captive Core + Horizon re-ingestion which, if my analysis is right, should equate to the Captive Core time (i.e. Captive Core being the bottleneck in the pipeline). However, I may be wrong. For instance, I assumed Captive Core is single-threaded, which may not be the case (and in turn 100% CPU wouldn't indicate that it's maxing out its CPU consumption).

Also I note that the ledger range indicated is actually 16.5 hours, not 14. It's 10048 ledgers, for reference.

(10048 ledgers, with sequence numbers between 29944063 and 29954111, for which I assumed a close time of 5 seconds)

(10048 * 5) / 3600.0 = 13.95 . What am I missing?

2opremio (Contributor) commented Jun 8, 2020

As for buffering, I don't quite understand. Can you elaborate? The core meta stream doesn't have any internal buffering, so it only writes if there is a reader that reads. If you meant that buffering is needed to build a sequential range (ex. core1: streaming A->B and core2: streaming B+1->C, so Horizon needs to buffer core2 ledgers) then it's not needed either. In such a case you can simply run one core at a time.

I just realized I never addressed this. Sorry @bartekn .

If we run one core at a time (i.e. sequentially) then there is no parallelization, hence the need for buffering in approach A.

graydon (Contributor) commented Jun 9, 2020

I did some checking on my workstation and the good news here is that captive core is as fast as I measured it before when run on the ranges I ran it on before. The bad news is that recent ledger ranges, as you've noticed, run ~10x slower. They are slower not for any mysterious reason: they just have 4x more ops (per ledger), and a little more than 2x more-expensive (per op) operations.

This is because people have taken to spamming us with maximum-volume (1000 ops per ledger) arbitrage-attempts, trying to win free money by submitting as many (computationally-expensive) path-payment-loops as they can. It's not really great behaviour, and everything on the network is suffering from it, but we haven't really figured out a sufficient way to discourage it yet (or digest it at a low enough cost to ignore it).

Here's a 10,000-ledger replay segment (and reporting per-op metrics) back in the good ol' days of pubnet ledger number 20,000,000, before such traffic started:

```
$ time ./src/stellar-core --conf ~/stellar-mainnet.cfg catchup --replay-in-memory 20000000/10000 --metric ledger.operation.apply
...
2020-06-08T16:37:22.017 GBUPX [default INFO]            count = 723508
2020-06-08T16:37:22.017 GBUPX [default INFO]        mean rate = 10016.1 calls/s
...
real	1m47.529s
user	1m56.967s
sys	0m12.509s
```

Watch those numbers! 10k ops/sec, 723k ops over 10k ledgers.

Here's the same sized span but a while later, ledger 25,000,000, when people were starting to experiment with spamming us with arbitrage txns:

```
$ time ./src/stellar-core --conf ~/stellar-mainnet.cfg catchup --replay-in-memory 25000000/10000 --metric ledger.operation.apply
...
2020-06-08T16:41:27.416 GBUPX [default INFO]            count = 1183200
2020-06-08T16:41:27.416 GBUPX [default INFO]        mean rate = 13323.1 calls/s
...
real	2m26.164s
user	2m28.947s
sys	0m17.808s
```

Same 10k ledgers but now 1.18m ops (running actually a bit faster here, at 13k ops/sec).

Now let's look at the (very recent, very spammy) segment you replayed above:

```
$ time ./src/stellar-core --conf ~/stellar-mainnet.cfg catchup --replay-in-memory 29954111/10048 --metric ledger.operation.apply
...
2020-06-08T16:58:57.188 GBUPX [default INFO]            count = 3311177
2020-06-08T16:58:57.188 GBUPX [default INFO]        mean rate = 3751.22 calls/s
...
real	15m21.582s
user	15m52.051s
sys	0m26.609s
```

Here we're up to 3 million ops in our 10k ledgers -- from 700k in the first segment -- and we're only managing to run 3.7k ops/sec rather than our original 10k ops/sec.

So while it's certainly possible we can optimize this a bit more, it's not unexpected when you're dealing with such an increase in load. You can't and shouldn't extrapolate replay speed from recent ledgers back over the entire 30-million-ledger history. Almost all of history runs vastly faster than this.

Here's a 10k replay at ledger 15,000,000 (back when most ledgers had 1-2 txs each):

```
$ time ./src/stellar-core --conf ~/stellar-mainnet.cfg catchup --replay-in-memory 15000000/10000 --metric ledger.operation.apply
...
2020-06-08T17:12:10.988 GBUPX [default INFO]            count = 21381
2020-06-08T17:12:10.988 GBUPX [default INFO]        mean rate = 1348.22 calls/s
...
real	0m21.492s
user	0m18.055s
sys	0m5.815s
```

ire-and-curses (Member Author) commented:

(10048 * 5) / 3600.0 = 13.95 . What am I missing?

It's just that the assumption of 5s doesn't hold at the moment, because these are very full ledgers, as Graydon mentions.

Ledger 29954111 closed at 2020-06-03T14:45:14Z.
Ledger 29944063 closed at 2020-06-02T22:15:59Z.
The difference between these datetimes is 16h29m15s.

@graydon, thanks for the sanity check rerunning your benchmarks (also it's really helpful to see the command line you used). This is much closer so that's good. But the times still don't match.

Graydon's benchmark:

Now let's look at the (very recent, very spammy) segment you replayed above:
...
user 15m52.051s

Fons' benchmark:

Running horizon db reingest range took 22 minutes and 41 seconds.

So where does that 7 minute difference come from? If we're limited by captive core replay speed as we believe, then shouldn't these times be the same? Do we think this is overhead in the buffering interface back to Horizon? Since this is very sensitive to particular ledgers, can we confirm the reingest range was the same set of ledgers?

graydon (Contributor) commented Jun 9, 2020

If I had to guess I'd imagine part will arise from the fact that I was not actually streaming txmeta (much less into a pipe which was continuously blocking/sleeping/waking as the consumer side drained it). I'll re-run on a simulated consumer (shasum) and report back.

It's also possible/likely that we're on different hardware. This is not running on a VM, but a desktop machine. It's not especially new or fresh hardware, but it is a decently robust dual Xeon with lots of memory. It's also reading from a local (on-disk) archive, not over the network, so all the download-and-decompress jobs are just "cp". I'll use a remote archive for my re-run also.

graydon (Contributor) commented Jun 9, 2020

Hmm, so running with curl and piping the txmeta to shasum on the range 29954111/10048 gives me:

```
real 17m34.330s
user 17m28.151s
sys 0m34.959s
```

Which is a little slower but not as bad as @2opremio saw. I'd guess that the difference might have to do with the turnaround time it takes horizon to read and process each txmeta packet -- is it possible to increase the buffering on horizon's side of the pipe?

bartekn (Contributor) commented Jun 9, 2020

If I had to guess I'd imagine part will arise from the fact that I was not actually streaming txmeta (much less into a pipe which was continuously blocking/sleeping/waking as the consumer side drained it).

I think this is correct. We can try to run the test with #2664. It adds buffering, so if there is a small delay (from the reader or writer) the buffer will hold the data and (in theory) it will probably make it faster.
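
For reference, a minimal sketch of what buffering Horizon's side of the meta pipe could look like; the function name and the 1 MiB size are illustrative assumptions, and the real change lives in the linked PR:

```go
package main

import (
	"bufio"
	"io"
)

// wrapMetaPipe is a minimal sketch: wrap the raw txmeta stream coming from
// captive core in a buffered reader, so the many small XDR reads (4-byte
// ints, etc.) hit an in-process buffer instead of going straight to the pipe.
// The function name and the 1 MiB size are illustrative, not the Horizon API.
func wrapMetaPipe(pipe io.Reader) *bufio.Reader {
	return bufio.NewReaderSize(pipe, 1024*1024)
}

func main() {
	// Stand-in for the OS pipe connected to the captive core subprocess.
	pr, pw := io.Pipe()
	go func() {
		pw.Write([]byte("pretend this is XDR-framed LedgerCloseMeta"))
		pw.Close()
	}()

	buffered := wrapMetaPipe(pr)
	// Hand `buffered` to the XDR decoder instead of the raw pipe; reads of a
	// few bytes at a time are now served from the in-memory buffer.
	frame, _ := io.ReadAll(buffered)
	_ = frame
}
```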

2opremio (Contributor) commented Jun 9, 2020

is it possible to increase the buffering on horizon's side of the pipe?

I will give this a try and report back

2opremio (Contributor) commented Jun 9, 2020

We already had a 1MB buffer, but it wasn't filled concurrently.

Interestingly, after adding 4 1MB read-ahead buffers the situation worsens (it took 25 mins).

I will play with the buffer sizes and disable Core logging to see if it improves.

2opremio (Contributor) commented Jun 9, 2020

Scratch that, the situation didn't worsen (it didn't improve either). After removing the read-ahead buffers (and simply leaving the original bufio.Reader()) it keeps taking 25 minutes.

So, for some reason stellar-core is slower today. Lower S3 throughput today?

@graydon can you share some details about how captive core downloads and processes checkpoint data? Is it done concurrently?

graydon (Contributor) commented Jun 9, 2020

It uses a bounded amount of concurrency, by default running 16 subprocesses at a time in a queue ahead of the downloaded file it's currently processing. You can increase this by setting MAX_CONCURRENT_SUBPROCESSES=N for some larger value in the config file.

You'll see less than 100%-of-a-core CPU usage if there's blocking or waiting like this though. I thought you said there was a full core active? Maybe it's just running on a slower CPU?

This is on a c5.xlarge right? (I can try reproducing your numbers / getting to the bottom of the discrepancy myself if that'd be helpful!)

2opremio (Contributor) commented Jun 9, 2020

Scratch that, the situation didn't worsen (it didn't improve either). After removing the read-ahead buffers (and simply leaving the original bufio.Reader()) it keeps taking 25 minutes.

Actually, this makes sense, since core is writing slower than Horizon reads (i.e. the buffer cannot read ahead, since there is nothing new to read).

2opremio (Contributor) commented Jun 9, 2020

You'll see less than 100%-of-a-core CPU usage if there's blocking or waiting like this though. I thought you said there was a full core active?

Good to confirm. Yes, it's running at 100%

Maybe it's just running on a slower CPU?

This is on a c5.xlarge right? (I can try reproducing your numbers / getting to the bottom of the discrepancy myself if that'd be helpful!)

I suspect that, yes. I will run the same commands as in #2552 (comment) and come back with the time.

bartekn (Contributor) commented Jun 11, 2020

@2opremio it can possibly be connected to the slowness: I'm currently running some experiments with the online mode (it doesn't really change anything when it comes to streaming). I was able to get it up and running and noticed that sometimes Horizon needs as much as 1s to ingest a testnet ledger (using testnet because building state is faster) with just a few operations. When that happens I can see "applying ledger" logs from Stellar-Core that are actually a few ledgers ahead of the currently ingested ledger. Again, this is using the code without any buffering. I cherry-picked 54d4b53 into my dev branch and it doesn't happen anymore: all testnet ledgers are ingested in just milliseconds. This makes me think that there's some weirdness connected to bufio.Reader (maybe it's slow when changing the size of an internal buffer?). Not sure if it's connected (even partially) to the slowness you experience but... maybe. Decided to share.

2opremio (Contributor) commented Jun 22, 2020

After #2552, reingesting range 20064319 to 20084415 from a c5.xlarge machine into an r4.8xlarge RDS instance leads to the following times:

  • --parallel-workers=1 (no parallelism): 11:24 minutes
  • --parallel-workers=2: 7:50
  • --parallel-workers=3: 7:28
  • --parallel-workers=4: 7:26

It's understandable that --parallel-workers=4 doesn't bring any improvement, since the machine running Horizon has 4 cores (which are already maxed out with --parallel-workers=3, since there are 3 core instances plus a Horizon instance consuming > 2 cores).

However, I find it odd that there isn't a bigger improvement from 2 to 3 workers.

It may be that the IOPS of the RDS instance are reaching their peak, judging by the following graphs (the number of connections matches the number of workers):

Screenshot 2020-06-22 at 18 43 10

Screenshot 2020-06-22 at 18 43 24

I need to investigate (hopefully there is a mismatch in the IOPS of the RDS instance of my environment compared to production).

4. I think Captive Core can be made faster by overlapping the download and verification of checkpoints with processing them (I may be wrong, but it seems that this is done sequentially; @graydon?). This may help with (3).

@graydon sorry to insist on this, but it seems that stellar-core spends a sizable amount of time downloading history before Horizon starts to work (judging by the low Horizon CPU consumption and the curl+gzip processes spawned). It makes me think that captive core fully downloads the archives before starting to output metadata. Is this the case? Can the downloading be done in parallel with metadata processing?

graydon (Contributor) commented Jun 22, 2020

@2opremio core does such a replay in effectively 3 phases, of which 2 happen before we emit anything to the metadata pipe. The first two are a fair bit shorter than the 3rd though.

  • Downloading, decompressing and verifying the ledger chain for a given range. Not including transactions or ledger state, just the very small ledger headers that make up the spine of the cryptographic data structure. This is because we want to ensure the rest of the data (buckets, transactions, etc) corresponds to verified / "secure" values derived from our consensus quorum. Currently the verification here has a serious gap in it (which I mentioned in our last call and which actually looks like it's going to be a fair bit worse than I predicted) but in any case something like what it's doing here has to happen from an integrity-checking perspective to avoid exposing core to untrusted input (eg. fake archive content). For the range in question (20064319 to 20084415 = 20096 ledgers = 315 checkpoints) the ledger chain looks like it's about 3.5MB. IOW this should happen quite quickly. I just ran it here and that phase completed on my workstation in 7 seconds.
  • Downloading, decompressing, verifying and applying (into an in-memory ledger) the contents of buckets at the first state in the ledger chain for the replay sequence. This represents the state of the ledger at the beginning of the range. It will involve something like 1-2GB of data downloaded and hopefully less than a minute to apply. Again on my workstation this phase takes 25 seconds to download and verify, then 20 seconds to apply.
  • Finally we begin replaying ledger transactions. This is the much longer and more involved part of the replay, and it interleaves download with apply -- the download-and-decompress steps for upcoming ledgers happen concurrently with the replay of current ledgers. IOW this part is as parallel as we can make it and we are emitting metadata while we go. On my workstation (on the example ledger range) this part takes 3.5 minutes.

2opremio (Contributor) commented Jun 23, 2020

@graydon thanks for the explanation! I guess it was phase (2) that I was observing while Horizon was idle.

I have also observed that reingesting the range in smaller jobs (which we need to do in order to reingest the full history) comes with a big overhead penalty, which I partly attribute to Horizon waiting for Core to start producing metadata.

I'm sure there is a strong reason for phase (2) and (3) to be done in a strictly sequential manner, but I thought it wouldn't hurt to ask. Wouldn't it be possible to execute them concurrently, in batches? 45 seconds is a long time, particularly considering that Horizon is idle during that time.

bartekn (Contributor) commented Jun 23, 2020

I'm sure there is a strong reason for phase (2) and (3) to be done in a strictly sequential manner, but I thought it wouldn't hurt to ask. Wouldn't it be possible to execute them concurrently, in batches?

I think it isn't possible because (2) needs to complete before (3). Ledger entries need to be stored in memory (2) before you can start applying transactions (3). The reason is that state (ledger entries) is needed to properly validate transactions. Ex. if you send an XLM payment you need to know if a sender has enough XLM.

2opremio (Contributor) commented Jun 23, 2020

I need to investigate (hopefully there is a mismatch in the IOPS of the RDS instance of my environment compared to production).

It turns out that I had used 1 TiB of General Purpose SSD storage for the RDS instance, whilst production uses 3.5 TiB. According to Amazon's documentation:

Baseline I/O performance for General Purpose SSD storage is 3 IOPS for each GiB

So, by raising the storage size to 3.5TiB I expect the performance limit to triple (I will test that later today), but that will still be far from a 1-day full-history ingestion.

2opremio (Contributor) commented Jun 23, 2020

I think it isn't possible because (2) needs to complete before (3). Ledger entries need to be stored in memory (2) before you can start applying transactions (3). The reason is that state (ledger entries) is needed to properly validate transactions. Ex. if you send an XLM payment you need to know if a sender has enough XLM.

What I am suggesting is to break down the range in subranges S_1, S_2, ... S_N and apply a pipeline in which each subrange goes through the phases sequentially, but S_i+1 can go through phase (1) (2) and (3) provided that S_i is done. Isn't that a possibility?

That would respect ordering and minimize Horizon's wait on Core.
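
To make the pipeline idea concrete, here is a rough Go sketch; prepareCore and drainMeta are hypothetical stand-ins for phases (1)+(2) and phase (3) respectively, and this is not how Horizon or Captive Core are actually structured:

```go
package main

import "fmt"

type subrange struct{ from, to uint32 }

// preparedCore is a hypothetical stand-in for a captive core instance that
// has already finished phases (1) and (2) for its subrange.
type preparedCore struct{ r subrange }

// prepareCore represents phases (1)+(2): download and apply buckets.
func prepareCore(r subrange) preparedCore {
	fmt.Printf("preparing core for %d-%d\n", r.from, r.to)
	return preparedCore{r}
}

// drainMeta represents phase (3): replay ledgers while Horizon ingests the meta.
func drainMeta(c preparedCore) {
	fmt.Printf("ingesting %d-%d\n", c.r.from, c.r.to)
}

func main() {
	subranges := []subrange{{1, 5000}, {5001, 10000}, {10001, 15000}}

	// Prepare upcoming subranges concurrently while the current one is being
	// drained, so Horizon is only idle while the very first subrange is prepared.
	prepared := make(chan preparedCore, 1)
	go func() {
		for _, r := range subranges {
			prepared <- prepareCore(r) // blocks once the buffer is full
		}
		close(prepared)
	}()

	// Draining in channel order preserves the sequential ledger order.
	for c := range prepared {
		drainMeta(c)
	}
}
```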

2opremio (Contributor) commented Jun 23, 2020

Well ... bummer. After increasing the RDS capacity to 3.5TiB ... the situation doesn't improve:

  • --parallel-workers=2: 7:58
  • --parallel-workers=3: 7:44
  • --parallel-workers=4: 07:40

Screenshot 2020-06-23 at 17 48 05

Interestingly ... if I simply spawn 4 horizons in parallel (each with a 5024-ledger subrange) they take ~6:45 each and we briefly surpass 3000 IOPS.

graydon (Contributor) commented Jun 23, 2020

What I am suggesting is to break down the range in subranges S_1, S_2, ... S_N and apply a pipeline in which each subrange goes through the phases sequentially, but S_i+1 can go through phase (1) (2) and (3) provided that S_i is done. Isn't that a possibility?

Wait, I'm confused -- isn't the entire point of this issue that you're looking at how fast you can make ingestion if you break ingestion down into subranges? I.e. isn't the 4-way parallelism you're trying to achieve here based on a split into 4 subranges?

(And aren't these subranges running in parallel? One captive core should absolutely be able to do parts (1) and (2) in parallel with another captive core doing its own parts (1) and (2). Possibly they can also do their respective parts (3) in parallel as well, depending on whether horizon wants to read their txmeta output sequentially. But I thought this was exactly what you were already doing: running multiple captive stellar-core instances in parallel on separate subranges.)

2opremio (Contributor) commented:

Wait, I'm confused -- isn't the entire point of this issue that you're looking at how fast you can make ingestion if you break ingestion down into subranges? I.e. isn't the 4-way parallelism you're trying to achieve here based on a split into 4 subranges?

Yes, you are right, that's what I am doing. However, for each subrange Horizon still needs to wait for phases (1) and (2) before it can start writing to the DB. This high priority though. I wanted to know whether there was a strong reason for core not to do it (other than simplicity).

The most important thing at this point is to maximize throughput with the DB.

graydon (Contributor) commented Jun 23, 2020

I don't quite understand what you mean by "This high priority though. I wanted to know whether there was a strong reason for core not to do it". If you start two (or four) core processes at the same time, they will all start doing phase (1) and phase (2) at the same time. They will not block during those phases since they are not emitting any metadata. They will only block / wait for their reader (horizon) to be ready when they arrive at phase (3).

2opremio (Contributor) commented:

I meant this isn't high priority, sorry.

graydon (Contributor) commented Jun 23, 2020

Oh ok! No worries. Let me know if you have further questions!

2opremio (Contributor) commented Jun 24, 2020

I managed to get the Write IOPS to peak at 10K in a beefier machine (c5.4xlarge) after spawning 16 workers.

When doing that, I can ingest range 20064319 to 20225087 (160768 ledgers) in 20 mins.

Extrapolating that to the full history, that means we should be able to ingest the full history in ((30283957 / 160768 ) * 20) / (24*60) = 2.6 days

We won't be able to get a better time in production without modifying the RDS storage, due to the IOPS limitation.

I will do a full reingestion once #2724 is merged.

bartekn (Contributor) commented Jun 24, 2020

What I am suggesting is to break down the range in subranges S_1, S_2, ... S_N and apply a pipeline in which each subrange goes through the phases sequentially, but S_i+1 can go through phase (1) (2) and (3) provided that S_i is done. Isn't that a possibility?

Doesn't the S_i+1 waiting for S_i make it a bad idea? To be able to ingest S_i+1 you need to wait for S_i and apply buckets (so (1) and (2)) - afaik, there is no other possibility.

What could be done using this idea (and I think you suggested this somewhere) is to have multiple Horizon instances reading from a single meta stream, with any free worker taking the next ledger. But it requires more (complicated) work on the Horizon front.

Extrapolating that to the full history, that means we should be able to ingest the full history in ((30283957 / 160768 ) * 20) / (24*60) = 2.6 days

Looking at the calculations it doesn't take into account the fact the first half of the history is almost empty. So maybe it's actually possible to do that in <24h?

2opremio (Contributor) commented:

Looking at the calculations it doesn't take into account the fact the first half of the history is almost empty. So maybe it's actually possible to do that in <24h?

Maybe. I think we should just try it out.

2opremio (Contributor) commented:

Doesn't the Si+1 waiting for Si make it a bad idea? To be able to ingest Si+1 you need to wait for Si and apply buckets (so (1) and (2)) - afaik, there is no other possibility.

It doesn't, it's just a pipeline. You would save the initial wait time (or most of it) for phases (1) and (2) of captive core.

2opremio (Contributor) commented Jun 25, 2020

I managed to get the Write IOPS to peak at 10K in a beefier machine (c5.4xlarge) after spawning 16 workers.

When doing that, I can ingest range 20064319 to 20225087 (160768 ledgers) with 16 workers in 20 mins.

I switched to a larger machine (m5n.4xlarge) which has 16 cores, 64GB of memory and 25Gbps bandwidth.

The ingestion time (with the revised code from #2724 ) goes down to 18:32.

The Write IOPS went over the theoretical limit (3 IOPS/GiB * 3500 GiB = 10.5K IOPS), which is puzzling:

Screenshot 2020-06-25 at 13 22 04

Using a more recent ledger range (30139647 to 30300415) the memory consumption goes up to 45 GB and there are moments in which the 16 cores max out at 100%, so I think I will try a larger instance.

Regarding network bandwidth

Screenshot 2020-06-25 at 13 51 36

it peaks at ~3 billion bytes out and 2 billion in every 300 seconds, which is (20000000000.0 / (1000 * 1000 * 1000)) * 8 / 300.0 = ~0.5 Gbps in and ~0.8 Gbps out, so we don't need an m5n instance with 25 Gbps bandwidth; m5 should be fine.

I will try with an m5.8xlarge next.

2opremio (Contributor) commented Jun 29, 2020

I finally tested the reingestion of the full history and it took 2 days and 9 hours.

TL;DR: I ran these two commands with the code from #2724 :

  1. horizon db reingest range --retry-backoff-seconds=20 --retries=10 --parallel-workers=128 --parallel-job-size=100000 127 15000000
  2. horizon db reingest range --retry-backoff-seconds=20 --retries=10 --parallel-workers=24 --parallel-job-size=100000 15000000 30301792

This is how I did it (I will move this into proper documentation later on):

In order to mimic a production rw DB, I created an r4.8xlarge Postgres RDS instance with 3.5TB of General Purpose SSD storage.

To run Horizon (and Captive core) I created an m5.8xlarge EC2 instance (running Ubuntu 18.04). Although we use c5 for running Horizon in production, m5 fits better, because reingestion is both compute and memory intensive (when reingesting recent ledger ranges, which are really packed, each captive core instance consumes 3GB of memory).

I ran the Horizon code from PR #2724 (which will be included in release 1.6.0) and jobs of 100K ledgers.

The larger the job, the better the performance from Captive Core's perspective, but you want to choose a job size which maximizes the time all workers are busy (e.g. if you use too big a job size, since ledger ranges aren't equally packed, eventually some workers will be done and sit idle). Alternatively, we could have an algorithm which dynamically creates workers and adjusts the job size, but I don't think it's worth the pain.

I also spawned two horizon db reingest instances, one for the first half of the history (ledgers 1 to 15 million) with 128 workers and another one for the most recent half with 24 workers. This is because the first half of the history is almost empty whilst the second half is much more packed. Having a single Horizon instance with enough workers to saturate the IO capacity of the machine for the first half of the history would most likely have killed the machine when reingesting the second half.

The first half was done in about 5 hours. The second half took the full 2 days and 9 hours. And it's worth noting that the last 2 million ledgers (out of 30 million) took almost a full day.

Here are the write IOPS and wait graphs of the RDS instance:

Screenshot 2020-06-29 at 16 10 01

Screenshot 2020-06-29 at 16 13 03

Here are the Network and CPU graphs of the EC2 instance:

Screenshot 2020-06-29 at 16 20 08

Screenshot 2020-06-29 at 16 19 12

Screenshot 2020-06-29 at 16 21 30

(There don't seem to be Cloudwatch graphs for memory consumption, but it peaked at ~70GB when processing the last couple of million ledgers)

It's worth noting that:

  1. Using 128 workers for the first half and 24 workers for the second half may be unnecessarily high, but I wanted to make sure that the DB's IO capacity was matched. In fact, using fewer workers:
    • will reduce write-lock contention in the DB, which in turn could make ingestion a bit faster.
    • would probably allow for a smaller EC2 instance on which to run Horizon.
      I don't think we can reduce the number of workers by much if we want to keep saturating the DB write capacity.
  2. Using an RDS instance with more General Purpose SSD storage capacity (say, 10TB instead of 3.5TB) would have made the reingestion considerably faster, since the storage capacity determines the IO capacity of the instance (according to Amazon's documentation you get 3 IOPS per GB of storage).
  3. Unexpectedly, the write IOPS graph of the RDS instance surpasses the 10.5K IOPS limit (3 IOPS/GB * 3500 GB), which I can't really explain.

bartekn added a commit that referenced this issue Jun 30, 2020
Wraps `metaPipe` in `stellarCoreRunner` in `bufio.Reader`. This improves
the speed of reading from a meta pipe.

In 936bc3a we removed `bufio.Reader` based on an observation [1] that it
slows down reading from a pipe. However, after running a CPU profiler on
reingestion code to debug another change I realized that a lot of time
is spent on reading from a pipe in methods like `xdr.Decoder.DecodeInt`
or `xdr.Decoder.DecodeUint` (that read just 4 bytes).

When reingesting 1000 ledgers all executions of this method take 17% of
entire run time! It was clear that there's an overhead time connected to
reading from a pipe. After wrapping meta pipe in `bufio.Reader` the
speed of reingestion of the same range improved by 25% (this isn't
affected by Stellar-Core init time, the timer starts after
`PrepareRange` is done).

Before:
```
INFO[2020-06-30T12:03:44.671+02:00] Reingestion done     duration=74.352344343 from=29999000 pid=44533 service=expingest to=30000000
```
After:
```
INFO[2020-06-30T12:06:14.829+02:00] Reingestion done     duration=55.449094814 from=29999000 pid=45181 service=expingest to=30000000
```

The change in 936bc3a solves the problems with reflection used in XDR
decoder being slow. This commit mitigates issues with overhead time
added when reading from a pipe.

[1] #2552 (comment)
2opremio (Contributor) commented:

I ran a little test to verify that the parallel code didn't break reingestion, by comparing database dumps of

horizon db reingest range --retry-backoff-seconds=20 --retries=10 --parallel-workers=3 --parallel-job-size=1000 29954111 29964159

and

horizon db reingest range --retry-backoff-seconds=20 --retries=10 29954111 29964159

I reused part of the verify-range container code for that.

It's a 10K ledger range, but it should be enough to check the parallel code is not missing ledgers when splitting the range in subranges.

@bartekn bartekn removed this from the Horizon 1.5.0 milestone Jul 1, 2020
2opremio (Contributor) commented Jul 3, 2020

This is done. It's been handed over to ops.

@2opremio 2opremio closed this as completed Jul 3, 2020
ire-and-curses (Member Author) commented:

For future reference: around 1.5 days for full reingestion with bigger hardware than above.

https://github.com/stellar/go/blob/master/services/horizon/internal/docs/captive_core.md#using-captive-core-to-reingest-the-full-public-network-history

stellar-horizon db reingest range --parallel-workers=64 1 16999999
stellar-horizon db reingest range --parallel-workers=24 17000000 <latest_ledger>

When saturating an RDS instance with 15K IOPS capacity:

(1) should take a few hours to complete.

(2) should take about 1.5 days to complete.

Hardware spec:

An m5.8xlarge (32 cores, 128GB of RAM) EC2 instance with at least 200 GB of disk capacity from which to run Horizon. This is needed to fit 24 Horizon parallel workers (each with its own Captive Core instance). Each Core instance can take up to 3GB of RAM and a full CPU core (more on why 24 workers below). If the number of workers is increased, you may need a larger machine.

A Horizon database into which to reingest the history. Preferably, the database should be at least an RDS r4.8xlarge instance or better (to take full advantage of its IOPS write capacity) and should be empty, to minimize storage (Postgres accumulates data during usage, which is only deleted when VACUUMed). When using an RDS instance with General Purpose SSD storage, the reingestion throughput of the DB (namely Write IOPS) is determined by the storage size (3 IOPS per GB). With 5TB you get 15K IOPS, which can be saturated with 24 Horizon workers. As the DB storage grows, the IO capacity will grow along with it. The number of workers (and the size of the instance created in (1)) should be increased accordingly if we want to take advantage of it. To make sure we are minimizing the reingestion time, we should look at the RDS Write IOPS CloudWatch graph. The graph should ideally always be close to the theoretical limit of the DB (3000 IOPS per TB of storage).

graydon (Contributor) commented Nov 5, 2020

@ire-and-curses I'm assuming core is not consuming many IOPS during these runs (assuming it's all running captive in-memory) but if you're IOPS-limited on the horizon side, I wonder if you've considered trying running on instance-local NVMe storage.

I ask because -- for the core team's own testing work -- switching to instance NVMe storage has been a substantial performance win over "trying to get enough IOPS from EBS by renting gigantic EBS volumes". Much lower latency syncs and random IO, and IOPS up in the 100k-range. It might also play the same role here, to run your horizon postgres on, if you can find an instance with adequate instance store for your needs.

I see that the m5d.8xlarge instance type is only a bit more expensive ($1.53/hr => $1.80/hr) and has two 600GB NVMe instance disks; would a horizon database fit on one of those disks? An i3.8xlarge is a bit more still ($2.49/hr) and has x 1900GB NVMe instance disks, if the m5d.8xlarge isn't big enough...

2opremio (Contributor) commented Nov 5, 2020

@graydon While I was working on this issue in June, a Horizon RDS instance with the full history of prodnet consumed ~4TB of storage.

With those size requirements (which, in an ideal world, we should strive to reduce significantly to reduce running costs) General Purpose SSD storage seems like a nice fit.

I think NVMe would be too expensive. On the other hand, it may allow us to reingest the full history in a few hours or less.

ire-and-curses (Member Author) commented:

This is definitely something to keep in mind if/when we move to a model of configurable endpoints in Horizon to control DB size.
