RUST语言使用cargo创建工程有一个好的地方,库是lib.rs标记,主程序main.rs标记,所以相对别的语言明显好找启动点。波卡链的主程序在polkadot/polkadot/src/main.rs中,打开这个文件:
#![warn(missing_docs)] //忽略丢失文档
extern crate polkadot_cli as cli;
#[macro_use]
extern crate error_chain;
quick_main!(run);
fn run() -> cli::error::Result<()> {
cli::run(::std::env::args())
}
代码很简单,不过风格似乎在哪里见过?是不是和以太坊等的启动代码有些类似,只是这个更单薄一些。可能大家对RUST不太了解,所以在分析波卡链时同时对RUST一些知识进行说明。
quick_main!(run);这个是一个宏,能够产生main主函数,它在error_chain这个库中,所以上面要导入这个库。它除了能够产生主函数,还可以调用其下的函数并返回相关的错误结果Result<()>,这里当然就是run这个函数了。#[macro_use]是属性设置,加上!是代表包含的自身属性设置,不加代表下一个项目的属性设置。这里是说明可以把宏应用到当前的作用域中。
所以,可以进入run函数(cli/src/lib.rs):
pub fn run<I, T>(args: I) -> error::Result<()> where
I: IntoIterator<Item = T>,
T: Into<std::ffi::OsString> + Clone,
{
//这个是主角,一个异步IO的栈,网络库
let core = reactor::Core::new().expect("tokio::Core could not be created");
let yaml = load_yaml!("./cli.yml");//polkadot启动的配置参数在yml文件中,加载之
//RUST的SWITCH
let matches = match clap::App::from_yaml(yaml).version(crate_version!()).get_matches_from_safe(args) {
Ok(m) => m,
Err(ref e) if e.kind == clap::ErrorKind::VersionDisplayed => return Ok(()),
Err(ref e) if e.kind == clap::ErrorKind::HelpDisplayed || e.kind == clap::ErrorKind::VersionDisplayed => {
let _ = clap::App::from_yaml(yaml).print_long_help();//类似default
return Ok(());//如果是非链环境(帮助之类),则返回。
}
Err(e) => return Err(e.into()),
};
// TODO [ToDr] Split parameters parsing from actual execution.
let log_pattern = matches.value_of("log").unwrap_or("");
init_logger(log_pattern);
fdlimit::raise_fd_limit();
let mut config = service::Configuration::default();
let base_path = matches.value_of("base-path")
.map(|x| Path::new(x).to_owned())
.unwrap_or_else(default_base_path);
//keystroe的参数处理
config.keystore_path = matches.value_of("keystore")
.map(|x| Path::new(x).to_owned())
.unwrap_or_else(|| keystore_path(&base_path))
.to_string_lossy()
.into();
config.database_path = db_path(&base_path).to_string_lossy().into();
//角色处理,四种角色,其使用的是network中的Role:pub use network::Role;
let mut role = service::Role::FULL;
if matches.is_present("collator") {
info!("Starting collator.");
role = service::Role::COLLATOR;
} else if matches.is_present("validator") {
info!("Starting validator.");
role = service::Role::VALIDATOR;
} else if matches.is_present("light") {
info!("Starting light.");
role = service::Role::LIGHT;
}
match matches.value_of("chain") {
Some("dev") => config.chain_spec = ChainSpec::Development,
Some("local") => config.chain_spec = ChainSpec::LocalTestnet,
Some("poc-1") => config.chain_spec = ChainSpec::PoC1Testnet,
None => (),
Some(unknown) => panic!("Invalid chain name: {}", unknown),
}
info!("Chain specification: {}", match config.chain_spec {
ChainSpec::Development => "Development",
ChainSpec::LocalTestnet => "Local Testnet",
ChainSpec::PoC1Testnet => "PoC-1 Testnet",
});
config.roles = role;
{
//配置启动节点和相关网络参数
config.network.boot_nodes = matches
.values_of("bootnodes")
.map_or(Default::default(), |v| v.map(|n| n.to_owned()).collect());
config.network.config_path = Some(network_path(&base_path).to_string_lossy().into());
config.network.net_config_path = config.network.config_path.clone();
let port = match matches.value_of("port") {
Some(port) => port.parse().expect("Invalid p2p port value specified."),
None => 30333,
};
config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port));
config.network.public_address = None;
config.network.client_version = format!("parity-polkadot/{}", crate_version!());
config.network.use_secret = match matches.value_of("node-key").map(|s| s.parse()) {
Some(Ok(secret)) => Some(secret),
Some(Err(err)) => return Err(format!("Error parsing node key: {}", err).into()),
None => None,
};
}
config.keys = matches.values_of("key").unwrap_or_default().map(str::to_owned).collect();
//根据角色来创建全节点或者轻量级节点
match role == service::Role::LIGHT {
true => run_until_exit(core, service::new_light(config.clone())?, &matches, config),
false => run_until_exit(core, service::new_full(config.clone())?, &matches, config),
}
}
这个函数其实没啥,类似于所有的区块链启动,对参数进行解析,并启动相关的操作,一步步分析别急。其实最关键提最后两行,也就是注释中说明的轻节点和全节点。
先看一下这个函数:
//rust的模板模板编程,WHERE类似于C#的限定,最后一个参数是一个函数指针
fn run_until_exit<B, E>(mut core: reactor::Core, service: service::Service<B, E>, matches: &clap::ArgMatches, config: service::Configuration) -> error::Result<()>
where
B: client::backend::Backend + Send + Sync + 'static,//静态的生命周期
E: client::CallExecutor + Send + Sync + 'static,
client::error::Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>
{
let exit = {
// can't use signal directly here because CtrlC takes only `Fn`.退出控制
let (exit_send, exit) = mpsc::channel(1);
//移动语义
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).wait().expect("Error sending exit notification");
});
//推导返回值,可以使用return,但不推荐
exit
};
//启动钓鱼服务
informant::start(&service, core.handle());
//创建RPC和相关的处理句柄
let _rpc_servers = {
let http_address = parse_address("127.0.0.1:9933", "rpc-port", matches)?;//问号表示可以有0~1个匹配
let ws_address = parse_address("127.0.0.1:9944", "ws-port", matches)?;
//||是闭包,其后为闭包表达式
let handler = || {
let chain = rpc::apis::chain::Chain::new(service.client(), core.remote());
let pool = RpcTransactionPool {
inner: service.transaction_pool(),
network: service.network(),
};
//创建rpc_handler,代码分析见后面的RPC
rpc::rpc_handler(service.client(), chain, pool, Configuration(config.clone()))
};
//处理元组,自动推导
(
//|address|是一个管道,用在闭包中。启动两个服务
start_server(http_address, |address| rpc::start_http(address, handler())),
start_server(ws_address, |address| rpc::start_ws(address, handler())),
)
};
//运行异步IO,启动网络通信
core.run(exit.into_future()).expect("Error running informant event loop");
Ok(())
}
再看一下启动的服务:
fn start_server<T, F>(mut address: SocketAddr, start: F) -> Result<T, io::Error> where
F: Fn(&SocketAddr) -> Result<T, io::Error>,
{
start(&address)
.or_else(|e| match e.kind() {
io::ErrorKind::AddrInUse |
io::ErrorKind::PermissionDenied => {
warn!("Unable to bind server to {}. Trying random port.", address);
address.set_port(0);
start(&address)
},
_ => Err(e),//一定要有默认分支
})
}
一个泛型函数,调用F函数指针,也就是前面的rpc::start_http和rpc::start_ws两个函数,编程语言越高级,越脱离人类的形式化感知,不知道是好事还是坏事。or_else是Result的一个结果分支语句。
基本上到这儿,相关的命令就全启动了。然后分析一下具体的细节。
下面分别来看钓鱼服务和RPC服务:
polkadot/cli/src/informant.rs
/// Spawn informant on the event loop
pub fn start<B, E>(service: &Service<B, E>, handle: reactor::Handle)
where
B: client::backend::Backend + Send + Sync + 'static,//where限定子句,+号就是多重限制(继承)
E: client::CallExecutor + Send + Sync + 'static,
client::error::Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>
{
let interval = reactor::Interval::new_at(Instant::now(), Duration::from_millis(TIMER_INTERVAL_MS), &handle)
.expect("Error creating informant timer");
//调用client(&self)等得到Clone值
let network = service.network();
let client = service.client();
let display_notifications = interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
let sync_status = network.status();
//if let语义
if let Ok(best_block) = client.best_block_header() {
let hash: HeaderHash = best_block.blake2_256().into();
let status = match (sync_status.sync.state, sync_status.sync.best_seen_block) {
(SyncState::Idle, _) => "Idle".into(),
(SyncState::Downloading, None) => "Syncing".into(),
(SyncState::Downloading, Some(n)) => format!("Syncing, target=#{}", n),
};
info!(target: "polkadot", "{} ({} peers), best: #{} ({})", status, sync_status.num_peers, best_block.number, hash)
} else {
warn!("Error getting best block information");
}
Ok(())
});
let client = service.client();//因为闭包,所以这里需要再次获得
let display_block_import = client.import_notification_stream().for_each(|n| {
info!(target: "polkadot", "Imported #{} ({})", n.header.number, n.hash);
Ok(())
});
//处理通知和数据块的导入
handle.spawn(display_notifications);
handle.spawn(display_block_import);
}
使用 handle.spawn 将函数 Future 绑定到 event loop 上面。只有如此在处理并发连接时不会阻塞 event loop。写过异步I/O 的都会明白 socket 是非阻塞的即不可能通过一次 read 或者 write 就将这个 socket 的数据全部处理完成。因此还会重新将 socket 给注册到 event loop,这样当有新的事件的时候,event loop 会重新调用对应的回调函数。 tokio_core提供了这么一个异步IO的机制,确实比较好用。
在上面的创建服务时,没有分析创建两类节点:
polkadot/service/src/lib.rs
/// Creates light client and register protocol with the network service轻节点
pub fn new_light(config: Configuration) -> Result<Service<client::light::Backend, client::RemoteCallExecutor<client::light::Backend, network::OnDemand<network::Service>>>, error::Error> {
//下划杠代表忽略绑定,类似于GO,MOVE语义加上管道形成一个闭包
Service::new(move |_, executor, genesis_builder: GenesisBuilder| {
let client_backend = client::light::new_light_backend();
let fetch_checker = Arc::new(client::light::new_fetch_checker(client_backend.clone(), executor));
let fetcher = Arc::new(network::OnDemand::new(fetch_checker));
let client = client::light::new_light(client_backend, fetcher.clone(), genesis_builder)?;
Ok((Arc::new(client), Some(fetcher)))
},
|client| Arc::new(polkadot_api::light::RemotePolkadotApiWrapper(client.clone())),
|_client, _network, _tx_pool, _keystore| Ok(None),
config)
}
/// Creates full client and register protocol with the network service全节点多了数据库的相关数据操作
pub fn new_full(config: Configuration) -> Result<Service<client_db::Backend, client::LocalCallExecutor<client_db::Backend, CodeExecutor>>, error::Error> {
let is_validator = (config.roles & Role::VALIDATOR) == Role::VALIDATOR;
Service::new(|db_settings, executor, genesis_builder: GenesisBuilder|
Ok((Arc::new(client_db::new_client(db_settings, executor, genesis_builder)?), None)),
|client| client,
|client, network, tx_pool, keystore| {
if !is_validator {
return Ok(None);
}
// Load the first available key. Code above makes sure it exisis.
let key = keystore.load(&keystore.contents()?[0], "")?;
info!("Using authority key {:?}", key.public());
//见下面推荐牟NEW
Ok(Some(consensus::Service::new(
client.clone(),
client.clone(),
network.clone(),
tx_pool.clone(),
::std::time::Duration::from_millis(4000), // TODO: dynamic
key,
)))
},
config)
}
/// Creates and register protocol with the network service这里很重要,是真正创建网络服务的地方
fn new<F, G, C, A>(client_creator: F, api_creator: G, consensus_creator: C, mut config: Configuration) -> Result<Self, error::Error>
where //RUST的WHERE这么做确实有点丑陋啊
F: FnOnce(
client_db::DatabaseSettings,
CodeExecutor,
GenesisBuilder,
) -> Result<(Arc<Client<B, E>>, Option<Arc<network::OnDemand<network::Service>>>), error::Error>,
G: Fn(
Arc<Client<B, E>>,
) -> Arc<A>,
C: Fn(
Arc<Client<B, E>>,
Arc<network::Service>,
Arc<Mutex<TransactionPool>>,
&Keystore
) -> Result<Option<consensus::Service>, error::Error>,
A: PolkadotApi + Send + Sync + 'static,
{
use std::sync::Barrier;//同步的内存屏障
let (signal, exit) = ::exit_future::signal();//事件元组
// Create client
let executor = polkadot_executor::Executor::new();
let mut keystore = Keystore::open(config.keystore_path.into())?;
for seed in &config.keys {
keystore.generate_from_seed(seed)?;//产生随机种子
}
//由上边的种子生产密钥对
if keystore.contents()?.is_empty() {
let key = keystore.generate("")?;
info!("Generated a new keypair: {:?}", key.public());
}
let ChainConfig { genesis_config, boot_nodes } = match config.chain_spec {
ChainSpec::Development => development_config(),
ChainSpec::LocalTestnet => local_testnet_config(),
ChainSpec::PoC1Testnet => poc_1_testnet_config(),
};
config.network.boot_nodes.extend(boot_nodes);
let genesis_builder = GenesisBuilder {
config: genesis_config,
};
let db_settings = client_db::DatabaseSettings {
cache_size: None,
path: config.database_path.into(),
};
//创建客户端
let (client, on_demand) = client_creator(db_settings, executor, genesis_builder)?;
let api = api_creator(client.clone());
let best_header = client.best_block_header()?;
info!("Starting Polkadot. Best block is #{}", best_header.number);
//创建交易池及其适配器
let transaction_pool = Arc::new(Mutex::new(TransactionPool::new(config.transaction_pool)));
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
pool: transaction_pool.clone(),
client: client.clone(),
api: api.clone(),
});
let network_params = network::Params {
config: network::ProtocolConfig {
roles: config.roles,
},
network_config: config.network,
chain: client.clone(),
on_demand: on_demand.clone().map(|d| d as Arc<network::OnDemandService>),
transaction_pool: transaction_pool_adapter,
};
//创建网络通信服务
let network = network::Service::new(network_params)?;
let barrier = ::std::sync::Arc::new(Barrier::new(2));
//引入多线程原子变量控制
on_demand.map(|on_demand| on_demand.set_service_link(Arc::downgrade(&network)));
let thread = {
let client = client.clone();
let network = network.clone();
let txpool = transaction_pool.clone();
let thread_barrier = barrier.clone();
启动多线程并挂载启动网络等事件函数
thread::spawn(move || {
network.start_network();
thread_barrier.wait();
let mut core = Core::new().expect("tokio::Core could not be created");
let events = client.import_notification_stream().for_each(move |notification| {
network.on_block_imported(notification.hash, ¬ification.header);//导入区块
prune_imported(&*client, &*txpool, notification.hash);//处理导入的区块
Ok(())
});
core.handle().spawn(events);//绑定事件
if let Err(e) = core.run(exit) {
debug!("Polkadot service event loop shutdown with {:?}", e);
}
debug!("Polkadot service shutdown");
})
};
// wait for the network to start up before starting the consensus
// service.
barrier.wait();
// Spin consensus service if configured创建共识创建者
let consensus_service = consensus_creator(client.clone(), network.clone(), transaction_pool.clone(), &keystore)?;
Ok(Service {
thread: Some(thread),
client: client,
network: network,
transaction_pool: transaction_pool,
signal: Some(signal),
_consensus: consensus_service,
})
}
这个还是有一些类似于以太坊,创建节点时把相关的网络服务都创建好。这样最后Ok返回让RUST自动推导结果。
把前面的服务分析明白后,就开始调用真正的RPC中的服务的分析:
substate/rpc-servers/src/lib.rs
/// Construct rpc `IoHandler`
pub fn rpc_handler<S, C, A, Y>(
state: S,
chain: C,
author: A,
system: Y,
) -> RpcHandler where
S: apis::state::StateApi,
C: apis::chain::ChainApi<Metadata=Metadata>,
A: apis::author::AuthorApi,
Y: apis::system::SystemApi,
{
let mut io = pubsub::PubSubHandler::default();
io.extend_with(state.to_delegate());
io.extend_with(chain.to_delegate());
io.extend_with(author.to_delegate());
io.extend_with(system.to_delegate());
io
}
/// Start HTTP server listening on given address.
pub fn start_http(
addr: &std::net::SocketAddr,
io: RpcHandler,
) -> io::Result<http::Server> {
http::ServerBuilder::new(io)
.threads(4)
.rest_api(http::RestApi::Unsecure)
.cors(http::DomainsValidation::Disabled)
.start_http(addr)
}
/// Start WS server listening on given address.
pub fn start_ws(
addr: &std::net::SocketAddr,
io: RpcHandler,
) -> io::Result<ws::Server> {
ws::ServerBuilder::with_meta_extractor(io, |context: &ws::RequestContext| Metadata::new(context.sender()))
.start(addr)
.map_err(|err| match err {
ws::Error(ws::ErrorKind::Io(io), _) => io,
ws::Error(ws::ErrorKind::ConnectionClosed, _) => io::ErrorKind::BrokenPipe.into(),
ws::Error(e, _) => {
error!("{}", e);
io::ErrorKind::Other.into()
}
})
}
这两个函数里都使用了JSON的库,直接就构建了相关的服务,这两个库的资料目前找不到,所以没办法深入的分析说明。这里分析完成了相关的服务启动,下一次重点分析网络相关具体内容。