既是入门,也是踩坑

周末写了个 rust 的异步web服务器,感觉学到了不少有意思的东西,写篇文章给读者们分享分享。
本文主要提供给和我一样的新手阅读,其中错误还请各位大佬指正。


起源(废话)

写作目的

这部分的起源是我要做一个适配的服务器,一头是一个只能发送http协议的设备,另一头是只能处理MQTT协议的单片机。

其实能发送 http 就能发送 MQTT 的,只不过我懒得改那头的代码。

可选的(我会的),抽象层次够高的语言主要是Pythonrust,我对Python的多线程体验实在是算不上太好(加上不太会),所以选择使用 rust 开发。在开发过程中我犯下了许多错误,也经历了许多修正。我觉得这些经历很有用处,所以整理了代码写成系列文章。

在第一篇文章里面,我将实现一个错误百出的服务器,并且在后续文章中指出这些错误代码并进行修正。

为什么是异步

传统服务器采用的是同步方式处理请求,需要创建足够的的工作进程/线程来处理请求。假设请求的处理时间比较长,每秒并发量稍微高一点,传统服务器就需要创建几十上百个进程,这个成本实在是太高。所谓异步服务器就是针对处理时间进行优化,可以把那些耗时较长的任务放到后台执行,执行完成后返回执行结果给请求方。因为传统服务器的处理任务大多数都是可以放到后台执行的,所以异步服务器在这方面体现了极大的优势,不需要创建多个进程/协程就可以满足同样的并发量。在这样的基础上,异步逐渐成为流行的服务器处理方式。

从原理上来讲,同步处理线程调用 IO 等会阻塞线程的操作的时候,整个线程会被阻塞,操作系统要切换去其他线程处理请求。异步所谓的后台运行,主要是把这些操作更改为非阻塞的版本,通过事件监听、回调等方式先将这些操作交给操作系统内核或者其他硬件去处理,等处理完成之后再回头获取处理结果。在等待处理的时间里面,这个线程又可以去处理其他请求。

实际上异步和非阻塞并不是完全绑定的,既有同步非阻塞也有异步阻塞。不过目前我们只讨论异步非阻塞就是了。

可以看出来,只在请求处理中包含许多阻塞+耗时任务的时候,异步非阻塞模型才比同步模型有优势。如果请求的处理时间很短,又或者是请求都是在 CPU 上密集计算的情况(比如需要把请求提交的信息进行加密后返回),那么异步模型就比不上同步模型的情况了。因为虽然异步开销比创建线程小很多,但也不是完全免费的午餐。

总结就是,如果你的服务满足以下特点,那就推荐异步,否则还是同步比较好。

  • 大量并发。
  • 每个请求都有阻塞+耗时的任务要完成。

什么?你问我高并发高CPU耗时的服务怎么处理?我推荐你先解决系统架构的问题,不要梦想用一台服务器解决好吧。直接做方便横向扩展的服务器,然后堆机器就完事了。

为什么是 Rust

主要是 Rust 的异步糖很甜,第三方库管理还香。

