asio

  • Boost.Asio是一个跨平台的C++库,主要用于网络和其他一些底层的I/O编程。
  • Boost.Asio成功的抽象了包括网络,串口、文件在内的同步和异步的I/O操作:
  • Boost.Asio支持大多数操作系统,轻量可扩展的,目前网络部分支持TCP, UDP, IMCP,可以很方便的扩展自定义协议。

注意

  • 把lambda表达式声明成类得成员变量 ,而不是一个局部变量。
  • 虽然io_service拷贝handler以备异步调用,但是handler还引用了作为外部变量得自身,而io_service是不知道外部变量得,也就不会拷贝。
  • 如果lambda表达式是一个函数内局部变量,那离开函数作用域 就会消失,io_service执行handler时候就会发生错误。

asio程序优雅的退出

  • Server对象还监听信号量,从而优雅的退出。
  • signal_set.async_wait(SignalHandler handler)
  • void handler(const system::error_code&ec , int signal_number)
#include <cstdlib>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <iostream>
 
using namespace boost;
using namespace boost::asio;
 
using namespace std;
 
class Server {
    
public:
    Server(io_service & s)
    : io_(s), signals_(s) {
            signals_.add(SIGINT);
            signals_.add(SIGTERM);
#if defined(SIGQUIT)
            signals_.add(SIGQUIT);
#endif
            signals_.async_wait(boost::bind(&Server::stop, this));
    }
    
    void run() {
        io_.run();
    }
    
private:
    void stop() {
        cout << "x" << endl;
        io_.stop();
    }
    
private:
    io_service& io_;
    boost::asio::signal_set signals_;
};
 
 
int main(int argc, char** argv) {
    io_service s;
    Server server(s);
    server.run();
    return 0;
}

定时器

  • steady_timer.async_wait(TimerHandler handler)
  • void handler(const system::error_code&ec , int signal_number)
    io_service io;
    steady_timer t(io,boost::asio::chrono::seconds(5));
    auto handler = [](const boost::system::error_code &ec){
        cout <<"hello 5"<<endl;
    };
    t.async_wait(handler);

简易服务端

#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/smart_ptr.hpp>
using namespace boost::asio;
using boost::system::error_code;
using ip::tcp;
struct CHelloWorld_Service{
     CHelloWorld_Service(io_service &iosev)
         :m_iosev(iosev),m_acceptor(iosev, tcp::endpoint(tcp::v4(), 1000))
     {
     }
    void start()
     {
        // 开始等待连接(非阻塞)
         boost::shared_ptr<tcp::socket> psocket(new tcp::socket(m_iosev));
        // 触发的事件只有error_code参数,所以用boost::bind把socket绑定进去
         m_acceptor.async_accept(*psocket,
             boost::bind(&CHelloWorld_Service::accept_handler, this, psocket, _1)
             );
     }
    // 有客户端连接时accept_handler触发
    void accept_handler(boost::shared_ptr<tcp::socket> psocket, error_code ec)
     {
        if(ec) return;
        // 继续等待连接
         start();
        // 显示远程IP
         std::cout << psocket->remote_endpoint().address() << std::endl;
        psocket->async_read_some(buffer(m_buf)),
        		boost::bind(&CHelloWorld_Service::read_handler ,this ,_1,_2));
        
        
        // 发送信息(非阻塞)
  //       boost::shared_ptr<std::string> pstr(new std::string("hello async world!"));
  //       psocket->async_write_some(buffer(*pstr),
  //           boost::bind(&CHelloWorld_Service::write_handler, this, pstr, _1, _2)
  //           );
     }
    // 异步写操作完成后write_handler触发
    void write_handler(boost::shared_ptr<std::string> pstr,
         error_code ec, size_t bytes_transferred)
     {
        if(ec)
             std::cout<< "发送失败!" << std::endl;
        else
             std::cout<< *pstr << " 已发送" << std::endl;
     }
    
     void read_handler(error_code ec, size_t bytes_transferred)
     {
        if(ec)
             std::cout<< "read失败!" << std::endl;
        else
             std::cout<< *pstr << " read succ" << std::endl;
     }
private:
     io_service &m_iosev;
     ip::tcp::acceptor m_acceptor;
    vector<char> m_buf;
};

注意:

  • 可以把跟这个服务端口连接的socket , boost::shared_ptr< tcp::socket > psocket(new tcp::socket(m_iosev)); 存起来
  • 在需要reset得时候 把vec< socket > 遍历关掉,再reset

拓展服务端:

class Connection:

#include <vector>
#include <boost/asio.hpp>
#include <boost/enable_shared_from_this.hpp>

using namespace boost;
using namespace boost::asio;
using ip::tcp;
using namespace std;
using boost::system::error_code;
class ConnectionsManager ;

