asio
- Boost.Asio是一个跨平台的C++库,主要用于网络和其他一些底层的I/O编程。
- Boost.Asio成功的抽象了包括网络,串口、文件在内的同步和异步的I/O操作:
- Boost.Asio支持大多数操作系统,轻量可扩展的,目前网络部分支持TCP, UDP, IMCP,可以很方便的扩展自定义协议。
注意
- 把lambda表达式声明成类得成员变量 ,而不是一个局部变量。
- 虽然io_service拷贝handler以备异步调用,但是handler还引用了作为外部变量得自身,而io_service是不知道外部变量得,也就不会拷贝。
- 如果lambda表达式是一个函数内局部变量,那离开函数作用域 就会消失,io_service执行handler时候就会发生错误。
asio程序优雅的退出
- Server对象还监听信号量,从而优雅的退出。
- signal_set.async_wait(SignalHandler handler)
- void handler(const system::error_code&ec , int signal_number)
#include <cstdlib>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <iostream>
using namespace boost;
using namespace boost::asio;
using namespace std;
class Server {
public:
Server(io_service & s)
: io_(s), signals_(s) {
signals_.add(SIGINT);
signals_.add(SIGTERM);
#if defined(SIGQUIT)
signals_.add(SIGQUIT);
#endif
signals_.async_wait(boost::bind(&Server::stop, this));
}
void run() {
io_.run();
}
private:
void stop() {
cout << "x" << endl;
io_.stop();
}
private:
io_service& io_;
boost::asio::signal_set signals_;
};
int main(int argc, char** argv) {
io_service s;
Server server(s);
server.run();
return 0;
}
定时器
- steady_timer.async_wait(TimerHandler handler)
- void handler(const system::error_code&ec , int signal_number)
io_service io;
steady_timer t(io,boost::asio::chrono::seconds(5));
auto handler = [](const boost::system::error_code &ec){
cout <<"hello 5"<<endl;
};
t.async_wait(handler);
简易服务端
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/smart_ptr.hpp>
using namespace boost::asio;
using boost::system::error_code;
using ip::tcp;
struct CHelloWorld_Service{
CHelloWorld_Service(io_service &iosev)
:m_iosev(iosev),m_acceptor(iosev, tcp::endpoint(tcp::v4(), 1000))
{
}
void start()
{
// 开始等待连接(非阻塞)
boost::shared_ptr<tcp::socket> psocket(new tcp::socket(m_iosev));
// 触发的事件只有error_code参数,所以用boost::bind把socket绑定进去
m_acceptor.async_accept(*psocket,
boost::bind(&CHelloWorld_Service::accept_handler, this, psocket, _1)
);
}
// 有客户端连接时accept_handler触发
void accept_handler(boost::shared_ptr<tcp::socket> psocket, error_code ec)
{
if(ec) return;
// 继续等待连接
start();
// 显示远程IP
std::cout << psocket->remote_endpoint().address() << std::endl;
psocket->async_read_some(buffer(m_buf)),
boost::bind(&CHelloWorld_Service::read_handler ,this ,_1,_2));
// 发送信息(非阻塞)
// boost::shared_ptr<std::string> pstr(new std::string("hello async world!"));
// psocket->async_write_some(buffer(*pstr),
// boost::bind(&CHelloWorld_Service::write_handler, this, pstr, _1, _2)
// );
}
// 异步写操作完成后write_handler触发
void write_handler(boost::shared_ptr<std::string> pstr,
error_code ec, size_t bytes_transferred)
{
if(ec)
std::cout<< "发送失败!" << std::endl;
else
std::cout<< *pstr << " 已发送" << std::endl;
}
void read_handler(error_code ec, size_t bytes_transferred)
{
if(ec)
std::cout<< "read失败!" << std::endl;
else
std::cout<< *pstr << " read succ" << std::endl;
}
private:
io_service &m_iosev;
ip::tcp::acceptor m_acceptor;
vector<char> m_buf;
};
注意:
- 可以把跟这个服务端口连接的socket , boost::shared_ptr< tcp::socket > psocket(new tcp::socket(m_iosev)); 存起来
- 在需要reset得时候 把vec< socket > 遍历关掉,再reset
拓展服务端:
class Connection:
#include <vector>
#include <boost/asio.hpp>
#include <boost/enable_shared_from_this.hpp>
using namespace boost;
using namespace boost::asio;
using ip::tcp;
using namespace std;
using boost::system::error_code;
class ConnectionsManager ;
class Connection: public boost::enable_shared_from_this<Connection>
{
public:
Connection(io_service& s, ConnectionsManager& cons);
~Connection();
void StartWork();
void CloseSocket();
void AfterReadChar(boost::system::error_code const& ec,std::size_t len);
public:
tcp::socket socket;
private:
vector<char> read_buffer_;
ConnectionsManager& cons_;
};
#include <iostream>
#include <boost/bind.hpp>
#include "Connection.h"
#include "ConnectionsManager.h"
using namespace std;
Connection::Connection(io_service &s, ConnectionsManager &cons): socket(s), read_buffer_(1, 0), cons_(cons)
{
cout <<"Connection" <<endl;
}
Connection::~Connection()
{
cout << "~Connection" << endl;
}
void Connection::StartWork()
{
cout << "the new connection object is starting now." << endl;
async_read(socket, buffer(read_buffer_),
boost::bind(&Connection::AfterReadChar, shared_from_this(), _1,_2));
}
void Connection::CloseSocket()
{
cout << "closing the socket" << endl;
socket.shutdown(tcp::socket::shutdown_both);
socket.close();
}
void Connection::AfterReadChar(boost::system::error_code const &ec,std::size_t len )
{
if (ec) {
cout << "ec message :"<< ec.value()<<"---" <<ec.message() <<endl;
cons_.Remove(shared_from_this());
return;
}
char x = read_buffer_[0];
if (x == 'a') {
cout << "correct data received" << endl;
async_read(socket, buffer(read_buffer_),
boost::bind(&Connection::AfterReadChar, shared_from_this(), _1,_2));
CloseSocket();
} else {
cout << "wrong data received, char is:" << (int) x << endl;
CloseSocket();
cons_.Remove(shared_from_this());
}
}
class ConnectionsManager:
#include <set>
#include <algorithm>
#include <boost/shared_array.hpp>
using namespace std;
using namespace boost;
#include "Connection.h"
class ConnectionsManager
{
public:
void Add(boost::shared_ptr<Connection>& con);
void Remove(boost::shared_ptr<Connection> con);
void CloseAll();
private:
set<boost::shared_ptr<Connection> > values_;
};
#include <iostream>
#include <boost/bind.hpp>
#include "ConnectionsManager.h"
using namespace std;
void ConnectionsManager::Add(boost::shared_ptr<Connection> &con)
{
values_.insert(con);
cout << "the number of connections is: " << values_.size() << endl;
}
void ConnectionsManager::Remove(boost::shared_ptr<Connection> con)
{
values_.erase(con);
}
void ConnectionsManager::CloseAll()
{
for_each(values_.begin(), values_.end(), boost::bind(&Connection::CloseSocket, _1));
values_.clear();
}
class Server:
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <vector>
#include <algorithm>
#include "Connection.h"
#include "ConnectionsManager.h"
using namespace boost;
using namespace boost::asio;
using ip::tcp;
using namespace std;
using boost::system::error_code;
class Server
{
public:
Server(io_service & s, tcp::endpoint const& listen_endpoint)
: io_(s), signals_(s), acceptor_(s, listen_endpoint) {
signals_.add(SIGINT);
signals_.add(SIGTERM);
#if defined(SIGQUIT)
signals_.add(SIGQUIT);
#endif
signals_.async_wait(boost::bind(&Server::Stop, this));
boost::shared_ptr<Connection> c(new Connection(io_, cons_));
acceptor_.async_accept(c->socket,
boost::bind(&Server::AfterAccept, this, c, _1));
}
void Run() {
io_.run();
}
void AfterAccept(boost::shared_ptr<Connection>& c, boost::system::error_code const& ec) {
// Check whether the server was stopped by a signal before this completion
// handler had a chance to run.
if (!acceptor_.is_open()) {
return;
}
if (!ec) {
cons_.Add(c);
c->StartWork();
boost::shared_ptr<Connection> c2(new Connection(io_, cons_));
acceptor_.async_accept(c2->socket,
boost::bind(&Server::AfterAccept, this, c2, _1));
}
}
private:
void Stop() {
cout << "stopping" << endl;
acceptor_.close(); //reject new conn
cons_.CloseAll();
io_.stop();
}
private:
io_service& io_;
boost::asio::signal_set signals_;
tcp::acceptor acceptor_;
ConnectionsManager cons_;
};
main:
io_service s;
tcp::endpoint listen_endpoint(tcp::v4(), 8888);
Server server(s, listen_endpoint);
server.Run();
其他
- 可以io_service.run()可以在多个线程内执行 ,此时需要strand来wrap防止竞争,待续