Skip to content

Commit

Permalink
dummy and event
Browse files Browse the repository at this point in the history
  • Loading branch information
shishanyue committed Apr 13, 2024
1 parent 02ce4da commit ffbeb71
Show file tree
Hide file tree
Showing 17 changed files with 458 additions and 47 deletions.
32 changes: 27 additions & 5 deletions src/command_center.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,33 @@
use std::{sync::Arc, time::Duration};

use crate::{connection_manager::{By, ConnectionManager}, packet::super_packet::SuperPacket};
use log::info;

use crate::{
connection_manager::{By, ConnectionManager},
event::{self, EVENT_CHANNEL_MULTIPLE},
module::{ModuleType, MODULE_MANAGER},
packet::super_packet::SuperPacket,
};

pub async fn command_center(shared_connection_mg:Arc<ConnectionManager>){
let std_in = std::io::stdin();
let mut admin_command = String::new();
pub async fn command_center(shared_connection_mg: Arc<ConnectionManager>) {
let mut event_receiver = EVENT_CHANNEL_MULTIPLE.1.resubscribe();

MODULE_MANAGER
.write()
.unwrap()
.init_module(ModuleType::RwEngine);

//let std_in = std::io::stdin();
//let mut admin_command = String::new();
loop {

Check warning on line 22 in src/command_center.rs

View workflow job for this annotation

GitHub Actions / check format

Diff in /home/runner/work/RW-RJR-Server/RW-RJR-Server/src/command_center.rs
match event_receiver.recv().await {
Ok(event) => {
info!("{:?}",event.event_name)
},
Err(_) => todo!(),
};

/*
std_in.read_line(&mut admin_command).unwrap();
let admin_command = admin_command.trim().to_string();
Expand All @@ -25,6 +46,7 @@ pub async fn command_center(shared_connection_mg:Arc<ConnectionManager>){
.send_packet_to_player_by(By::Addr(admin_command.clone()), packet)
.await;
}
*/
}
//std::thread::sleep(Duration::from_millis(u64::MAX));
}
}
53 changes: 30 additions & 23 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use uuid::Uuid;
use crate::{
connection_manager::By,
core::ServerCommand,
event::{Event, EventType, EVENT_CHANNEL},
packet::{Packet, PacketReadWriteExt, PacketType},
relay_manager::{relay::SharedRelayRoom, SharedRelayManager},
worker_pool::{processor::ProcesseorData, receiver::ReceiverData, sender::SenderData},

Check failure on line 28 in src/connection.rs

View workflow job for this annotation

GitHub Actions / lint

unused imports: `receiver::ReceiverData`, `sender::SenderData`

Check failure on line 28 in src/connection.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

unused imports: `receiver::ReceiverData`, `sender::SenderData`

Check failure on line 28 in src/connection.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

unused imports: `receiver::ReceiverData`, `sender::SenderData`

Check failure on line 28 in src/connection.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

unused imports: `receiver::ReceiverData`, `sender::SenderData`
Expand Down Expand Up @@ -76,8 +77,6 @@ pub enum ConnectionAPI {
AddRelayConnect,
}

pub type WorkersSender = (mpsc::Sender<ReceiverData>, mpsc::Sender<SenderData>);

#[derive(Debug)]
pub struct Connection {
pub shared_con: Option<Arc<SharedConnection>>,
Expand Down Expand Up @@ -150,10 +149,10 @@ impl Connection {

pub async fn send_relay_server_info(&self) {
let mut packet = Packet::new(PacketType::RELAY_VERSION_INFO).await;
packet.packet_buffer.write_u8(0).await.unwrap();
packet.packet_buffer.write_u32(151).await.unwrap();
packet.packet_buffer.write_u32(1).await.unwrap();
packet.packet_buffer.write_u8(0).await.unwrap();
packet.write_u8(0).await.unwrap();
packet.write_u32(151).await.unwrap();
packet.write_u32(1).await.unwrap();
packet.write_u8(0).await.unwrap();
self.shared_con.as_ref().unwrap().send_packet(packet).await;
}

Expand All @@ -170,7 +169,7 @@ impl Connection {
let mut send_packet =
Packet::new(PacketType::try_from(packet_type).unwrap_or_default()).await;

send_packet.packet_buffer.write_all(&bytes).await.unwrap();
send_packet.write_all(&bytes).await.unwrap();

if packet_type == PacketType::KICK as u32 {
// TODO
Expand Down Expand Up @@ -238,8 +237,8 @@ impl Connection {
)
.await
.unwrap();
packet.packet_buffer.write_u8(1).await.unwrap();
packet.packet_buffer.write_u8(60).await.unwrap();
packet.write_u8(1).await.unwrap();
packet.write_u8(60).await.unwrap();

self.shared_con.as_ref().unwrap().send_packet(packet).await;
}
Expand Down Expand Up @@ -289,8 +288,8 @@ impl Connection {

pub async fn send_relay_hall_message(&self, msg: &str) {
let mut packet = Packet::new(PacketType::RELAY_117).await;
packet.packet_buffer.write_u8(1).await.unwrap();
packet.packet_buffer.write_u32(5).await.unwrap();
packet.write_u8(1).await.unwrap();
packet.write_u32(5).await.unwrap();

packet.write_string(msg).await.unwrap();

Expand All @@ -302,12 +301,12 @@ impl Connection {

packet.write_string(msg).await.unwrap();

packet.packet_buffer.write_u8(3).await.unwrap();
packet.write_u8(3).await.unwrap();

packet.write_is_string(sender).await.unwrap();

packet.packet_buffer.write_u32(team).await.unwrap();
packet.packet_buffer.write_u32(team).await.unwrap();
packet.write_u32(team).await.unwrap();
packet.write_u32(team).await.unwrap();

self.shared_con.as_ref().unwrap().send_packet(packet).await;
}
Expand All @@ -329,7 +328,7 @@ impl Connection {
.load(Ordering::Relaxed)
>= NEW_RELAY_PROTOCOL_VERSION
{
packet.packet_buffer.write_u8(1).await.unwrap();
packet.write_u8(1).await.unwrap();
packet
.packet_buffer
.write_u32(*self.room_index.as_ref().unwrap())
Expand All @@ -341,7 +340,7 @@ impl Connection {
.await
.expect("write packet error");

packet.packet_buffer.write_u8(0).await.unwrap();
packet.write_u8(0).await.unwrap();

packet
.write_is_string(&self.addr.ip().to_string())
Expand Down Expand Up @@ -450,11 +449,11 @@ impl Connection {
let public = false;

if shared_relay_room.shared_data.custom.version >= NEW_RELAY_PROTOCOL_VERSION {
packet.packet_buffer.write_u8(2).await.unwrap();
packet.write_u8(2).await.unwrap();

packet.packet_buffer.write_u8(1).await.unwrap();
packet.packet_buffer.write_u8(1).await.unwrap();
packet.packet_buffer.write_u8(1).await.unwrap();
packet.write_u8(1).await.unwrap();
packet.write_u8(1).await.unwrap();
packet.write_u8(1).await.unwrap();

packet
.write_string("RJR Team")
Expand All @@ -467,8 +466,8 @@ impl Connection {
.await
.unwrap();

packet.packet_buffer.write_u8(public as u8).await.unwrap();
packet.packet_buffer.write_u8(1).await.unwrap();
packet.write_u8(public as u8).await.unwrap();
packet.write_u8(1).await.unwrap();

packet
.write_string(&format!(
Expand All @@ -478,7 +477,7 @@ impl Connection {
.await
.expect("write packet error");

packet.packet_buffer.write_u8(public as u8).await.unwrap();
packet.write_u8(public as u8).await.unwrap();

packet
.write_is_string(&Uuid::new_v4().to_string())
Expand All @@ -489,6 +488,14 @@ impl Connection {
}

self.shared_con.as_ref().unwrap().send_packet(packet).await;

EVENT_CHANNEL
.0
.send(Event::new(
"abab",

Check warning on line 495 in src/connection.rs

View workflow job for this annotation

GitHub Actions / check format

Diff in /home/runner/work/RW-RJR-Server/RW-RJR-Server/src/connection.rs
EventType::NewRoomAndHostOk(self.shared_relay_room.as_ref().unwrap().clone()),
))
.await.expect("send event error");
}

pub async fn disconnect(&mut self) {
Expand Down
16 changes: 14 additions & 2 deletions src/connection_manager/connection_lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,22 @@ impl ConnectionLib {
pub async fn send_packet_to_player_by(&self, by: By, packet: Packet) {

Check warning on line 35 in src/connection_manager/connection_lib.rs

View workflow job for this annotation

GitHub Actions / check format

Diff in /home/runner/work/RW-RJR-Server/RW-RJR-Server/src/connection_manager/connection_lib.rs
match by {
By::Addr(addr) => {
dbg!(self.addr_map.contains_key(&addr));
if let Some(con) = self.addr_map.get(&addr) {
println!("len:{}",self.addr_map.len());
for (con_addr,con) in self.addr_map.iter() {
println!("con_addr:{},addr:{}",con_addr,addr);
println!("con_addr_len:{},addr_len:{}",con_addr.len(),addr.len());

dbg!(*con_addr == addr);
if *con_addr == addr{
con.send_packet(packet.clone()).await;
}
}
/*
if let Some(con) = self.addr_map.get(&addr) {
con.send_packet(packet.clone()).await;

Check warning on line 50 in src/connection_manager/connection_lib.rs

View workflow job for this annotation

GitHub Actions / check format

Diff in /home/runner/work/RW-RJR-Server/RW-RJR-Server/src/connection_manager/connection_lib.rs
}
*/

}
By::Name(_) => todo!(),
}
Expand Down
118 changes: 118 additions & 0 deletions src/dummy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use std::{collections::HashMap, sync::Arc};

use tokio::{
net::{tcp::OwnedWriteHalf, TcpStream},
runtime::{Builder, Runtime},
sync::{mpsc, RwLock},
task::JoinHandle,
};

use crate::{error::BasicDummyError, packet::Packet, worker_pool::receiver::receiver_fn};

lazy_static! {
static ref DUMMY_RUNTIME:Runtime = Builder::new_multi_thread()
.enable_time()
.worker_threads(10)
// no timer!
.build()
.expect("creat block runtime error");

}

pub enum BasicDummyAPI {
WriteHalf(Box<dyn FnOnce(&mut OwnedWriteHalf) + 'static + Sync + Send>),
}

pub enum DummyVersion {
Version1_15,
Version1_16,
}

pub enum TriggerFnWhenPacket {
All,
On(Packet),
}

pub struct BasicDummy {
pub dummy_name: String,
pub domain: String,
pub packet_version: u32,
pub client_version: u32,
pub net_handle: Option<JoinHandle<()>>,
basic_dummy_api_tx: async_channel::Sender<BasicDummyAPI>,
basic_dummy_api_rx: async_channel::Receiver<BasicDummyAPI>,
when_receiver_fn: HashMap<TriggerFnWhenPacket, Vec<Box<dyn FnOnce() + Send + Sync + 'static>>>,
}

impl BasicDummy {
pub fn new(dummy_name: &str, domain: &str, version: DummyVersion) -> Arc<RwLock<Self>> {
let (basic_dummy_api_tx, basic_dummy_api_rx) = async_channel::bounded(10);
match version {
DummyVersion::Version1_15 => Arc::new(RwLock::new(Self {
dummy_name: dummy_name.to_string(),
domain: domain.to_string(),
packet_version: 4,
client_version: 163,
net_handle: None,
when_receiver_fn: HashMap::new(),
basic_dummy_api_tx,
basic_dummy_api_rx,
})),
DummyVersion::Version1_16 => todo!(),
}
}

pub async fn write_half_fn(
&self,
clouse: Box<dyn FnOnce(&mut OwnedWriteHalf) + 'static + Sync + Send>,
) -> Result<(), async_channel::SendError<BasicDummyAPI>> {
self.basic_dummy_api_tx
.send(BasicDummyAPI::WriteHalf(clouse))
.await?;
Ok(())
}
pub async fn connect_to_relay(
dummy: Arc<RwLock<Self>>,
addr: &str,
) -> Result<(), BasicDummyError> {
match TcpStream::connect(&addr).await {
Ok(stream) => {
let api_rx = dummy.read().await.basic_dummy_api_rx.clone();
dummy.write().await.net_handle =
Some(DUMMY_RUNTIME.spawn(basic_dummy_fn(stream, dummy.clone(), api_rx)));
Ok(())
}
Err(_) => Err(BasicDummyError::JoinRelayError("connect error")),
}
}
}

async fn basic_dummy_fn(
stream: TcpStream,
dummy: Arc<RwLock<BasicDummy>>,
api_rx: async_channel::Receiver<BasicDummyAPI>,
) {
let (mut read_half, mut write_half) = stream.into_split();

loop {
tokio::select! {
recv = receiver_fn(&mut read_half) => {
match recv {
Ok(packet) => {
for (fn_type ,function) in dummy.write().await.when_receiver_fn.iter(){

}
continue;
}
Err(_) => {break;}
};
}

Ok(api) = api_rx.recv() => {
match api {
BasicDummyAPI::WriteHalf(f) => f(&mut write_half),
}
}
}
}
}
5 changes: 5 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,8 @@ pub enum ReceiverError {
#[error("`{0}`")]
InvalidInput(String)
}
#[derive(Error, Debug)]
pub enum BasicDummyError {
#[error("join to relay error mag: {0}")]
JoinRelayError(&'static str)
}
43 changes: 43 additions & 0 deletions src/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::sync::Arc;

use log::info;
use tokio::sync::{broadcast, mpsc};

use crate::relay_manager::relay::SharedRelayRoom;

#[derive(Clone)]
pub struct Event {
pub event_name: String,
pub event_type: EventType,
}
impl Event {
pub fn new(event_name: &str, event_type: EventType) -> Self {
Self {
event_name:event_name.to_string(),
event_type,
}
}
}
#[derive(Clone)]
pub enum EventType {
NewRoomAndHostOk(Arc<SharedRelayRoom>),
}

lazy_static! {
pub static ref EVENT_CHANNEL: (async_channel::Sender<Event>, async_channel::Receiver<Event>) =
async_channel::unbounded();
pub static ref EVENT_CHANNEL_MULTIPLE: (broadcast::Sender<Event>, broadcast::Receiver<Event>) =
broadcast::channel(100);
}

pub fn init_event_system() -> anyhow::Result<()> {
info!("事件系统正在初始化");

tokio::spawn(async move {
match EVENT_CHANNEL.1.recv().await {
Ok(event) => EVENT_CHANNEL_MULTIPLE.0.send(event),
Err(_) => todo!(),
}
});
Ok(())
}
Loading

0 comments on commit ffbeb71

Please sign in to comment.