流程
- WebSocketServer类实现tcp acceptor的握手
- tcp握手结束后升级到Websocket,Connection管理某个socket读写
- ConnectionMgr管理Connection
流程1
namespace net = boost::asio;
namespace beast = boost::beast;
using namespace boost::beast;
using namespace boost::beast::websocket;
.h文件如下
#include "ConnectionMgr.h"
class WebSocketServer
{
public:
WebSocketServer(net::io_context&ioc,unsigned short port);
WebSocketServer& operater(const WebSocketServer& ws) = delete;//赋值构造禁用
WebSocketServer(const WebSocketServer &) = delete;//拷贝构造禁用
void StartAccept();
private:
net::ip::tcp::acceptor _acceptor;
net::io_context& _ioc;
};
_acceptor用于tcp握手
_ioc类似QT中的.exe轮询同时会有类似信号的机制,用于异步调用函数,当收到信号对应的回调函数会被调用
.cpp文件
#include "WebSocketServer.h"
//_acceptor的构建传入ioc就是前面的轮询管理上下文,以及绑定id协议和端口号
WebSocketServer::WebSocketServer(net::io_context& ioc, unsigned short port):
_ioc(ioc),_acceptor(ioc,net::ip::tcp::endpoint(net::ip::tcp::v4(),port))
{
std::cout << "Server start on port:" << port << std::endl;
}
//开始监听握手
void WebSocketServer::StartAccept()
{
//创建的连接类后面会讲
//创建的时候会创建一个全双工的管道websocket
auto con_ptr = std::make_shared<Connection>(_ioc);
//tcp握手,监听到握手之后,判断有没有错误,没有就升级到去websocket握手,重新调用StartAccept()监听下一个连接
//然后我们是要拿到websocket最底层的tcpsocket去做握手,GetSocket是后面封装的获取websocket底层tcpsocket的方法
_acceptor.async_accept(con_ptr->GetSocket(), [this,con_ptr](error_code err) {
try
{
if (!err)
{
//websocket握手
con_ptr->AsyncAccept();
}
else
{
std::cout << "acceptor async_acceptor err" << err.what() << std::endl;
}
StartAccept();
}
catch (const std::exception&exp)
{
std::cout << "async_accept error is" << exp.what() << std::endl;
}
});
}
流程2
.h文件
#include <boost/beast.hpp>
#include <boost/asio.hpp>
#include <memory.h>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <queue>
#include <mutex>
#include <boost/uuid/random_generator.hpp>
#include <iostream>
namespace net = boost::asio;
namespace beast = boost::beast;
using namespace boost::beast;
using namespace boost::beast::websocket;
class Connection:public std::enable_shared_from_this<Connection>
{
public:
Connection(net::io_context& ioc);
std::string GetUid();
net::ip::tcp::socket& GetSocket();
void AsyncAccept();
void Start();
void AsyncSend(std::string msg);
private:
//websocket,socket
std::unique_ptr<stream<tcp_stream>>_ws_ptr;
//雪花uuid管理
std::string _uuid;
//上下文通信管理
net::io_context& _ioc;
//消息缓存区,会自动去扩容
flat_buffer _recv_buffer;
//发送队列
std::queue<std::string>_send_que;
//原子锁
std::mutex send_mtx;
};
_ws_ptr websocket的socket其实更形象点就是tcp流字符流,webserver是以字符流发送的有边界,tcp是以比特流发送的没边界会粘包
_uuid 雪花uuid管理
_ioc 上下文通信管理
flat_buffer _recv_buffer 消息缓存区,会自动去扩容
_send_que 发送队列
send_mtx 原子锁
.cpp
#include "Connection.h"
#include "ConnectionMgr.h"
//构造函数接收到ioc,就去创建一个websocket管道,
//让_ws_ptr只能在这里这里使用,传递的参数有ioc,make_strand创建保证线程安全
Connection::Connection(net::io_context& ioc):
_ioc(ioc),_ws_ptr(std::make_unique<websocket::stream<tcp_stream>>(make_strand(ioc)))
{
boost::uuids::random_generator generator;
//雪花算法配置唯一uuid
this->_uuid = boost::uuids::to_string(generator());
}
std::string Connection::GetUid()
{
return this->_uuid;
}
//前面tcp握手需要websocket最底层的tcpsocket,beast有封装用get_lowest_layer就能获取到
net::ip::tcp::socket& Connection::GetSocket()
{
//返回最低层的socket
return boost::beast::get_lowest_layer(*_ws_ptr).socket();
}
void Connection::AsyncAccept()
{
auto self = shared_from_this();
//webserver层面的握手,并伪闭包,防止在函数走完之后,直接就没了,导致接收不到回调
_ws_ptr->async_accept([self](boost::system::error_code err) {
try
{
if (!err)
{
//如果没有问题那么就把ConnectionMgr加到Mgr里面去管理
//这里封装了一个map的insert
ConnectionMgr::GetInstance()->AddConnection(self);
//然后就是开始监听,客户端发送是数据
self->Start();
}
else
{
std::cout << err.what() << std::endl;
}
}
catch (const std::exception&exp)
{
std::cout << "websocket err" << exp.what() << std::endl;
}
});
}
void Connection::Start()
{
//这里的self同理也是伪闭包
auto self = shared_from_this();
//如果读到没有错误buffer_bytes会有读到的数据多长,否则err
//然后要绑定我们的消息缓存区
_ws_ptr->async_read(_recv_buffer, [self](error_code err,std::size_t buffer_bytes) {
try
{
if (err)//断开连接后会走到这里
{
std::cout << "websocket read err" << std::endl;
//如果err被收到的话就在Mgr中把对应的Connection他去除掉
ConnectionMgr::GetInstance()->RmvConnection(self->GetUid());
return;
}
//设置收到的数据为文本
self->_ws_ptr->text(self->_ws_ptr->got_text());
//接收到的数据
std::string recv_data = boost::beast::buffers_to_string(self->_recv_buffer.data());
//把缓冲区数据清除掉
self->_recv_buffer.consume(self->_recv_buffer.size());
std::cout << "websocket receive msg is" << recv_data << std::endl;
//调用发送数据
self->AsyncSend(std::move(recv_data));
//再次开启读操作
self->Start();
}
catch (const std::exception&exp)
{
std::cout<< "exception is" << exp.what() << std::endl;
ConnectionMgr::GetInstance()->RmvConnection(self->GetUid());
}
});
}
//这里的发送有讲究,1.是RAII技术,2.是消息队列
//RAII技术,局部管理资源技术,就是一个{}里面的数据到}的时候资源释放
//消息队列,保证每次发送的原子性,防止上一次还没发送完就被下一次发送覆盖掉
void Connection::AsyncSend(std::string msg)
{
//不要以顺序思维来看这段代码,你的想法是没错的,会有2-3个消息串行进入,加入消息队列中,并且会被卡住
//没错恋恋风尘大哥的这里写的消息队列确实有问题问题就在于这里是Lambda表达式而不是函数绑定(仅视频中的bug,仓库源码没有问题)
{
//RAII技术,锁管理,如果前面的消息还没发完不允许再发,等到消息队列为0再发送
std::lock_guard<std::mutex> lck_guard(send_mtx);
int que_len = this->_send_que.size();
_send_que.push(msg);
if (que_len > 0)return;
}
auto self = shared_from_this();
_ws_ptr->async_write(boost::asio::buffer(msg.c_str(), msg.length()),
[self](error_code err, std::size_t nsize)
{
try
{
if (err)
{
std::cout << "async_send err" << err.what() << std::endl;
ConnectionMgr::GetInstance()->RmvConnection(self->GetUid());
return;
}
std::string send_msg;
{
std::lock_guard<std::mutex> lck_gurad(self->send_mtx);
self->_send_que.pop();
if (self->_send_que.empty())
{
return;
}
send_msg = self->_send_que.front();
}
self->AsyncSend(std::move(send_msg));//这一条是有问题的不应该这样写
}
catch (const std::exception&exp)
{
std::cout << "async_send err" << exp.what() << std::endl;
ConnectionMgr::GetInstance()->RmvConnection(self->GetUid());
}
});
}
恋恋风尘大哥写的消息队列这里是存在问题的,你看下面的客户端收到的消息只有两条
所以我们应该改成之前的函数绑定的那种消息队列的写法
void Connection::AsyncSend(std::string msg)
{
{
std::lock_guard<std::mutex> lck_guard(send_mtx);
int que_len = this->_send_que.size();
_send_que.push(msg);
if (que_len > 0)return;
}
auto self = shared_from_this();
//用函数绑定的写法
_ws_ptr->async_write(boost::asio::buffer(msg.c_str(), msg.length()),
std::bind(&Connection::SendHandle, self, std::placeholders::_1, std::placeholders::_2));
}
void Connection::SendHandle(error_code err, std::size_t buffer_bytes)
{
try
{
if (err)
{
std::cout << "async_send err" << err.what() << std::endl;
ConnectionMgr::GetInstance()->RmvConnection(GetUid());
return;
}
std::string send_msg;
{
std::lock_guard<std::mutex> lck_gurad(send_mtx);
_send_que.pop();
if (_send_que.empty())
{
return;
}
send_msg = _send_que.front();
}
auto self = shared_from_this();
//在绑定的函数里面再调用写,保证队列写完
_ws_ptr->async_write(boost::asio::buffer(send_msg.c_str(), send_msg.length()),
std::bind(&Connection::SendHandle, self, std::placeholders::_1, std::placeholders::_2));
}
catch (const std::exception& exp)
{
std::cout << "async_send err" << exp.what() << std::endl;
ConnectionMgr::GetInstance()->RmvConnection(GetUid());
}
}
改完之后就正常了
流程3
.h文件
#include "Connection.h"
#include "boost/unordered_map.hpp"
#include "Singleton.h"
class ConnectionMgr:public Singleton<ConnectionMgr>,std::enable_shared_from_this<ConnectionMgr>
{
public:
//添加连接
void AddConnection(std::shared_ptr<Connection>conptr);
//删除连接
void RmvConnection(std::string id);
private:
//map记录连接
boost::unordered_map<std::string, std::shared_ptr<Connection>> _map_cons;
};
.cpp文件
#include "ConnectionMgr.h"
void ConnectionMgr::AddConnection(std::shared_ptr<Connection> conptr)
{
this->_map_cons[conptr->GetUid()] = conptr;
}
void ConnectionMgr::RmvConnection(std::string id)
{
this->_map_cons.erase(id);
}
评论已关闭