Coffee_Candy

239 分类: nodejs后端开发,C++boost后端

C++boost::beast WebServer C++20标准协程并发(nodejs做客户端测试)(从上一篇改过来的,建议把上一篇先看了)

怎么说呢上一篇文章里面是没有main()的因为要结合这一篇

main()文件

#include <iostream>
#include "WebSocketServer.h"
#include <csignal>
#include <thread>
#include <mutex>
#include "AsioIOServicePool.h"
int main()
{
    try
    {       //这里有个线程池,因为是多线程+协程并发编程嘛,这里按cpu核数分配线程跑一个子线程的io_context
        auto& pool = AsioIOServicePool::GetInstance();
            //这个是主线程的io_context
        boost::asio::io_context io_context;
            //结束信号
        boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
            //收到信号之后会把线程池里面的io_context给停掉,以及主线程的io_conxtext也停掉
        signals.async_wait([&io_context, &pool](auto,auto) {
            io_context.stop();
            pool.Stop();
        });
            //这里开启创建服务连接,也就是准备tcp握手
        WebSocketServer server(io_context, 10086);
            //开始服务,也就是可以接收握手请求了
        server.StartAccept();
        io_context.run();
    }
    catch (const std::exception&exp)
    {
        std::cout << "err " << exp.what() << std::endl;
    }
    //net::io_context ioc;
    //WebSocketServer server(ioc, 10086);
    //server.StartAccept();
    //ioc.run();
}

AsioIOServicePool io_context线程池文件

.h文件

#include <boost/asio.hpp>
#include <vector>
#include <iostream>
class AsioIOServicePool
{
public:
    using IOService = boost::asio::io_context;
    using Work = boost::asio::io_context::work;
    using WorkPtr = std::unique_ptr<Work>;
    AsioIOServicePool& operator = (const AsioIOServicePool&) = delete;
    AsioIOServicePool(const AsioIOServicePool&) = delete;
    ~AsioIOServicePool() { std::cout << "asioservicepool delete" << std::endl; };
    //获取一个子线程的io_context
    boost::asio::io_context& GetIOService();
    //线程池停止工作
    void Stop();
    //获取单例,因为含参的原因,没有用之前的单例模板,而是按c++11之后static为线程安全的情况来书写单例
    static AsioIOServicePool& GetInstance();
private:
    //size默认为系统cpu核数
    AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency());
    //一个vector用来记录子线程的io_context
    std::vector<IOService> _ioServices;
    //记录io_context的work,怎么说呢你创建的io_context是需要有事干的
    //但是如果在没收到消息调用GetIOService的时候,io_context是宕机状态,会被销毁掉,所以要给它一个默认的工作
    std::vector<WorkPtr> _works;
    //记录线程
    std::vector<std::thread> _threads;
    //下一个io_context在_ioServices中的位置
    std::size_t _nextIOService;
};

.cpp文件

#include "AsioIOServicePool.h"
#include <iostream>

using namespace std;
//轮询获取一个io_context
boost::asio::io_context& AsioIOServicePool::GetIOService()
{
    auto& servic = _ioServices[_nextIOService++];
    if (_nextIOService == _ioServices.size())
    {
        _nextIOService = 0;
    }
    return servic;
}
//停止同时把线程给停掉
void AsioIOServicePool::Stop()
{
    for (auto& work : _works)
    {
        work.reset();
    }
    for (auto& t : _threads)
    {
        t.join();
    }
}
//拿到池子的单例
AsioIOServicePool& AsioIOServicePool::GetInstance()
{
    static AsioIOServicePool instance(std::thread::hardware_concurrency());
    return instance;
}
//初始化池子
AsioIOServicePool::AsioIOServicePool(std::size_t size):_ioServices(size),
_works(size),_nextIOService(0)
{
    for (std::size_t i = 0; i < size; i++)
    {
        //创建工作,需要给一个右值,也就是代码运行时才知道的东西
        _works[i] = std::unique_ptr<Work>(new Work(_ioServices[i]));
    }

    for (std::size_t i = 0; i < _ioServices.size(); i++)
    {
        //这里不用push是为了方便和提高性能
        //push是拷贝构造,而emplace_back是直接构造
        //如果用push的话应该先创建一个临时的线程然后move进去,麻烦而且多了个移动操作
        _threads.emplace_back([this, i]() {
            _ioServices[i].run();
        });
    }
}

