异步Rust:构建实时消息代理服务器
在本文中,我们将深入研究使用Rust构建实时消息代理服务器,展示其强大的并发特性。我们将使用Warp作为web服务器,并使用Tokio来管理异步任务。此外,我们将创建一个WebSocket客户端来测试代理服务器的功能。
设计图如下:
图片
构建消息代理服务器
消息代理服务器允许客户端为主题生成事件并订阅它们。它使用Warp作为HTTP和WebSocket服务器,使用Tokio作为异步运行时。
使用以下命令创建一个Rust项目:
cargo new real-ime-message
在Cargo.toml文件中加入以下依赖项:
[dependencies] futures-util = "0.3.30" tokio = {version = "1.35.1", features = ["full"]} tokio-tungstenite = "0.21.0" url = "2.5.0" warp = "0.3.6"
在src/main.rs文件中定义一个Broker结构体:
use std::{ collections::{HashMap, VecDeque}, sync::Arc, }; use futures_util::{SinkExt, StreamExt}; use tokio::sync::{ mpsc::{self, UnboundedSender}, RwLock, }; use warp::{filters::ws::Message, Filter}; type Topic = String; type Event = String; type WsSender = UnboundedSender; struct Broker { events: Arc, subscribers: Arc, }
- events:存储每个主题的事件。
- subscribers:跟踪每个主题的订阅者。
创建一个新的Broker实例:
impl Broker { fn new() -> Self { Broker { events: Arc::new(RwLock::new(HashMap::new())), subscribers: Arc::new(RwLock::new(HashMap::new())), } } }
定义发布事件的方法produce:
impl Broker { ...... async fn produce(&self, topic: Topic, event: Event) { let mut events = self.events.write().await; events .entry(topic.clone()) .or_default() .push_back(event.clone()); // 异步通知所有订阅者 let subscribers_list; { let subscribers = self.subscribers.read().await; subscribers_list = subscribers.get(&topic).cloned().unwrap_or_default(); } for ws_sender in subscribers_list { // 将事件发送到WebSocket客户端 let _ = ws_sender.send(warp::ws::Message::text(event.clone())); } } }
这个方法主要是将事件添加到相应的主题,然后将新事件通知所有订阅者。
定义subscribe方法,来管理新的订阅:
impl Broker { ...... pub async fn subscribe(&self, topic: Topic, socket: warp::ws::WebSocket) { let (ws_sender, mut ws_receiver) = socket.split(); let (tx, mut rx) = mpsc::unbounded_channel::(); { let mut subs = self.subscribers.write().await; subs.entry(topic).or_default().push(tx); } tokio::task::spawn(async move { while let Some(result) = ws_receiver.next().await { match result { Ok(message) => { // 处理有效的消息 if message.is_text() { println!( "Received message from client: {}", message.to_str().unwrap() ); } } Err(e) => { // 处理错误 eprintln!("WebSocket error: {:?}", e); break; } } } println!("WebSocket connection closed"); }); tokio::task::spawn(async move { let mut sender = ws_sender; while let Some(msg) = rx.recv().await { let _ = sender.send(msg).await; } }); } }
这个方法主要是将WebSocket拆分为发送方和接收方,将订阅者添加到订阅者列表中,处理传入的WebSocket消息。
main函数代码如下:
#[tokio::main] async fn main() { let broker = Arc::new(Broker::new()); let broker_clone1 = Arc::clone(&broker); let broker_clone2 = Arc::clone(&broker); let produce = warp::path!("produce" / String) .and(warp::post()) .and(warp::body::json()) .and(warp::any().map(move || Arc::clone(&broker_clone1))) .and_then( move |topic: String, event: Event, broker_clone2: Arc| async move { broker_clone2.produce(topic, event).await; Ok::(warp::reply()) }, ); let subscribe = warp::path!("subscribe" / String).and(warp::ws()).map( move |topic: String, ws: warp::ws::Ws| { let broker_clone3 = Arc::clone(&broker_clone2); ws.on_upgrade(move |socket| async move { broker_clone3.subscribe(topic.clone(), socket).await; }) }, ); let routes = produce.or(subscribe); println!("Broker server running at http://127.0.0.1:3030"); warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; }
实现WebSocket客户端
WebSocket客户端将模拟一个订阅主题和接收消息的真实用户。
在src/bin目录下,创建一个ws_cli.rs文件。在文件中定义websocket_client函数,建立WebSocket连接并管理消息:
use futures_util::{sink::SinkExt, stream::StreamExt}; use std::sync::Arc; use tokio::sync::RwLock; use tokio::time::{sleep, Duration}; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; use url::Url; async fn websocket_client(topic_url: &str) { // 解析要连接WebSocket服务器的URL let url = Url::parse(topic_url).expect("Invalid URL"); // 连接到WebSocket服务器 let (ws_stream, _) = connect_async(url).await.expect("Failed to connect"); println!("WebSocket client connected"); let (mut write, mut read) = ws_stream.split(); let message = Arc::new(RwLock::new(String::new())); let message_1 = message.clone(); // 生成一个任务来处理传入的消息 tokio::spawn(async move { let msg_lock = message_1.clone(); while let Some(message) = read.next().await { match message { Ok(msg) => { let mut ms = msg_lock.write().await; *ms = msg.to_text().unwrap().to_string(); println!("Received message: {}", msg.to_text().unwrap()); } Err(e) => { eprintln!("Error receiving message: {:?}", e); break; } } } }); // 发送消息 loop { let msg_lock = message.clone(); let ms = msg_lock.read().await; if let Err(e) = write.send(Message::Text(ms.to_string())).await { eprintln!("Error sending message: {:?}", e); break; } sleep(Duration::from_secs(5)).await; } }
main函数代码如下:
#[tokio::main] async fn main() { websocket_client("ws://127.0.0.1:3030/subscribe/newtopic").await; }
测试
执行如下命令运行消息代理服务器:
cargo run --bin real-ime-message
执行结果:
Broker server running at http://127.0.0.1:3030
然后打开一个新的命令行,执行如下命令运行WebSocket客户端:
cargo run --bin ws_cli
执行结果:
WebSocket client connected
向http://127.0.0.1:3030/produce/newtopic接口发送post请求,如图:
图片
客户端接收到消息:
WebSocket client connected Received message: This is a new event
总结
我们已经探索了在Rust中创建一个简单的消息代理,并使用WebSocket客户端对其进行测试。这个例子突出了Rust在构建高效、并发的网络应用程序方面的能力。