Skip to content

Commit

Permalink
Merge branch 'zeyla/gateway-refactor' into 7596ff/ci/nightly-fmt-for-…
Browse files Browse the repository at this point in the history
…examples
  • Loading branch information
7596ff committed Jul 26, 2022
2 parents 9e11f65 + 51b939b commit 6b6f3e5
Show file tree
Hide file tree
Showing 88 changed files with 4,886 additions and 6,471 deletions.
45 changes: 24 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,30 +120,24 @@ bot's token. You must also depend on `futures`, `tokio`,

```rust,no_run
use std::{env, error::Error, sync::Arc};
use futures::stream::StreamExt;
use twilight_cache_inmemory::{InMemoryCache, ResourceType};
use twilight_gateway::{Cluster, Event};
use twilight_gateway::{Event, Shard, ShardId};
use twilight_http::Client as HttpClient;
use twilight_model::gateway::Intents;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize the tracing subscriber.
tracing_subscriber::fmt::init();
let token = env::var("DISCORD_TOKEN")?;
// Use intents to only receive guild message events.
// A cluster is a manager for multiple shards that by default
// creates as many shards as Discord recommends.
let (cluster, mut events) = Cluster::new(token.to_owned(), Intents::GUILD_MESSAGES).await?;
let cluster = Arc::new(cluster);
// Start up the cluster.
let cluster_spawn = Arc::clone(&cluster);
// Start all shards in the cluster in the background.
tokio::spawn(async move {
cluster_spawn.up().await;
});
let mut shard = Shard::new(
ShardId::ONE,
token.clone(),
Intents::GUILD_MESSAGES,
).await?;
// HTTP is separate from the gateway, so create a new client.
let http = Arc::new(HttpClient::new(token));
Expand All @@ -155,18 +149,30 @@ async fn main() -> anyhow::Result<()> {
.build();
// Process each event as they come in.
while let Some((shard_id, event)) = events.next().await {
loop {
let event = match shard.next_event().await {
Ok(event) => event,
Err(source) => {
tracing::warn!(?source, "error receiving event");
if source.is_fatal() {
break;
}
continue;
}
};
// Update the cache with the event.
cache.update(&event);
tokio::spawn(handle_event(shard_id, event, Arc::clone(&http)));
tokio::spawn(handle_event(event, Arc::clone(&http)));
}
Ok(())
}
async fn handle_event(
shard_id: u64,
event: Event,
http: Arc<HttpClient>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
Expand All @@ -177,9 +183,6 @@ async fn handle_event(
.exec()
.await?;
}
Event::ShardConnected(_) => {
println!("Connected on shard {shard_id}");
}
// Other events here...
_ => {}
}
Expand Down
52 changes: 18 additions & 34 deletions book/src/chapter_1_crates/section_3_gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,6 @@ parses and processes them, and then gives them to you. It will automatically
reconnect, resume, and identify, as well as do some additional connectivity
checks.

Also provided is the `Cluster`, which will automatically manage a collection of
shards and unify their messages into one stream. It doesn't have a large API, you
usually want to spawn a task to bring it up such that you can begin to receive
tasks as soon as they arrive.

```rust,no_run
# use std::sync::Arc;
# use futures::StreamExt;
# use twilight_gateway::{Cluster, Intents};
#
# #[tokio::main]
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
# let token = String::from("dummy");
let intents = Intents::GUILD_MESSAGES | Intents::GUILDS;
let (cluster, mut events) = Cluster::new(token, intents).await?;
let cluster = Arc::new(cluster);
let cluster_spawn = cluster.clone();
tokio::spawn(async move {
cluster_spawn.up().await;
});
# let _ = events.next().await;
# Ok(())
# }
```

## Features


Expand Down Expand Up @@ -108,23 +81,34 @@ the dependency tree it will make use of that instead of [zlib-ng].
Starting a `Shard` and printing the contents of new messages as they come in:

```rust,no_run
use futures::StreamExt;
use std::{env, error::Error};
use twilight_gateway::{Intents, Shard};
use twilight_gateway::{Intents, Shard, ShardId};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
// Initialize the tracing subscriber.
tracing_subscriber::fmt::init();
let token = env::var("DISCORD_TOKEN")?;
let (shard, mut events) = Shard::new(token, Intents::GUILD_MESSAGES);
let intents = Intents::GUILD_MESSAGES;
let mut shard = Shard::new(ShardId::ONE, token, intents).await?;
tracing::info!("created shard");
loop {
let event = match shard.next_event().await {
Ok(event) => event,
Err(source) => {
tracing::warn!(?source, "error receiving event");
if source.is_fatal() {
break;
}
shard.start().await?;
println!("Created shard");
continue;
}
};
while let Some(event) = events.next().await {
println!("Event: {:?}", event);
tracing::debug!(?event, "event");
}
Ok(())
Expand Down
21 changes: 16 additions & 5 deletions book/src/chapter_1_crates/section_4_cache_inmemory.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,30 @@ Process new messages that come over a shard into the cache:
```rust,no_run
# #[tokio::main]
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
use futures::StreamExt;
use std::env;
use twilight_cache_inmemory::InMemoryCache;
use twilight_gateway::{Intents, Shard};
use twilight_gateway::{Intents, Shard, ShardId};
let token = env::var("DISCORD_TOKEN")?;
let (shard, mut events) = Shard::new(token, Intents::GUILD_MESSAGES);
shard.start().await?;
let mut shard = Shard::new(ShardId::ONE, token, Intents::GUILD_MESSAGES).await?;
let cache = InMemoryCache::new();
while let Some(event) = events.next().await {
loop {
let event = match shard.next_event().await {
Ok(event) => event,
Err(source) => {
tracing::warn!(?source, "error receiving event");
if source.is_fatal() {
break;
}
continue;
}
};
cache.update(&event);
}
# Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,13 @@ Create a [client], add a [node], and give events to the client to [process]
events:

```rust,no_run
use futures::StreamExt;
use std::{
env,
error::Error,
net::SocketAddr,
str::FromStr,
};
use twilight_gateway::{Intents, Shard};
use twilight_gateway::{Intents, Shard, ShardId};
use twilight_http::Client as HttpClient;
use twilight_lavalink::Lavalink;
Expand All @@ -67,11 +66,22 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
lavalink.add(lavalink_host, lavalink_auth).await?;
let intents = Intents::GUILD_MESSAGES | Intents::GUILD_VOICE_STATES;
let (shard, mut events) = Shard::new(token, intents);
let mut shard = Shard::new(ShardId::ONE, token, intents).await?;
shard.start().await?;
loop {
let event = match shard.next_event().await {
Ok(event) => event,
Err(source) => {
tracing::warn!(?source, "error receiving event");
if source.is_fatal() {
break;
}
continue;
}
};
while let Some(event) = events.next().await {
lavalink.process(&event).await?;
}
Expand Down
26 changes: 13 additions & 13 deletions book/src/chapter_2_multi-serviced_approach.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,25 @@ you have a small bot and just want to get it going in a monolithic application,
then it's also a good choice. It's easy to split off parts of your application
into other services as your application grows.

## Gateway clusters
## Gateway groups

One of the popular design choices when creating a multi-serviced application is
to have a service that simply connects shards to the gateway and sends the
events to a broker to be processed. As bots grow into hundreds or thousands of
shards, multiple instances of the application can be created and clusters -
groups of shards - can be managed by each. Twilight is a good choice for this
use case: you can receive either events that come in in a loop and send the
payloads to the appropriate broker stream, or you can loop over received
payloads' bytes to send off.
to have a service that only connects shards to the gateway and sends the events
to a broker to be processed. As bots grow into hundreds or thousands of shards,
multiple instances of the application can be created and groups of shards can be
managed by each. Twilight is an excellent choice for this use case: you can
receive either events that come in in a loop and send the payloads to the
appropriate broker stream, or you can loop over received payloads' bytes to send
off.

## Gateway session ratelimiting

If you have multiple clusters, then you need to queue and ratelimit your
initialized sessions. The Gateway includes a Queue trait which you can
implement, and the gateway will submit a request to the queue before starting a
If multiple shard groups are used, then they need to be queued and their session
initialization ratelimited. The Gateway includes a Queue trait which can be
implemented; the gateway will submit a request to the queue before starting a
session. Twilight comes with a queue that supports sharding and Large Bot
sharding, but when you start to have multiple clusters you'll want to implement
your own. Our [gateway-queue] is an example of this.
sharding, but when multiple shard groups are in use then a custom queue will
need to be implemented. Refer to [gateway-queue] for an example of this.

## HTTP proxy ratelimiting

Expand Down
48 changes: 20 additions & 28 deletions book/src/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,40 +46,21 @@ Below is a quick example of a program printing "Pong!" when a ping command comes
in from a channel:

```rust,no_run
use futures::stream::StreamExt;
use std::{env, error::Error, sync::Arc};
use twilight_cache_inmemory::{InMemoryCache, ResourceType};
use twilight_gateway::{cluster::{Cluster, ShardScheme}, Event, Intents};
use twilight_gateway::{Event, Intents, Shard, ShardId};
use twilight_http::Client as HttpClient;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let token = env::var("DISCORD_TOKEN")?;
// Start a single shard.
let scheme = ShardScheme::Range {
from: 0,
to: 0,
total: 1,
};
// Specify intents requesting events about things like new and updated
// messages in a guild and direct messages.
let intents = Intents::GUILD_MESSAGES | Intents::DIRECT_MESSAGES;
let (cluster, mut events) = Cluster::builder(token.clone(), intents)
.shard_scheme(scheme)
.build()
.await?;
let cluster = Arc::new(cluster);
// Start up the cluster
let cluster_spawn = cluster.clone();
tokio::spawn(async move {
cluster_spawn.up().await;
});
// Create a single shard.
let mut shard = Shard::new(ShardId::ONE, token.clone(), intents).await?;
// The http client is separate from the gateway, so startup a new
// one, also use Arc such that it can be cloned to other threads.
Expand All @@ -90,30 +71,41 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.resource_types(ResourceType::MESSAGE)
.build();
// Startup an event loop to process each event in the event stream as they
// Startup the event loop to process each event in the event stream as they
// come in.
while let Some((shard_id, event)) = events.next().await {
loop {
let event = match shard.next_event().await {
Ok(event) => event,
Err(source) => {
tracing::warn!(?source, "error receiving event");
if source.is_fatal() {
break;
}
continue;
}
};
// Update the cache.
cache.update(&event);
// Spawn a new task to handle the event
tokio::spawn(handle_event(shard_id, event, Arc::clone(&http)));
tokio::spawn(handle_event(event, Arc::clone(&http)));
}
Ok(())
}
async fn handle_event(
shard_id: u64,
event: Event,
http: Arc<HttpClient>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
match event {
Event::MessageCreate(msg) if msg.content == "!ping" => {
http.create_message(msg.channel_id).content("Pong!")?.exec().await?;
}
Event::ShardConnected(_) => {
println!("Connected on shard {}", shard_id);
Event::Ready(_) => {
println!("Shard is ready");
}
_ => {}
}
Expand Down
1 change: 1 addition & 0 deletions book/tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ twilight-mention = { path = "../../twilight-mention" }
twilight-standby = { path = "../../twilight-standby" }
twilight-util = { features = ["full"], path = "../../twilight-util" }
tokio = { default-features = false, features = ["full"], version = "1.0" }
tracing = "0.1.35"
tracing-subscriber = "0.3"

[build-dependencies]
Expand Down
4 changes: 0 additions & 4 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ twilight-lavalink = { path = "../twilight-lavalink" }
twilight-model = { path = "../twilight-model" }
twilight-standby = { path = "../twilight-standby" }

[[example]]
name = "gateway-cluster"
path = "gateway-cluster.rs"

[[example]]
name = "gateway-intents"
path = "gateway-intents.rs"
Expand Down
Loading

0 comments on commit 6b6f3e5

Please sign in to comment.