前置知识要求要会boost::beast的http写法和webserver写法
1.在http的回调函数里面判断get当前的路径/后面是什么,同时判断是否升级
void HttpConnection::HandleReq()
{
_response.version(_request.version());
_response.keep_alive(false);
if (_request.method() == boost::beast::http::verb::get)
{
PreParseGetParam();
//另一边需要一个智能指针做管理,所以shared_from_this
//bool success = LogicSystem::GetInstance()->HandleGet(_get_url, shared_from_this());
//看到这个if没有就是在这里判断url后缀匹配,以及是否升级
if (_get_url=="/web"&&boost::beast::websocket::is_upgrade(_request))
{
_response.keep_alive(true);//把连接是否保留置为true
//这是webssocket那个管理类的构造
auto con_ptr = std::make_shared<Connection>(this->_ioc, std::move(this->_socket), this->_request);
//这是http升级到websocket握手
con_ptr->HttpUpToWebsocketAsyncAccept();
return;
}
bool success = true;
if (!success)
{
_response.result(boost::beast::http::status::not_found);
_response.set(boost::beast::http::field::content_type, "text/plain");
boost::beast::ostream(_response.body()) << "url not found\r\n";
WriteResponse();
return;
}
_response.result(boost::beast::http::status::ok);
_response.set(boost::beast::http::field::server, "GateServer");
WriteResponse();
return;
}
if (_request.method() == boost::beast::http::verb::post)
{
//bool success = LogicSystem::GetInstance()->HandlePost(_request.target(), shared_from_this());
bool success = true;
if (!success)
{
_response.result(boost::beast::http::status::not_found);
_response.set(boost::beast::http::field::content_type, "text/plain");
boost::beast::ostream(_response.body()) << "url not found\r\n";
WriteResponse();
return;
}
_response.result(boost::beast::http::status::ok);
_response.set(boost::beast::http::field::server, "GateServer");
WriteResponse();
return;
}
}
2.Connection连接的修改,主要是对前面我写的文章里面的websocket,Connection修改
#include <boost/beast.hpp>
#include <boost/asio.hpp>
#include <memory.h>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <queue>
#include <mutex>
#include <boost/uuid/random_generator.hpp>
#include <iostream>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
namespace net = boost::asio;
namespace beast = boost::beast;
using namespace boost::beast;
using namespace boost::beast::websocket;
class Connection:public std::enable_shared_from_this<Connection>
{
public:
~Connection() { std::cout << "connection del" << std::endl; };
Connection(net::io_context& ioc);
Connection(net::io_context& ioc, boost::asio::ip::tcp::socket socket,
http::request<http::dynamic_body> request);
std::string GetUid();
net::ip::tcp::socket& GetSocket();
void HttpUpToWebsocketAsyncAccept();
void AsyncAccept();
void Start();
void Close();
void AsyncSend(std::string msg);
void SendHandle(error_code err, std::size_t buffer_bytes);
//看到awaitable了吗,这个就是说这个函数是可以等待的,类似nodejs的async
boost::asio::awaitable<void> XcSend(std::string msg);
private:
//websocket,socket
std::unique_ptr<stream<tcp_stream>>_ws_ptr;
//http升级到websocket使用的http::dynamic_body
http::request<http::dynamic_body> _request;
//雪花uuid管理
std::string _uuid;
//上下文通信管理
net::io_context& _ioc;
//消息缓存区,会自动去扩容
flat_buffer _recv_buffer;
//发送队列
std::queue<std::string>_send_que;
//原子锁
std::mutex send_mtx;
//是否停止
bool _b_close;
};
这里解释一下加了什么
void HttpUpToWebsocketAsyncAccept();因为和tcp升级到webserver握手写法不一样,所以重新写了个方法
http::request<http::dynamic_body> _request; //http升级到websocket使用的http::dynamic_body
Connection(net::io_context& ioc, boost::asio::ip::tcp::socket socket, http::request<http::dynamic_body> request);对Connection的构造进行了重载
3..cpp的实现(其实和之前的没什么大区别,都是协程异步读写,然后主要讲一下void Connection::HttpUpToWebsocketAsyncAccept())
#include "Connection.h"
#include "ConnectionMgr.h"
//构造函数接收到ioc,就去创建一个websocket管道,
//让_ws_ptr只能在这里这里使用,传递的参数有ioc,make_strand创建保证线程安全
Connection::Connection(net::io_context& ioc):
_ioc(ioc),_ws_ptr(std::make_unique<websocket::stream<tcp_stream>>(make_strand(ioc))),
_b_close(false)
{
boost::uuids::random_generator generator;
//雪花算法配置唯一uuid
this->_uuid = boost::uuids::to_string(generator());
}
Connection::Connection(net::io_context& ioc, boost::asio::ip::tcp::socket socket, http::request<http::dynamic_body> request):
_ioc(ioc),_ws_ptr(std::make_unique<websocket::stream<tcp_stream>>(std::move(socket))),
_request(request),_b_close(false)
{
boost::uuids::random_generator generator;
//雪花算法配置唯一uuid
this->_uuid = boost::uuids::to_string(generator());
}
std::string Connection::GetUid()
{
return this->_uuid;
}
//前面tcp握手需要websocket最底层的tcpsocket,beast有封装用get_lowest_layer就能获取到
net::ip::tcp::socket& Connection::GetSocket()
{
//返回最低层的socket
return boost::beast::get_lowest_layer(*_ws_ptr).socket();
}
void Connection::HttpUpToWebsocketAsyncAccept()
{
auto self = shared_from_this();
_ws_ptr->async_accept(_request, [self](boost::system::error_code err) {
try
{
if (!err)
{
//如果没有问题那么就把ConnectionMgr加到Mgr里面去管理
//这里封装了一个map的insert
ConnectionMgr::GetInstance()->AddConnection(self);
//if(self->_ws_ptr)
//然后就是开始监听,客户端发送是数据
self->Start();
}
else
{
std::cout << err.what() << std::endl;
}
}
catch (const std::exception& exp)
{
std::cout << "websocket err" << exp.what() << std::endl;
}
});
}
void Connection::AsyncAccept()
{
auto self = shared_from_this();
//webserver层面的握手,并伪闭包,防止在函数走完之后,直接就没了,导致接收不到回调
_ws_ptr->async_accept([self](boost::system::error_code err) {
try
{
if (!err)
{
//如果没有问题那么就把ConnectionMgr加到Mgr里面去管理
//这里封装了一个map的insert
ConnectionMgr::GetInstance()->AddConnection(self);
//if(self->_ws_ptr)
//然后就是开始监听,客户端发送是数据
self->Start();
}
else
{
std::cout << err.what() << std::endl;
}
}
catch (const std::exception&exp)
{
std::cout << "websocket err" << exp.what() << std::endl;
}
});
}
void Connection::Start()
{
//这里的self同理也是伪闭包
auto self = shared_from_this();
//如果读到没有错误buffer_bytes会有读到的数据多长,否则err
//然后要绑定我们的消息缓存区
//boost::asio::co_spawn协程的区域,开启协程,需要子线程的io_context,
//[=]()->boost::asio::awaitable<void> {}这个lambda表达式里面的函数是可以等待的
//最底下还有一个boost::asio::detached,就是设置线程和协程分离
boost::asio::co_spawn(_ioc, [=]()->boost::asio::awaitable<void> {
try
{
//如果没有关闭就一直循环
for (; !_b_close;)
{
//等待读,上面说了boost::asio::detached会让线程和协程分离,awaitable是可等待
//具体什么意思呢就是,co_await就是等待一个函数执行结束,如果没结束就卡在这,就是同步的意思
//但是卡住的是线程的协程而不是线程,线程还是正常工作
//buffer_bytes读到是字符数
//boost::asio::use_awaitable就是把原来绑定的回调函数给替代掉,不需要他回调了,这边等到他结束
std::size_t buffer_bytes = co_await _ws_ptr->async_read(_recv_buffer, boost::asio::use_awaitable);
std::cout <<"read from:"<< GetUid() << std::endl;
if (buffer_bytes==0)
{
std::cout << "empty data" << std::endl;
co_return;
}
//设置收到的数据为文本
_ws_ptr->text(_ws_ptr->got_text());
//接收到的数据
std::string recv_data = boost::beast::buffers_to_string(_recv_buffer.data());
//把缓冲区数据清除掉
_recv_buffer.consume(_recv_buffer.size());
std::cout << "websocket receive msg is" << recv_data << std::endl;
//然后这里就有趣了有3种写法,1是在协程里面调用协程去发数据,2.是co_await同步发,3.是用异步写回调去发
//boost::asio::co_spawn(_ioc, XcSend(recv_data), boost::asio::detached);
co_await XcSend(recv_data);
//AsyncSend(recv_data);
}
}
catch (const std::exception& exp)//现在错误会走到这里
{
std::cout << "exception is 11111" << exp.what() << std::endl;
Close();
ConnectionMgr::GetInstance()->RmvConnection(GetUid());
}
//看到boost::asio::detached了吗
}, boost::asio::detached);
}
void Connection::Close()
{
_ws_ptr->close(websocket::close_code::normal);
_b_close = true;
}
//这里的发送有讲究,1.是RAII技术,2.是消息队列
//RAII技术,局部管理资源技术,就是一个{}里面的数据到}的时候资源释放
//消息队列,保证每次发送的原子性,防止上一次还没发送完就被下一次发送覆盖掉
void Connection::AsyncSend(std::string msg)
{
//不要以顺序思维来看这段代码,你的想法是没错的,会有2-3个消息串行进入,加入消息队列中,就是卡住了
//恋恋风尘大哥写的消息队列确实有问题问题就在于这里是Lambda表达式,不是函数绑定
{
//RAII技术,锁管理,如果前面的消息还没发完不允许再发,等到消息队列为0再发送
std::lock_guard<std::mutex> lck_guard(send_mtx);
int que_len = this->_send_que.size();
_send_que.push(msg);
if (que_len > 0)
{
std::cout << GetUid() << " " << msg << " wait quelen: " << que_len << std::endl;
return;
}
}
auto self = shared_from_this();
_ws_ptr->async_write(boost::asio::buffer(msg.c_str(), msg.length()),
std::bind(&Connection::SendHandle, self, std::placeholders::_1, std::placeholders::_2));
}
void Connection::SendHandle(error_code err, std::size_t buffer_bytes)
{
try
{
if (err)
{
std::cout << "async_send err" << err.what() << std::endl;
ConnectionMgr::GetInstance()->RmvConnection(GetUid());
return;
}
std::string send_msg;
{
std::lock_guard<std::mutex> lck_gurad(send_mtx);
std::cout << GetUid() << " send" << std::endl;
_send_que.pop();
if (_send_que.empty())
{
return;
}
send_msg = _send_que.front();
}
auto self = shared_from_this();
_ws_ptr->async_write(boost::asio::buffer(send_msg.c_str(), send_msg.length()), std::bind(&Connection::SendHandle, self, std::placeholders::_1, std::placeholders::_2));
}
catch (const std::exception& exp)
{
std::cout << "async_send err" << exp.what() << std::endl;
ConnectionMgr::GetInstance()->RmvConnection(GetUid());
}
}
//协程发
boost::asio::awaitable<void> Connection::XcSend(std::string msg)
{
{
std::lock_guard<std::mutex> lck_guard(send_mtx);
int que_len = this->_send_que.size();
_send_que.push(msg);
if (que_len > 0)
{
std::cout <<GetUid()<<" " << msg << " wait quelen: " << que_len << std::endl;
co_return;
}
}
//如果消息队列不是空的时候,把全部都发出去
for (;!_send_que.empty();)
{
//之前的msg逻辑要改,自己动脑好吧
std::string send_msg;
{
std::lock_guard<std::mutex> lck_gurad(send_mtx);
std::cout << GetUid() << " send:" << send_msg << std::endl;
if (_send_que.empty())
{
co_return;
}
send_msg = _send_que.front();
_send_que.pop();
}
//等待一次发结束后再发
co_await _ws_ptr->async_write(boost::asio::buffer(msg.c_str(), msg.length()),boost::asio::use_awaitable);
}
}
先看重载的构造函数
Connection(net::io_context& ioc, boost::asio::ip::tcp::socket socket, http::request<http::dynamic_body> request):_ioc(ioc),_ws_ptr(std::make_unique<websocket::stream<tcp_stream>>(std::move(socket))),_request(request),_b_close(false)
_ws_ptr用的是http的socket移动过来构造的,为什么用move,因为boost规定的这玩意不能拷贝和赋值构造只能移动过来
_requesthttp升级到websocket使用的http::dynamic_body
接下来单独把void Connection::HttpUpToWebsocketAsyncAccept()抽出来讲一下
_ws_ptr->async_accept(_request, self {});
这个是最主要的,_request实际是noost::beast::http::request<http::dynamic_body> _request
你可以点进去看一下boost的源码对async_accept的重载
你会发现和之前的async_accept不同参数是这两个
http::request<Body, http::basic_fields
AcceptHandler&& handler//一个回调函数
没错下面这个就是http升级到websocket才使用的async_accept,不能用原来的那个否则是连不通的
怎么说呢搞这玩意花了我一天的时间主要还是文档太少也不太清晰,样例也少,而已还有那种自以为是的rz乱写博客,误导性极强(怎么说呢Coffee_Candy在这里向Coffee God and Candy God许愿这种rz赶紧exterminate,没学明白写什么博客),最后还是看你llfc大哥的博客才看明白了怎么一回事,llfc yyds,其实这里也只是补充了异步的写法,同步的写法可以移步到llfc大哥的博客上面去看llfc大哥的http升级到websocket的博客链接
4.Nodejs测试代码
import WebSocket from 'ws';
// 连接到 WebSocket 服务器
const url = 'ws://127.0.0.1:10087/web'; // 确保使用 IPv4 地址
const ws = new WebSocket(url);
// 连接打开时的事件
ws.on('open', () => {
console.log('Connected to the server');
ws.send('Hello, c1!');
// ws.send('Hello, c2!');
// ws.send('Hello, c3!');
// ws.send('Hello, c4!');
// ws.send('Hello, c5!');
});
// 接收到消息时的事件
ws.on('message', (message) => {
const receivedMessage = Buffer.isBuffer(message) ? message.toString() : message;
console.log('Received:', receivedMessage);
});
// 连接关闭时的事件
ws.on('close', () => {
console.log('Disconnected from the server');
});
// 错误处理
ws.on('error', (error) => {
console.error('Error:', error);
});
评论已关闭