ok到主要的地方嘞Connection的修改

Connection.h主要是加了
boost::asio::awaitable XcSend(std::string msg);
void Close();
bool _b_close;

#include <boost/beast.hpp>
#include <boost/asio.hpp>
#include <memory.h>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <queue>
#include <mutex>
#include <boost/uuid/random_generator.hpp>
#include <iostream>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>

namespace net = boost::asio;
namespace beast = boost::beast;
using namespace boost::beast;
using namespace boost::beast::websocket;

class Connection:public std::enable_shared_from_this<Connection>
{
public:
    Connection(net::io_context& ioc);
    std::string GetUid();
    net::ip::tcp::socket& GetSocket();
    void AsyncAccept();
    void Start();
    void Close();
    void AsyncSend(std::string msg);
    void SendHandle(error_code err, std::size_t buffer_bytes);

    //看到awaitable了吗,这个就是说这个函数是可以等待的,类似nodejs的async
    boost::asio::awaitable<void> XcSend(std::string msg);

private:
    //websocket,socket
    std::unique_ptr<stream<tcp_stream>>_ws_ptr;
    //雪花uuid管理
    std::string _uuid;
    //上下文通信管理
    net::io_context& _ioc;
    //消息缓存区,会自动去扩容
    flat_buffer _recv_buffer;
    //发送队列
    std::queue<std::string>_send_que;
    //原子锁
    std::mutex send_mtx;
    //是否停止
    bool _b_close=false;
};

.cpp修改就不放全部文件了基本上差不多的

close关闭socket,其实服务器应该不用特地去关,客户端断开就好

void Connection::Close()
{
    _ws_ptr->close(websocket::close_code::normal);
    _b_close = true;
}

start开始收发

void Connection::Start()
{
    //这里的self同理也是伪闭包
    auto self = shared_from_this();
    //如果读到没有错误buffer_bytes会有读到的数据多长,否则err
    //然后要绑定我们的消息缓存区

    //boost::asio::co_spawn协程的区域,开启协程,需要子线程的io_context,
    //[=]()->boost::asio::awaitable<void> {}这个lambda表达式里面的函数是可以等待的
    //最底下还有一个boost::asio::detached,就是设置线程和协程分离
    boost::asio::co_spawn(_ioc, [=]()->boost::asio::awaitable<void> {
        try
        {
            //如果没有关闭就一直循环
            for (; !_b_close;)
            {
                //等待读,上面说了boost::asio::detached会让线程和协程分离,awaitable是可等待
                //具体什么意思呢就是,co_await就是等待一个函数执行结束,如果没结束就卡在这,就是同步的意思
                //但是卡住的是线程的协程而不是线程,线程还是正常工作
                //buffer_bytes读到是字符数
                            //boost::asio::use_awaitable就是把原来绑定的回调函数给替代掉,不需要他回调了,这边等到他结束
                std::size_t buffer_bytes = co_await _ws_ptr->async_read(_recv_buffer, boost::asio::use_awaitable);
                std::cout <<"read from:"<< GetUid() << std::endl;
                if (buffer_bytes==0)
                {
                    std::cout << "empty data" << std::endl;
                    co_return;
                }
                //设置收到的数据为文本
                _ws_ptr->text(_ws_ptr->got_text());
                //接收到的数据
                std::string recv_data = boost::beast::buffers_to_string(_recv_buffer.data());
                //把缓冲区数据清除掉
                _recv_buffer.consume(_recv_buffer.size());
                //std::cout << "websocket receive msg is" << recv_data << std::endl;
                
                //然后这里就有趣了有3种写法,1是在协程里面调用协程去发数据,2.是co_await同步发,3.是用异步写回调去发
                //boost::asio::co_spawn(_ioc, XcSend(recv_data), boost::asio::detached);
                co_await XcSend(recv_data);
                //AsyncSend(recv_data);

            }
        }
        catch (const std::exception& exp)//现在错误会走到这里
        {
            std::cout << "exception is" << exp.what() << std::endl;
            Close();
            ConnectionMgr::GetInstance()->RmvConnection(GetUid());
        }

        //看到boost::asio::detached了吗
        }, boost::asio::detached);
}