class Connection: public boost::enable_shared_from_this<Connection>
{
public:
    Connection(io_service& s, ConnectionsManager& cons);

    ~Connection();

    void StartWork();

    void CloseSocket();

    void AfterReadChar(boost::system::error_code const& ec,std::size_t len);

public:
    tcp::socket socket;

private:
    vector<char> read_buffer_;
    ConnectionsManager& cons_;
};


#include <iostream>
#include <boost/bind.hpp>
#include "Connection.h"
#include "ConnectionsManager.h"
using namespace std;
Connection::Connection(io_service &s, ConnectionsManager &cons): socket(s), read_buffer_(1, 0), cons_(cons)
{
    cout <<"Connection" <<endl;
}

Connection::~Connection()
{
    cout << "~Connection" << endl;
}

void Connection::StartWork()
{

    cout << "the new connection object is starting now." << endl;
    async_read(socket, buffer(read_buffer_),
               boost::bind(&Connection::AfterReadChar, shared_from_this(), _1,_2));
}

void Connection::CloseSocket()
{
    cout << "closing the socket" << endl;
    socket.shutdown(tcp::socket::shutdown_both);
    socket.close();
}

void Connection::AfterReadChar(boost::system::error_code const &ec,std::size_t len )
{
    if (ec) {
        cout << "ec message :"<< ec.value()<<"---" <<ec.message() <<endl;
        cons_.Remove(shared_from_this());
        return;
    }

    char x = read_buffer_[0];
    if (x == 'a') {
        cout << "correct data received" << endl;
        async_read(socket, buffer(read_buffer_),
                   boost::bind(&Connection::AfterReadChar, shared_from_this(), _1,_2));
        CloseSocket();
    } else {
        cout << "wrong data received, char is:" << (int) x << endl;
        CloseSocket();
        cons_.Remove(shared_from_this());
    }
}

class ConnectionsManager:


#include <set>
#include <algorithm>
#include <boost/shared_array.hpp>

using namespace std;
using namespace boost;

#include "Connection.h"


class ConnectionsManager
{

public:
    void Add(boost::shared_ptr<Connection>& con);

    void Remove(boost::shared_ptr<Connection> con);

    void CloseAll();

private:
    set<boost::shared_ptr<Connection> > values_;
};

#include <iostream>
#include <boost/bind.hpp>
#include "ConnectionsManager.h"
using namespace std;
void ConnectionsManager::Add(boost::shared_ptr<Connection> &con)
{
    values_.insert(con);
    cout << "the number of connections is: " << values_.size() << endl;
}

void ConnectionsManager::Remove(boost::shared_ptr<Connection> con)
{
    values_.erase(con);
}

void ConnectionsManager::CloseAll()
{
    for_each(values_.begin(), values_.end(), boost::bind(&Connection::CloseSocket, _1));
    values_.clear();
}

class Server:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <vector>
#include <algorithm>
#include "Connection.h"
#include "ConnectionsManager.h"

using namespace boost;
using namespace boost::asio;
using ip::tcp;
using namespace std;
using boost::system::error_code;


class Server
{
public:

    Server(io_service & s, tcp::endpoint const& listen_endpoint)
            : io_(s), signals_(s), acceptor_(s, listen_endpoint) {
        signals_.add(SIGINT);
        signals_.add(SIGTERM);
#if defined(SIGQUIT)
        signals_.add(SIGQUIT);
#endif
        signals_.async_wait(boost::bind(&Server::Stop, this));
        boost::shared_ptr<Connection> c(new Connection(io_, cons_));

        acceptor_.async_accept(c->socket,
                               boost::bind(&Server::AfterAccept, this, c, _1));
    }

    void Run() {
        io_.run();
    }

    void AfterAccept(boost::shared_ptr<Connection>& c, boost::system::error_code const& ec) {
        // Check whether the server was stopped by a signal before this completion
        // handler had a chance to run.
        if (!acceptor_.is_open()) {
            return;
        }

        if (!ec) {
            cons_.Add(c);
            c->StartWork();
            boost::shared_ptr<Connection> c2(new Connection(io_, cons_));
            acceptor_.async_accept(c2->socket,
                                   boost::bind(&Server::AfterAccept, this, c2, _1));
        }
    }

private:

    void Stop() {
        cout << "stopping" << endl;
        acceptor_.close(); //reject new conn
        cons_.CloseAll();
        io_.stop();
    }

private:
    io_service& io_;
    boost::asio::signal_set signals_;
    tcp::acceptor acceptor_;
    ConnectionsManager cons_;


};

main:

    io_service s;
    tcp::endpoint listen_endpoint(tcp::v4(), 8888);

    Server server(s, listen_endpoint);
    server.Run();

其他

  • 可以io_service.run()可以在多个线程内执行 ,此时需要strand来wrap防止竞争,待续