主要来说市面上常见的异步糖都是基于协程实现的(C# 你怎么是线程实现的啊!),协程又分为 有栈(stackful)协程无栈(stackless)协程 。前者的代表是 Golang ,后者。。。有一堆语言用了后者, Rust 就是其中之一。

我直接贴一个介绍两者的链接:有栈协程与无栈协程

省流助手:

  • 有栈协程能在其任意嵌套函数中被挂起,无栈协程不能。
  • 无栈协程的切换比有栈协程的切换快,而且几乎没有额外的内存开销
  • 无栈协程关键字具有传染性,有栈协程没这问题。

画重点,要考的。

Rust 的语法糖很甜,我又正好会一点,加上学习项目没有历史包袱也没有 kpi 需求,就直接用了。

反正我也不会 Golang


逻辑梳理

流程图

如图所示,我需要在收到请求之后发送数据,并且等待回应才行。鉴于单片机有时候会突然下线或者不回应或者回应错误信息,我需要给http请求回应一个Success状态或者一个Failure

写成文字伪代码大概是这样:

发送MQTT请求
while(10)
    查看回应列表
    等待100ms
根据回应列表返回请求

开始编写!

选库

不是什么特别大的项目,本着易上手的原则,我直接选择用 actix-webpaho-mqtt 和其他的一些库(用到什么导入什么)。直接晒出基础Cargo.toml

[package]
name = "mcc"
version = "0.1.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
actix-web = "3"
serde = "1"
serde_json = "1"
paho-mqtt = "0.9.1"

服务器本体

首先还是根据 Actix 文档 把 web 服务器框架撸出来。

use actix_web::{post, App, Error, HttpResponse, HttpServer};

// 这个宏代表函数将在 post http://地址/ 的时候被调用
#[post("/")]
async fn command(info: String) -> Result<HttpResponse, Error> {
    Ok(HttpResponse::Ok().body(info)) // 直接 post 什么就返回什么
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        // 绑定一下 factory
        App::new().service(command)
    })
    .bind("127.0.0.1:8080")? // 绑定一下服务器地址和端口
    .run()
    .await
}

使用 curl 测试一下是不是运行正常。

curl -X POST -i 'http://127.0.0.1:8080/' --data 'Hello World'

MQTT 消息的发送

虽然我自己有 MQTT 服务器,但是我想还是隐藏地址的好,这里就使用公共的 MQTT 服务器当示例啦。
参照 paho-mqtt 的示例把发送客户端写出来。

use paho_mqtt as mqtt;
use std::process;

// EMQ X 提供的 免费公共 MQTT 服务器地址
const DFLT_BROKER: &str = "tcp://broker.emqx.io:1883";

// 这个宏代表函数将在 post http://地址/ 的时候被调用
#[post("/")]
async fn command(info: String) -> Result<HttpResponse, Error> {
    // 创建客户端
    let client = mqtt::Client::new(DFLT_BROKER).unwrap_or_else(|e| {
        // 错误处理
        eprintln!("Error: {:?}", e);
        process::exit(1);
    });
    // 设置连接参数
    let connect_opts = mqtt::ConnectOptionsBuilder::new()
        .clean_session(true) // 退出后清理客户端
        .finalize();
    // 客户端连接服务器
    if let Err(e) = client.connect(connect_opts) { 
        // 错误处理
        eprintln!("Error: {:?}", e);
        process::exit(1);
    }
    // 组建消息
    let msg = mqtt::Message::new("mcc/test/202110", info.to_string(), mqtt::QOS_0);
    let re = client.publish(msg); // 发送消息
    match re {
        Ok(_) => {}
        Err(e) => {
            // 错误处理
            eprintln!("Error: {:?}", e);
        }
    }

    Ok(HttpResponse::Ok().body(info)) // 直接 post 什么就返回什么
}

再用之前的 curl 命令测试一下。感觉一切正常。。。。就是回应有点慢。
实际上这里的写法是错误的,因为连接 MQTT 服务器并不是瞬间就能连接上的。这个客户端应该只在启动时连接一次。
那么要想办法提前连接好客户端,需要的时候直接发送数据。在 Actix 文档 - Writing an Application 这里有相关的方法。
把客户端作为 Shared Mutable State 分享到所有线程。因为这是个唯一对象,所有工作线程共享一个对象,所以需要用互斥锁保护,避免不同线程同时发送消息。
对着代码一顿猛抄,把客户端的连接挪到主函数部分来完成。

虽然网址是 actix 的 docs ,但我感觉这明明是 actix-web 的 docs 啊。

use std::sync::Mutex;

// 客户端结构体
struct SendClient {
    // 因为这个对象会在各种线程之间传来传去,所以需要一个互斥锁保护它
    pub client_mutex: Mutex<mqtt::Client>,
}