XcSend线程发

boost::asio::awaitable<void> Connection::XcSend(std::string msg)
{
    {
        std::lock_guard<std::mutex> lck_guard(send_mtx);
        int que_len = this->_send_que.size();
        _send_que.push(msg);
        if (que_len > 0)
        {
            std::cout <<GetUid()<<" " << msg << " wait quelen: " << que_len << std::endl;
            co_return;
        }
    }
    //如果消息队列不是空的时候,把全部都发出去
    for (;!_send_que.empty();)
    {
        //之前的msg逻辑要改,自己动脑好吧
        std::string send_msg;
        {
            std::lock_guard<std::mutex> lck_gurad(send_mtx);
            std::cout << GetUid() << " send:" << send_msg << std::endl;
            if (_send_que.empty())
            {
                co_return;
            }
            send_msg = _send_que.front();
            _send_que.pop();
        }
        //等待一次发结束后再发
        co_await _ws_ptr->async_write(boost::asio::buffer(msg.c_str(), msg.length()),boost::asio::use_awaitable);
    }
}

好了大概就改这么多,主要是收发改协程了,当然你闲的话把接收改协程也没问题(如果你看懂了应该也不难)

然后接下来主要是看这3种写法区别

  1. boost::asio::co_spawn(_ioc, XcSend(recv_data), boost::asio::detached);

这个是在协程中调用协程,效果就是当出现连发的情况下,服务器收比发的快的时候,收不会被发卡住(看打印是不是乱的就能看出来,消息队列的长度是会>1的)
2024-10-28T08:46:15.png

  1. co_await XcSend(recv_data);

这个写法就是要等到一次发发完才能下一次收,发没发完读会被卡住(就是消息队列里面每次都只有一条消息,这样子是不行的,性能不好)
2024-10-28T08:47:04.png

  1. AsyncSend(recv_data);

这个写法和第一种的效果是一样的,但是区别就是开销问题,每次写都开一个协程是要耗费时间和资源的,所以不建议去用第一种,应该协程和回调的写法混合来用,可以达到性能最高
2024-10-28T08:50:45.png

nodejs测试代码

import WebSocket from 'ws';

// 连接到 WebSocket 服务器
const url = 'ws://127.0.0.1:10086'; // 确保使用 IPv4 地址
const ws = new WebSocket(url);

// 连接打开时的事件
ws.on('open', () => {
    console.log('Connected to the server');
    ws.send('Hello, c1!');
    ws.send('Hello, c2!');
    ws.send('Hello, c3!');
    ws.send('Hello, c4!');
    ws.send('Hello, c5!');

});

// 接收到消息时的事件
ws.on('message', (message) => {
    const receivedMessage = Buffer.isBuffer(message) ? message.toString() : message;
    console.log('Received:', receivedMessage);
});

// 连接关闭时的事件
ws.on('close', () => {
    console.log('Disconnected from the server');
});

// 错误处理
ws.on('error', (error) => {
    console.error('Error:', error);
});

const ws2 = new WebSocket(url);

// 连接打开时的事件
ws2.on('open', () => {
    console.log('Connected to the server');
    ws2.send('Hello, a1!');
    ws2.send('Hello, a2!');
    ws2.send('Hello, a3!');
    ws2.send('Hello, a4!');
    ws2.send('Hello, a5!');

});

// 接收到消息时的事件
ws2.on('message', (message) => {
    const receivedMessage = Buffer.isBuffer(message) ? message.toString() : message;
    console.log('Received2:', receivedMessage);
});

// 连接关闭时的事件
ws2.on('close', () => {
    console.log('Disconnected from the server');
});

// 错误处理
ws2.on('error', (error) => {
    console.error('Error:', error);
});

#none

作者: Coffee_Candy

版权: 除特别声明,均采用BY-NC-SA 4.0许可协议,转载请表明出处

目录Content

评论已关闭