
tpu-client: Speed up performance by detaching tasks #32844

Closed · wants to merge 4 commits

Conversation

@joncinque (Contributor) commented Aug 15, 2023

Problem

Program deployments have been slow starting in 1.16, as reported in #32595 and anecdotally by users. I was able to pinpoint this to the use of the full async API in 1.16, compared with the static async runtime in 1.14, which uses a form of send_data_async.

I'm not 100% sure why it's slow, but I think it's because we always wait until sending to all leaders is done before moving on to the next message, so we're limited by our slowest send, which can take up to 250ms on devnet.

Summary of Changes

This uses the idea from the 1.14 implementation, but in a full async context, by detaching sending packets into tasks that can all run concurrently.
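
To illustrate the idea, here's a minimal sketch (send_to_leader is a hypothetical stand-in for the real per-leader send logic, not the actual PR diff):

use {std::time::Duration, tokio::task::JoinHandle};

// Hypothetical stand-in for sending one transaction to one leader; the real
// code writes to a cached QUIC connection instead of sleeping.
async fn send_to_leader(leader: String, wire_transaction: Vec<u8>) {
    let _ = (leader, wire_transaction);
    tokio::time::sleep(Duration::from_millis(250)).await;
}

// 1.16 behavior: each message waits for the sends to all leaders to finish,
// so throughput is bounded by the slowest leader.
async fn send_awaiting_all(leaders: &[String], wire_transaction: &[u8]) {
    for leader in leaders {
        send_to_leader(leader.clone(), wire_transaction.to_vec()).await;
    }
}

// This PR's idea: detach each send into its own task so all sends run
// concurrently and the caller can immediately move on to the next message.
fn send_detached(leaders: &[String], wire_transaction: &[u8]) -> Vec<JoinHandle<()>> {
    leaders
        .iter()
        .map(|leader| tokio::task::spawn(send_to_leader(leader.clone(), wire_transaction.to_vec())))
        .collect()
}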

I tested this against a few networks, with devnet being the slowest. Without this PR, sending 100 self-transfers took 18s-28s, and with this PR it takes 2.5-3.5s.

Here's the test binary that I ran, adapted from https://github.com/mvines/solana-cli-template. I didn't think it was worth adding this as a test, since local validators don't show the problematic behavior, but let me know if you think otherwise!

After this lands, we can go through and replace usages of send_transaction with try_send_transaction_detached to get speed back in other places too.
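
A call-site migration might look roughly like this (hypothetical sketch; it assumes the detached variant takes the same arguments but returns without awaiting the sends):

// Before: awaits delivery to the current and upcoming leaders.
// tpu_client.try_send_transaction(&transaction).await?;

// After: spawns the sends as detached tasks and returns immediately.
// tpu_client.try_send_transaction_detached(&transaction)?;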

use {
    clap::{crate_description, crate_name, crate_version, Arg, Command},
    solana_clap_v3_utils::{
        input_validators::{is_url_or_moniker, is_valid_signer, normalize_to_url_if_moniker},
        keypair::DefaultSigner,
    },
    solana_client::connection_cache::ConnectionCache,
    solana_remote_wallet::remote_wallet::RemoteWalletManager,
    solana_rpc_client::nonblocking::rpc_client::RpcClient,
    solana_sdk::{
        commitment_config::CommitmentConfig, message::Message, signature::Signer,
        system_instruction,
    },
    solana_tpu_client::{nonblocking::tpu_client::TpuClient, tpu_client::TpuClientConfig},
    std::{process::exit, sync::Arc, time::Instant},
};

struct Config {
    default_signer: Box<dyn Signer>,
    json_rpc_url: String,
    websocket_url: String,
}

async fn process_ping(
    rpc_client: RpcClient,
    websocket_url: &str,
    signer: &dyn Signer,
) -> Result<(), Box<dyn std::error::Error>> {
    let from = signer.pubkey();
    let to = signer.pubkey();
    let blockhash = rpc_client.get_latest_blockhash().await?;
    let messages = (0..100)
        .map(|i| {
            Message::new_with_blockhash(
                &[system_instruction::transfer(&from, &to, i)],
                Some(&signer.pubkey()),
                &blockhash,
            )
        })
        .collect::<Vec<_>>();

    let now = Instant::now();
    let connection_cache = ConnectionCache::new_quic("connection_cache_cli_program_quic", 1);
    let tpu_client = if let ConnectionCache::Quic(cache) = connection_cache {
        TpuClient::new_with_connection_cache(
            Arc::new(rpc_client),
            websocket_url,
            TpuClientConfig::default(),
            cache,
        )
        .await?
    } else {
        panic!("not possible");
    };
    let transaction_errors = tpu_client
        .send_and_confirm_messages_with_spinner(&messages, &[signer])
        .await
        .map_err(|err| format!("Data writes to account failed: {err}"))?
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();

    if !transaction_errors.is_empty() {
        for transaction_error in &transaction_errors {
            println!("{:?}", transaction_error);
        }
        return Err(format!("{} write transactions failed", transaction_errors.len()).into());
    }

    println!("Took {}ms", now.elapsed().as_millis());
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let app_matches = Command::new(crate_name!())
        .about(crate_description!())
        .version(crate_version!())
        .subcommand_required(true)
        .arg_required_else_help(true)
        .arg({
            let arg = Arg::new("config_file")
                .short('C')
                .long("config")
                .value_name("PATH")
                .takes_value(true)
                .global(true)
                .help("Configuration file to use");
            if let Some(ref config_file) = *solana_cli_config::CONFIG_FILE {
                arg.default_value(config_file)
            } else {
                arg
            }
        })
        .arg(
            Arg::new("keypair")
                .long("keypair")
                .value_name("KEYPAIR")
                .validator(|s| is_valid_signer(s))
                .takes_value(true)
                .global(true)
                .help("Filepath or URL to a keypair [default: client keypair]"),
        )
        .arg(
            Arg::new("json_rpc_url")
                .short('u')
                .long("url")
                .value_name("URL")
                .takes_value(true)
                .global(true)
                .validator(|s| is_url_or_moniker(s))
                .help("JSON RPC URL for the cluster [default: value from configuration file]"),
        )
        .subcommand(Command::new("ping").about("Send a ping transaction"))
        .get_matches();

    let (command, matches) = app_matches.subcommand().unwrap();
    let mut wallet_manager: Option<Arc<RemoteWalletManager>> = None;

    let config = {
        let cli_config = if let Some(config_file) = matches.value_of("config_file") {
            solana_cli_config::Config::load(config_file).unwrap_or_default()
        } else {
            solana_cli_config::Config::default()
        };

        let default_signer = DefaultSigner::new(
            "keypair",
            matches
                .value_of("keypair")
                .map(|s| s.to_string())
                .unwrap_or_else(|| cli_config.keypair_path.clone()),
        );

        let json_rpc_url = normalize_to_url_if_moniker(
            matches
                .value_of("json_rpc_url")
                .unwrap_or(&cli_config.json_rpc_url),
        );

        let websocket_url = solana_cli_config::Config::compute_websocket_url(&json_rpc_url);
        Config {
            default_signer: default_signer
                .signer_from_path(matches, &mut wallet_manager)
                .unwrap_or_else(|err| {
                    eprintln!("error: {err}");
                    exit(1);
                }),
            json_rpc_url,
            websocket_url,
        }
    };
    solana_logger::setup_with_default("solana=info");

    let rpc_client =
        RpcClient::new_with_commitment(config.json_rpc_url.clone(), CommitmentConfig::confirmed());

    match (command, matches) {
        ("ping", _arg_matches) => {
            process_ping(
                rpc_client,
                &config.websocket_url,
                config.default_signer.as_ref(),
            )
            .await
            .unwrap_or_else(|err| {
                eprintln!("error: send transaction: {err}");
                exit(1);
            });
        }
        _ => unreachable!(),
    };

    Ok(())
}

Fixes #32595

@t-nelson (Contributor) left a comment

lgtm. r+ @lijunwangs approval

@KirillLykov (Contributor) commented Aug 15, 2023

The same problem was addressed in #32388 and #32638.
The approach there was to add a new function, send_and_confirm_transactions_in_parallel_blocking, to be used instead of send_and_confirm_messages_with_spinner in cli/src/program.rs.
So those changes improve only program deploys, while it looks like your change has a larger impact because it introduces a new method on TpuClient.
It would be interesting to compare the speed of both.

@joncinque (Contributor, Author)

The same problem was addressed in #32388 and #32638

Yeah I was following those PRs. I think they're great too! We can also do both at once. This PR pretty much restores the old behavior of sending quickly.

So those changes improve only program deploys, while it looks like your change has a larger scope

Kind of, this restores the old behavior in send_and_confirm_messages_with_spinner, but we still need to slowly transition TPU users to try_send_transaction_detached for faster sending.

Would be interesting to compare the speed of both

For sure, let's get some tests going!

Comment on lines 72 to 74
P: ConnectionPool<NewConnectionConfig = C> + 'static,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C> + 'static,
C: Sync + Send + 'static,
(Contributor)

Wouldn't these 'static constraints be too restrictive?

@joncinque (Contributor, Author)

They're required in order to detach the task unfortunately.

And looking at https://docs.rs/tokio/latest/tokio/task/fn.spawn.html, I'm not sure how to make the future T: Future + Send + 'static without the inner types being 'static -- do you have any suggestions?

(Contributor)

I think this might be solved by using Arc?

@joncinque (Contributor, Author)

This is about the types, not the instances of the types. I did some digging around; the requirement is pretty hard to get around for now, unfortunately (tokio-rs/tokio#2385). Here's the error:

error[E0310]: the parameter type `C` may not live long enough
   --> tpu-client/src/nonblocking/tpu_client.rs:356:9
    |
356 | / ...   task::spawn(async move {
357 | | ...       try_send_wire_transaction(wire_transaction, &leaders, &connection_cache).a...
358 | | ...   })
    | |________^ ...so that the type `C` will meet its required lifetime bounds
    |
help: consider adding an explicit lifetime bound...
    |
344 |     C: Send + Sync + 'static,

And it repeats for P and M.

Here's the simplest example that shows the issue:

use {
    futures::future::join_all,
    std::{fmt::Debug, time::Duration},
    tokio::task::JoinHandle,
};
trait Printable: Debug {}
impl Printable for String {}
#[derive(Debug)]
struct MyStruct<T: Printable> {
    _field: T,
}
async fn print_struct<T: Printable>(instance: MyStruct<T>) {
    tokio::time::sleep(Duration::from_millis(10)).await;
    println!("{instance:?}");
}
fn make_temp_task<T: Printable + Send + Sync + 'static>(instance: MyStruct<T>) -> JoinHandle<()> {
    tokio::task::spawn(print_struct(instance))
}
#[tokio::main]
async fn main() {
    #[derive(Debug)]
    struct MyTempStruct;
    impl Printable for MyTempStruct {}
    let tasks = (0..10)
        .map(|_| make_temp_task(MyStruct { _field: MyTempStruct }))
        .collect::<Vec<_>>();
    join_all(tasks).await;
}

If make_temp_task does not have T: Send + Sync + 'static, this will not work because T is not ensured to live long enough.
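
On the Arc suggestion specifically: wrapping the instance doesn't relax the bound, because Arc<T> is itself only 'static when T: 'static. A quick sketch reusing the Printable/MyStruct types from the example above:

use std::sync::Arc;

// Even behind an Arc, spawning still forces T: 'static, because
// Arc<MyStruct<T>>: 'static requires MyStruct<T>: 'static, which
// in turn requires T: 'static.
fn make_arc_task<T: Printable + Send + Sync + 'static>(
    instance: Arc<MyStruct<T>>,
) -> tokio::task::JoinHandle<()> {
    tokio::task::spawn(async move {
        println!("{instance:?}");
    })
}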

As a better option, however, I added the required bounds only to the new functions, so you'll only need the types to be 'static if you're calling the detached version. And I was able to relax Send + Sync in a lot of places. Let me know what you think!

codecov bot commented Aug 15, 2023

Codecov Report

Merging #32844 (abb9c81) into master (b44c9bc) will decrease coverage by 0.1%.
Report is 7 commits behind head on master.
The diff coverage is 72.1%.

@@            Coverage Diff            @@
##           master   #32844     +/-   ##
=========================================
- Coverage    82.0%    82.0%   -0.1%     
=========================================
  Files         785      785             
  Lines      212096   212111     +15     
=========================================
+ Hits       173982   173987      +5     
- Misses      38114    38124     +10     

Comment on lines +69 to +88
    /// Create a new client that disconnects when dropped
    pub fn new_with_connection_cache(
        rpc_client: Arc<RpcClient>,
        websocket_url: &str,
        config: TpuClientConfig,
        connection_cache: Arc<BackendConnectionCache<P, M, C>>,
    ) -> Result<Self> {
        Ok(Self {
            tpu_client: BackendTpuClient::new_with_connection_cache(
                rpc_client,
                websocket_url,
                config,
                connection_cache,
            )?,
        })
    }

    pub fn rpc_client(&self) -> &RpcClient {
        self.tpu_client.rpc_client()
    }
@joncinque (Contributor, Author)

This is just moved from the other impl block

@@ -6,7 +6,7 @@ use {
 };

 #[async_trait]
-pub trait ClientConnection {
+pub trait ClientConnection: Send + Sync {
@joncinque (Contributor, Author)

These are the only other required bounds in the end! Other than in the clients, of course

Comment on lines +456 to +463
    pub fn rpc_client(&self) -> &RpcClient {
        &self.rpc_client
    }

    pub async fn shutdown(&mut self) {
        self.exit.store(true, Ordering::Relaxed);
        self.leader_tpu_service.join().await;
    }
@joncinque (Contributor, Author)

These functions were moved from the other impl block

@KirillLykov (Contributor)

I launched this branch against testnet several times and got the following times: [0m41s, 0m17s, 0m16s, Fail, 0m20s, Fail, 0m15s]; see #32595 (comment) to compare with other runs.

Looks like it is close to what we had before in 1.14.
A couple of observations:

  • sometimes it fails with Error: Custom: Invalid blockhash
  • other times it works but takes longer (40s) and prints warnings like the following:
[2023-08-17T15:28:06.475255036Z WARN  solana_quic_client::nonblocking::quic_client] Failed to send data async to 69.197.49.7:9009, error: Custom("ConnectionError(TimedOut)")
  • also, it looks like the progress percentage is not rendered correctly (not sure it worked before):
0.0% | Sending 648/649 transactions                         [2023-08-17T15:28:25.430615875Z INFO  solana_quic_client::nonblocking::quic_client] Made connection to 194.126.172.222:8009 id 94630395482992 try_count 0
⠤   0.0% | Sending 649/649 transactions                         [2023-08-17T15:28:25.509365719Z INFO  solana_quic_client::nonblocking::quic_client] Made connection to 81.16.237.150:8009 id 94630395247536 try_count 0
⠖   0.0% | Sending 649/649 transactions                         [2023-08-17T15:28:25.705801857Z INFO  solana_quic_client::nonblocking::quic_client] Made connection to 107.155.88.82:8009 id 94630395265376 try_count 0

@joncinque (Contributor, Author) commented Aug 17, 2023

sometimes it fails with Error: Custom: Invalid blockhash

This error's strange... did you see that with any other branch?

other times it works but takes longer (40s) and prints warnings like the following

Ah yes, I bumped the number up to 1000 txs and now I'm starting to see it after many runs. Normally it's supposed to resend over RPC when the TPU send fails, but this could be due to the error not getting propagated back up. I'll look into it, thanks for noticing.
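
Roughly, the intended flow is the following (a hedged sketch with hypothetical helpers, not the actual client code); if the Err from the TPU path is swallowed, the fallback never runs:

// Hypothetical helpers standing in for the real TPU and RPC send paths.
async fn try_send_over_tpu(_wire_transaction: &[u8]) -> Result<(), String> {
    Err("ConnectionError(TimedOut)".to_string())
}
async fn send_over_rpc(_wire_transaction: &[u8]) -> Result<(), String> {
    Ok(())
}

// If the TPU send's error never propagates back (for example, because it was
// swallowed inside a detached task), the RPC fallback below is never taken
// and the transaction just times out, which would explain the slow (40s) runs.
async fn send_with_fallback(wire_transaction: &[u8]) -> Result<(), String> {
    match try_send_over_tpu(wire_transaction).await {
        Ok(()) => Ok(()),
        Err(_) => send_over_rpc(wire_transaction).await,
    }
}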

also, it looks like the progress percentage is not rendered correctly (not sure it worked before):

This is unfortunately correct 😓 the percent only shows the "confirmed" transactions. Since we send everything and then confirm everything all at once, if everything is confirmed on your first check, you'll never see it change.

@t-nelson (Contributor)

sometimes it fails with Error: Custom: Invalid blockhash

This error's strange... did you see that with any other branch?

this error message is probably misleading... the sync rpc client papers over any actual getFeeForMessage error with it.

other times it works but takes longer (40s) and prints warnings like the following

Ah yes, I bumped the number up to 1000 txs and now I'm starting to see it after many runs. Normally it's supposed to resend over RPC when the TPU send fails, but this could be due to the error not getting propagated back up. I'll look into it, thanks for noticing.

yeah this behavior is almost always improper error or blockhash handling. so long as this isn't worse than 1.14 it's an improvement. remember that we need to keep this solution backportable to 1.16, so avoid feature creep.

also, it looks like the progress percentage is not rendered correctly (not sure it worked before):

This is unfortunately correct 😓 the percent only shows the "confirmed" transactions. Since we send everything and then confirm everything all at once, if everything is confirmed on your first check, you'll never see it change.

dropping the percentage and instead displaying something like "Sending transactions: {pending}/{confirmed}/{total} pending/confirmed/total" would probably be more intuitive
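
For example (hypothetical helper, not code from this PR):

// Sketch of the suggested spinner text.
fn progress_message(pending: usize, confirmed: usize, total: usize) -> String {
    format!("Sending transactions: {pending}/{confirmed}/{total} pending/confirmed/total")
}
// progress_message(12, 637, 649)
//   => "Sending transactions: 12/637/649 pending/confirmed/total"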

@lijunwangs (Contributor) left a comment

LGTM

@KirillLykov (Contributor)

@joncinque I hadn't seen your comment.

This error's strange... did you see that with any other branch?

Not really, it's the first time.

Do we plan to backport these changes to 1.16?

@joncinque (Contributor, Author) commented Aug 22, 2023

Do we plan to backport these changes to 1.16?

Yes, but I have a less invasive fix which also addresses your [2023-08-17T15:28:06.475255036Z WARN solana_quic_client::nonblocking::quic_client] Failed to send data async to 69.197.49.7:9009, error: Custom("ConnectionError(TimedOut)") error, so I'll put up a new PR today with that fix

Edit: ...and close this one

@joncinque (Contributor, Author)

Closing in favor of #32945

@joncinque closed this Aug 22, 2023
Closes: Solana program deploys on devnet with Solana 1.16 are slow (#32595)