// EMQ X 提供的 免费公共 MQTT 服务器地址
const DFLT_BROKER: &str = "tcp://broker.emqx.io:1883";

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // 创建客户端
    let client = mqtt::Client::new(DFLT_BROKER).unwrap_or_else(|e| {
        // 错误处理
        eprintln!("Error: {:?}", e);
        process::exit(1);
    });
    // 设置连接参数
    let connect_opts = mqtt::ConnectOptionsBuilder::new()
        .clean_session(true) // 退出后清理客户端
        .finalize();
    // 客户端连接服务器
    if let Err(e) = client.connect(connect_opts) {
        // 错误处理
        eprintln!("Error: {:?}", e);
        process::exit(1);
    }
    // 一个多线程有效的计数引用 ,这个 Data 底层是通过 Arc 实现的。
    let counter = web::Data::new(SendClient {
        client_mutex: Mutex::new(client),
    });

    HttpServer::new(
        // 这里改成移动闭包
        move || {
            App::new()
                .service(command) // 绑定一下 factory
                .app_data(counter.clone()) // 添加引用的拷贝
        },
    )
    .bind("127.0.0.1:8080")? // 绑定一下服务器地址和端口
    .run()
    .await
}

然后把处理请求的函数修改成这样

use actix_web::{error, post, web, App, Error, HttpResponse, HttpServer};

// 这个宏代表函数将在 post http://地址/ 的时候被调用
#[post("/")]
async fn command(counter: web::Data<SendClient>, info: String) -> Result<HttpResponse, Error> {
    // 在互斥锁保护下拿到客户端
    let lock_re = counter.get_ref().client_mutex.lock();
    match lock_re {
        Ok(ref cli) => {
            // 组建消息
            let msg = mqtt::Message::new("mcc/test/202110", info.to_string(), mqtt::QOS_0);
            let re = cli.publish(msg); // 发送消息
            match re {
                Ok(_) => {}
                Err(e) => {
                    // 错误处理
                    eprintln!("Error: {:?}", e);
                    return Err(error::ErrorInternalServerError("发送信息失败!"));
                }
            }
        }
        Err(e) => {
            eprintln!("Error: {:?}", e);
            return Err(error::ErrorInternalServerError("解锁失败!"));
        }
    }

    Ok(HttpResponse::Ok().body(info)) // 直接 post 什么就返回什么
}

现在再进行测试,相应的速度就非常快了。

这里有个有趣的事情

MQTT 消息的接收

最朴素的想法就是另起一个客户端,用生产者-消费者模型建立一个消息队列。翻了翻 paho-mqtt异步 Example ,参照代码写了个函数用于启动异步的客户端并返回消息队列的引用。

use std::time::Duration;
use std::{process, thread};

// 消息队列
struct ResultMutex {
    pub mutex: Mutex<Vec<String>>,
}

// EMQ X 提供的 免费公共 MQTT 服务器地址
const DFLT_BROKER: &str = "tcp://broker.emqx.io:1883";
// 订阅的主题,单片机消息将发送到这个主题
const DFLT_TOPICS: &[&str] = &["mcc/test/CmdResult"];

// 连接成功的时候订阅主题
fn on_connect_success(client: &mqtt::AsyncClient, _msgid: u16) {
    client.subscribe_many(DFLT_TOPICS, &[0]);
}

// 连接失败的时候过段时间重连
fn on_connect_failure(client: &mqtt::AsyncClient, _msgid: u16, _rc: i32) {
    thread::sleep(Duration::from_millis(1500));
    client.reconnect_with_callbacks(on_connect_success, on_connect_failure);
}

// 启动异步的客户端并返回消息队列的引用
fn start_receive_client() -> web::Data<ResultMutex> {
    let connect_opts = mqtt::ConnectOptionsBuilder::new()
        .clean_session(true) // 退出后清理客户端
        .finalize();
    let mut client = mqtt::AsyncClient::new(DFLT_BROKER).unwrap_or_else(|e| {
        // 错误处理
        eprintln!("Error: {:?}", e);
        process::exit(1);
    });

    // 消息队列的引用
    let counter = web::Data::new(ResultMutex {
        mutex: Mutex::new(vec![]),
    });
    let counter_clone = counter.clone();
    // 启动新的线程
    let _thread_handle = thread::spawn(move || {
        // 设置接收到消息的时候,把消息存入消息队列
        client.set_message_callback(move |_cli, msg| {
            if let Some(msg) = msg {
                let payload_str = msg.payload_str();
                let counter = counter.clone();
                let cmd_result: String = (&*payload_str).to_string();
                let mut array = counter.get_ref().mutex.lock().unwrap();
                array.push(cmd_result);
            }
            ()
        });
        client.connect_with_callbacks(connect_opts, on_connect_success, on_connect_failure);
        loop {}
    });
    counter_clone
}

