怎么说呢上一篇文章里面是没有main()的因为要结合这一篇
main()文件
#include <iostream>
#include "WebSocketServer.h"
#include <csignal>
#include <thread>
#include <mutex>
#include "AsioIOServicePool.h"
int main()
{
try
{ //这里有个线程池,因为是多线程+协程并发编程嘛,这里按cpu核数分配线程跑一个子线程的io_context
auto& pool = AsioIOServicePool::GetInstance();
//这个是主线程的io_context
boost::asio::io_context io_context;
//结束信号
boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
//收到信号之后会把线程池里面的io_context给停掉,以及主线程的io_conxtext也停掉
signals.async_wait([&io_context, &pool](auto,auto) {
io_context.stop();
pool.Stop();
});
//这里开启创建服务连接,也就是准备tcp握手
WebSocketServer server(io_context, 10086);
//开始服务,也就是可以接收握手请求了
server.StartAccept();
io_context.run();
}
catch (const std::exception&exp)
{
std::cout << "err " << exp.what() << std::endl;
}
//net::io_context ioc;
//WebSocketServer server(ioc, 10086);
//server.StartAccept();
//ioc.run();
}
AsioIOServicePool io_context线程池文件
.h文件
#include <boost/asio.hpp>
#include <vector>
#include <iostream>
class AsioIOServicePool
{
public:
using IOService = boost::asio::io_context;
using Work = boost::asio::io_context::work;
using WorkPtr = std::unique_ptr<Work>;
AsioIOServicePool& operator = (const AsioIOServicePool&) = delete;
AsioIOServicePool(const AsioIOServicePool&) = delete;
~AsioIOServicePool() { std::cout << "asioservicepool delete" << std::endl; };
//获取一个子线程的io_context
boost::asio::io_context& GetIOService();
//线程池停止工作
void Stop();
//获取单例,因为含参的原因,没有用之前的单例模板,而是按c++11之后static为线程安全的情况来书写单例
static AsioIOServicePool& GetInstance();
private:
//size默认为系统cpu核数
AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency());
//一个vector用来记录子线程的io_context
std::vector<IOService> _ioServices;
//记录io_context的work,怎么说呢你创建的io_context是需要有事干的
//但是如果在没收到消息调用GetIOService的时候,io_context是宕机状态,会被销毁掉,所以要给它一个默认的工作
std::vector<WorkPtr> _works;
//记录线程
std::vector<std::thread> _threads;
//下一个io_context在_ioServices中的位置
std::size_t _nextIOService;
};
.cpp文件
#include "AsioIOServicePool.h"
#include <iostream>
using namespace std;
//轮询获取一个io_context
boost::asio::io_context& AsioIOServicePool::GetIOService()
{
auto& servic = _ioServices[_nextIOService++];
if (_nextIOService == _ioServices.size())
{
_nextIOService = 0;
}
return servic;
}
//停止同时把线程给停掉
void AsioIOServicePool::Stop()
{
for (auto& work : _works)
{
work.reset();
}
for (auto& t : _threads)
{
t.join();
}
}
//拿到池子的单例
AsioIOServicePool& AsioIOServicePool::GetInstance()
{
static AsioIOServicePool instance(std::thread::hardware_concurrency());
return instance;
}
//初始化池子
AsioIOServicePool::AsioIOServicePool(std::size_t size):_ioServices(size),
_works(size),_nextIOService(0)
{
for (std::size_t i = 0; i < size; i++)
{
//创建工作,需要给一个右值,也就是代码运行时才知道的东西
_works[i] = std::unique_ptr<Work>(new Work(_ioServices[i]));
}
for (std::size_t i = 0; i < _ioServices.size(); i++)
{
//这里不用push是为了方便和提高性能
//push是拷贝构造,而emplace_back是直接构造
//如果用push的话应该先创建一个临时的线程然后move进去,麻烦而且多了个移动操作
_threads.emplace_back([this, i]() {
_ioServices[i].run();
});
}
}
ok到主要的地方嘞Connection的修改
Connection.h主要是加了
boost::asio::awaitable
void Close();
bool _b_close;
#include <boost/beast.hpp>
#include <boost/asio.hpp>
#include <memory.h>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <queue>
#include <mutex>
#include <boost/uuid/random_generator.hpp>
#include <iostream>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
namespace net = boost::asio;
namespace beast = boost::beast;
using namespace boost::beast;
using namespace boost::beast::websocket;
class Connection:public std::enable_shared_from_this<Connection>
{
public:
Connection(net::io_context& ioc);
std::string GetUid();
net::ip::tcp::socket& GetSocket();
void AsyncAccept();
void Start();
void Close();
void AsyncSend(std::string msg);
void SendHandle(error_code err, std::size_t buffer_bytes);
//看到awaitable了吗,这个就是说这个函数是可以等待的,类似nodejs的async
boost::asio::awaitable<void> XcSend(std::string msg);
private:
//websocket,socket
std::unique_ptr<stream<tcp_stream>>_ws_ptr;
//雪花uuid管理
std::string _uuid;
//上下文通信管理
net::io_context& _ioc;
//消息缓存区,会自动去扩容
flat_buffer _recv_buffer;
//发送队列
std::queue<std::string>_send_que;
//原子锁
std::mutex send_mtx;
//是否停止
bool _b_close=false;
};
.cpp修改就不放全部文件了基本上差不多的
close关闭socket,其实服务器应该不用特地去关,客户端断开就好
void Connection::Close()
{
_ws_ptr->close(websocket::close_code::normal);
_b_close = true;
}
start开始收发
void Connection::Start()
{
//这里的self同理也是伪闭包
auto self = shared_from_this();
//如果读到没有错误buffer_bytes会有读到的数据多长,否则err
//然后要绑定我们的消息缓存区
//boost::asio::co_spawn协程的区域,开启协程,需要子线程的io_context,
//[=]()->boost::asio::awaitable<void> {}这个lambda表达式里面的函数是可以等待的
//最底下还有一个boost::asio::detached,就是设置线程和协程分离
boost::asio::co_spawn(_ioc, [=]()->boost::asio::awaitable<void> {
try
{
//如果没有关闭就一直循环
for (; !_b_close;)
{
//等待读,上面说了boost::asio::detached会让线程和协程分离,awaitable是可等待
//具体什么意思呢就是,co_await就是等待一个函数执行结束,如果没结束就卡在这,就是同步的意思
//但是卡住的是线程的协程而不是线程,线程还是正常工作
//buffer_bytes读到是字符数
//boost::asio::use_awaitable就是把原来绑定的回调函数给替代掉,不需要他回调了,这边等到他结束
std::size_t buffer_bytes = co_await _ws_ptr->async_read(_recv_buffer, boost::asio::use_awaitable);
std::cout <<"read from:"<< GetUid() << std::endl;
if (buffer_bytes==0)
{
std::cout << "empty data" << std::endl;
co_return;
}
//设置收到的数据为文本
_ws_ptr->text(_ws_ptr->got_text());
//接收到的数据
std::string recv_data = boost::beast::buffers_to_string(_recv_buffer.data());
//把缓冲区数据清除掉
_recv_buffer.consume(_recv_buffer.size());
//std::cout << "websocket receive msg is" << recv_data << std::endl;
//然后这里就有趣了有3种写法,1是在协程里面调用协程去发数据,2.是co_await同步发,3.是用异步写回调去发
//boost::asio::co_spawn(_ioc, XcSend(recv_data), boost::asio::detached);
co_await XcSend(recv_data);
//AsyncSend(recv_data);
}
}
catch (const std::exception& exp)//现在错误会走到这里
{
std::cout << "exception is" << exp.what() << std::endl;
Close();
ConnectionMgr::GetInstance()->RmvConnection(GetUid());
}
//看到boost::asio::detached了吗
}, boost::asio::detached);
}
XcSend线程发
boost::asio::awaitable<void> Connection::XcSend(std::string msg)
{
{
std::lock_guard<std::mutex> lck_guard(send_mtx);
int que_len = this->_send_que.size();
_send_que.push(msg);
if (que_len > 0)
{
std::cout <<GetUid()<<" " << msg << " wait quelen: " << que_len << std::endl;
co_return;
}
}
//如果消息队列不是空的时候,把全部都发出去
for (;!_send_que.empty();)
{
//之前的msg逻辑要改,自己动脑好吧
std::string send_msg;
{
std::lock_guard<std::mutex> lck_gurad(send_mtx);
std::cout << GetUid() << " send:" << send_msg << std::endl;
if (_send_que.empty())
{
co_return;
}
send_msg = _send_que.front();
_send_que.pop();
}
//等待一次发结束后再发
co_await _ws_ptr->async_write(boost::asio::buffer(msg.c_str(), msg.length()),boost::asio::use_awaitable);
}
}
好了大概就改这么多,主要是收发改协程了,当然你闲的话把接收改协程也没问题(如果你看懂了应该也不难)
然后接下来主要是看这3种写法区别
- boost::asio::co_spawn(_ioc, XcSend(recv_data), boost::asio::detached);
这个是在协程中调用协程,效果就是当出现连发的情况下,服务器收比发的快的时候,收不会被发卡住(看打印是不是乱的就能看出来,消息队列的长度是会>1的)
- co_await XcSend(recv_data);
这个写法就是要等到一次发发完才能下一次收,发没发完读会被卡住(就是消息队列里面每次都只有一条消息,这样子是不行的,性能不好)
- AsyncSend(recv_data);
这个写法和第一种的效果是一样的,但是区别就是开销问题,每次写都开一个协程是要耗费时间和资源的,所以不建议去用第一种,应该协程和回调的写法混合来用,可以达到性能最高
nodejs测试代码
import WebSocket from 'ws';
// 连接到 WebSocket 服务器
const url = 'ws://127.0.0.1:10086'; // 确保使用 IPv4 地址
const ws = new WebSocket(url);
// 连接打开时的事件
ws.on('open', () => {
console.log('Connected to the server');
ws.send('Hello, c1!');
ws.send('Hello, c2!');
ws.send('Hello, c3!');
ws.send('Hello, c4!');
ws.send('Hello, c5!');
});
// 接收到消息时的事件
ws.on('message', (message) => {
const receivedMessage = Buffer.isBuffer(message) ? message.toString() : message;
console.log('Received:', receivedMessage);
});
// 连接关闭时的事件
ws.on('close', () => {
console.log('Disconnected from the server');
});
// 错误处理
ws.on('error', (error) => {
console.error('Error:', error);
});
const ws2 = new WebSocket(url);
// 连接打开时的事件
ws2.on('open', () => {
console.log('Connected to the server');
ws2.send('Hello, a1!');
ws2.send('Hello, a2!');
ws2.send('Hello, a3!');
ws2.send('Hello, a4!');
ws2.send('Hello, a5!');
});
// 接收到消息时的事件
ws2.on('message', (message) => {
const receivedMessage = Buffer.isBuffer(message) ? message.toString() : message;
console.log('Received2:', receivedMessage);
});
// 连接关闭时的事件
ws2.on('close', () => {
console.log('Disconnected from the server');
});
// 错误处理
ws2.on('error', (error) => {
console.error('Error:', error);
});
评论已关闭