在主函数中调用这个函数

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    /*
     * .......
     */

    // 消息队列的引用
    let counter_copy = start_receive_client();

    HttpServer::new(
        // 这里改成移动闭包
        move || {
            App::new()
                .service(command) // 绑定一下 factory
                .app_data(counter.clone()) // 添加 MQTT 消息客户端引用的拷贝
                .app_data(counter_copy.clone()) // 添加消息队列的引用拷贝
        },
    )
    .bind("127.0.0.1:8080")? // 绑定一下服务器地址和端口
    .run()
    .await
}

接下来就是在 command 函数里面获取这个消息队列,并试图读取消息了。

// 这个宏代表函数将在 post http://地址/ 的时候被调用
#[post("/")]
async fn command(
    result_mutex: web::Data<ResultMutex>,
    counter: web::Data<SendClient>,
    info: String,
) -> Result<HttpResponse, Error> {
    // 在互斥锁保护下拿到客户端
    let lock_re = counter.get_ref().client_mutex.lock();

    match lock_re {
        Ok(ref cli) => {
            // 组建消息
            let msg = mqtt::Message::new("mcc/test/202110", info.to_string(), mqtt::QOS_0);
            let re = cli.publish(msg); // 发送消息
            if let Err(e) = re {
                // 错误处理
                eprintln!("Error: {:?}", e);
                return Err(error::ErrorInternalServerError("发送信息失败!"));
            }
        }
        Err(e) => {
            eprintln!("Error: {:?}", e);
            return Err(error::ErrorInternalServerError("解锁失败!"));
        }
    }

    // 解锁
    let mut array = result_mutex.get_ref().mutex.lock().unwrap();
    for _num in 0..10 {
        // 休眠 100 ms
        thread::sleep(Duration::from_millis(100));
        // 检查消息队列
        if array.len() != 0 {
            let temp = array.remove(0);
            return Ok(HttpResponse::Ok().body(temp));
        }
    }

    return Ok(HttpResponse::Ok().body("请求没有回应"));
}

经过 curl 的测试,结果很不错。

$ curl -X POST -i 'http://127.0.0.1:8080/' --data 'Hello World'
HTTP/1.1 200 OK
content-length: 18
date: Tue, 19 Oct 2021 15:07:28 GMT

请求没有回应

总结

第一阶段的原型机全部代码如下:

use actix_web::{error, post, web, App, Error, HttpResponse, HttpServer};
use paho_mqtt as mqtt;
use std::sync::Mutex;
use std::time::Duration;
use std::{process, thread};

// 这个宏代表函数将在 post http://地址/ 的时候被调用
#[post("/")]
async fn command(
    result_mutex: web::Data<ResultMutex>,
    counter: web::Data<SendClient>,
    info: String,
) -> Result<HttpResponse, Error> {
    // 在互斥锁保护下拿到客户端
    let lock_re = counter.get_ref().client_mutex.lock();

    match lock_re {
        Ok(ref cli) => {
            // 组建消息
            let msg = mqtt::Message::new("mcc/test/202110", info.to_string(), mqtt::QOS_0);
            let re = cli.publish(msg); // 发送消息
            if let Err(e) = re {
                // 错误处理
                eprintln!("Error: {:?}", e);
                return Err(error::ErrorInternalServerError("发送信息失败!"));
            }
        }
        Err(e) => {
            eprintln!("Error: {:?}", e);
            return Err(error::ErrorInternalServerError("解锁失败!"));
        }
    }

    // 解锁
    let mut array = result_mutex.get_ref().mutex.lock().unwrap();
    for _num in 0..10 {
        // 休眠 100 ms
        thread::sleep(Duration::from_millis(100));
        // 检查消息队列
        if array.len() != 0 {
            let temp = array.remove(0);
            return Ok(HttpResponse::Ok().body(temp));
        }
    }

    return Ok(HttpResponse::Ok().body("请求没有回应"));
}

// 客户端结构体
struct SendClient {
    // 因为这个对象会在各种线程之间传来传去,所以需要一个互斥锁保护它
    pub client_mutex: Mutex<mqtt::Client>,
}

// 消息队列
struct ResultMutex {
    pub mutex: Mutex<Vec<String>>,
}

// EMQ X 提供的 免费公共 MQTT 服务器地址
const DFLT_BROKER: &str = "tcp://broker.emqx.io:1883";
// 订阅的主题,单片机消息将发送到这个主题
const DFLT_TOPICS: &[&str] = &["mcc/test/CmdResult"];

// 连接成功的时候订阅主题
fn on_connect_success(client: &mqtt::AsyncClient, _msgid: u16) {
    client.subscribe_many(DFLT_TOPICS, &[0]);
}

// 连接失败的时候过段时间重连
fn on_connect_failure(client: &mqtt::AsyncClient, _msgid: u16, _rc: i32) {
    thread::sleep(Duration::from_millis(1500));
    client.reconnect_with_callbacks(on_connect_success, on_connect_failure);
}

// 启动异步的客户端并返回消息队列的引用
fn start_receive_client() -> web::Data<ResultMutex> {
    let connect_opts = mqtt::ConnectOptionsBuilder::new()
        .clean_session(true) // 退出后清理客户端
        .finalize();
    let mut client = mqtt::AsyncClient::new(DFLT_BROKER).unwrap_or_else(|e| {
        // 错误处理
        eprintln!("Error: {:?}", e);
        process::exit(1);
    });

    // 消息队列的引用
    let counter = web::Data::new(ResultMutex {
        mutex: Mutex::new(vec![]),
    });
    let counter_clone = counter.clone();
    // 启动新的线程
    let _thread_handle = thread::spawn(move || {
        // 设置接收到消息的时候,把消息存入消息队列
        client.set_message_callback(move |_cli, msg| {
            if let Some(msg) = msg {
                let payload_str = msg.payload_str();
                let counter = counter.clone();
                let cmd_result: String = (&*payload_str).to_string();
                let mut array = counter.get_ref().mutex.lock().unwrap();
                array.push(cmd_result);
            }
            ()
        });
        client.connect_with_callbacks(connect_opts, on_connect_success, on_connect_failure);
        loop {}
    });
    counter_clone
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // 创建客户端
    let client = mqtt::Client::new(DFLT_BROKER).unwrap_or_else(|e| {
        // 错误处理
        eprintln!("Error: {:?}", e);
        process::exit(1);
    });
    // 设置连接参数
    let connect_opts = mqtt::ConnectOptionsBuilder::new()
        .clean_session(true) // 退出后清理客户端
        .finalize();
    // 客户端连接服务器
    if let Err(e) = client.connect(connect_opts) {
        // 错误处理
        eprintln!("Error: {:?}", e);
        process::exit(1);
    }
    // 一个多线程有效的计数引用 ,这个 Data 底层是通过 Arc 实现的。
    let counter = web::Data::new(SendClient {
        client_mutex: Mutex::new(client),
    });
    // 消息队列的引用
    let counter_copy = start_receive_client();
    HttpServer::new(
        // 这里改成移动闭包
        move || {
            App::new()
                .service(command) // 绑定一下 factory
                .app_data(counter.clone()) // 添加 MQTT 消息客户端引用的拷贝
                .app_data(counter_copy.clone()) // 添加消息队列的引用拷贝
        },
    )
    .bind("127.0.0.1:8080")? // 绑定一下服务器地址和端口
    .run()
    .await
}

实际上这份代码确实是原型机,一部分原因是因为这玩意完全没有异步,消息队列也很怪,非常容易出现bug。这部分内容我留到下一篇文章再来解决。

其实就是想骗阅读量。

文章目录