5.4. 异步 I/O

5.4.1. I/O 多路复用

5.4.1.1. select

#include <sys/select.h>

void select_example(std::vector<int>& sockets) {
    fd_set read_fds;
    int max_fd = 0;
    
    FD_ZERO(&read_fds);
    for (int sock : sockets) {
        FD_SET(sock, &read_fds);
        max_fd = std::max(max_fd, sock);
    }
    
    timeval timeout = {5, 0};  // 5 秒超时
    int ready = select(max_fd + 1, &read_fds, nullptr, nullptr, &timeout);
    
    if (ready > 0) {
        for (int sock : sockets) {
            if (FD_ISSET(sock, &read_fds)) {
                // sock 可读
                handle_socket(sock);
            }
        }
    }
}

5.4.1.2. poll

#include <poll.h>

void poll_example(std::vector<int>& sockets) {
    std::vector<pollfd> fds;
    for (int sock : sockets) {
        fds.push_back({sock, POLLIN, 0});
    }
    
    int ready = poll(fds.data(), fds.size(), 5000);  // 5000ms 超时
    
    if (ready > 0) {
        for (const auto& pfd : fds) {
            if (pfd.revents & POLLIN) {
                handle_socket(pfd.fd);
            }
            if (pfd.revents & (POLLERR | POLLHUP)) {
                // 错误或断开
            }
        }
    }
}

5.4.1.3. epoll (Linux)

#include <sys/epoll.h>

class Epoll {
    int epfd_;
    
public:
    Epoll() {
        epfd_ = epoll_create1(0);
        if (epfd_ < 0) {
            throw std::system_error(errno, std::system_category());
        }
    }
    
    ~Epoll() {
        close(epfd_);
    }
    
    void add(int fd, uint32_t events = EPOLLIN) {
        epoll_event ev;
        ev.events = events;
        ev.data.fd = fd;
        if (epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev) < 0) {
            throw std::system_error(errno, std::system_category());
        }
    }
    
    void modify(int fd, uint32_t events) {
        epoll_event ev;
        ev.events = events;
        ev.data.fd = fd;
        epoll_ctl(epfd_, EPOLL_CTL_MOD, fd, &ev);
    }
    
    void remove(int fd) {
        epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr);
    }
    
    int wait(epoll_event* events, int max_events, int timeout_ms) {
        return epoll_wait(epfd_, events, max_events, timeout_ms);
    }
};

// 事件循环
void event_loop() {
    Epoll ep;
    ep.add(server_fd, EPOLLIN);
    
    epoll_event events[64];
    while (running) {
        int n = ep.wait(events, 64, 1000);
        for (int i = 0; i < n; ++i) {
            if (events[i].data.fd == server_fd) {
                // 新连接
                int client = accept(server_fd, ...);
                ep.add(client, EPOLLIN | EPOLLET);  // 边缘触发
            } else {
                handle_client(events[i].data.fd);
            }
        }
    }
}

5.4.2. Boost.Asio

5.4.2.1. 同步操作

#include <boost/asio.hpp>

namespace asio = boost::asio;
using tcp = asio::ip::tcp;

void sync_client() {
    asio::io_context io;
    tcp::socket socket(io);
    
    // 连接
    tcp::resolver resolver(io);
    auto endpoints = resolver.resolve("example.com", "80");
    asio::connect(socket, endpoints);
    
    // 发送
    std::string request = "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n";
    asio::write(socket, asio::buffer(request));
    
    // 接收
    asio::streambuf response;
    asio::read_until(socket, response, "\r\n");
}

5.4.2.2. 异步操作

class AsyncClient {
    asio::io_context& io_;
    tcp::socket socket_;
    asio::streambuf buffer_;
    
public:
    AsyncClient(asio::io_context& io) : io_(io), socket_(io) {}
    
    void connect(const std::string& host, const std::string& port) {
        tcp::resolver resolver(io_);
        auto endpoints = resolver.resolve(host, port);
        
        asio::async_connect(socket_, endpoints,
            [this](boost::system::error_code ec, tcp::endpoint) {
                if (!ec) {
                    do_write();
                }
            });
    }
    
private:
    void do_write() {
        std::string request = "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n";
        asio::async_write(socket_, asio::buffer(request),
            [this](boost::system::error_code ec, size_t) {
                if (!ec) {
                    do_read();
                }
            });
    }
    
    void do_read() {
        asio::async_read_until(socket_, buffer_, "\r\n",
            [this](boost::system::error_code ec, size_t) {
                if (!ec) {
                    std::istream is(&buffer_);
                    std::string line;
                    std::getline(is, line);
                    std::cout << line << "\n";
                }
            });
    }
};

// 使用
asio::io_context io;
AsyncClient client(io);
client.connect("example.com", "80");
io.run();  // 事件循环

5.4.2.3. 协程风格 (C++20)

#include <boost/asio/co_spawn.hpp>
#include <boost/asio/use_awaitable.hpp>

asio::awaitable<void> async_client() {
    auto executor = co_await asio::this_coro::executor;
    tcp::socket socket(executor);
    
    tcp::resolver resolver(executor);
    auto endpoints = co_await resolver.async_resolve(
        "example.com", "80", asio::use_awaitable);
    
    co_await asio::async_connect(socket, endpoints, asio::use_awaitable);
    
    std::string request = "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n";
    co_await asio::async_write(socket, asio::buffer(request), 
                               asio::use_awaitable);
    
    asio::streambuf buffer;
    co_await asio::async_read_until(socket, buffer, "\r\n",
                                    asio::use_awaitable);
}

// 使用
asio::io_context io;
asio::co_spawn(io, async_client(), asio::detached);
io.run();

5.4.3. libuv

#include <uv.h>

void on_connect(uv_connect_t* req, int status) {
    if (status < 0) {
        fprintf(stderr, "Connect error: %s\n", uv_strerror(status));
        return;
    }
    // 连接成功
}

void libuv_example() {
    uv_loop_t* loop = uv_default_loop();
    
    uv_tcp_t socket;
    uv_tcp_init(loop, &socket);
    
    struct sockaddr_in dest;
    uv_ip4_addr("127.0.0.1", 8080, &dest);
    
    uv_connect_t connect_req;
    uv_tcp_connect(&connect_req, &socket, 
                   (const struct sockaddr*)&dest, on_connect);
    
    uv_run(loop, UV_RUN_DEFAULT);
    uv_loop_close(loop);
}

5.4.4. Reactor vs Proactor

5.4.4.1. Reactor 模式

1. 注册事件
2. 等待事件就绪
3. 分发事件
4. 处理程序同步处理

特点:事件处理程序负责 I/O 操作
示例:epoll, select, libevent

5.4.4.2. Proactor 模式

1. 发起异步 I/O 操作
2. 系统完成 I/O
3. 通知完成
4. 处理程序处理结果

特点:系统完成 I/O,处理程序只处理结果
示例:Windows IOCP, Boost.Asio (Windows)

小技巧

异步 I/O 选择建议:

  • Linux 高性能: epoll + 非阻塞 I/O

  • 跨平台: Boost.Asio 或 libuv

  • 简单场景: select/poll

  • 现代 C++: Asio + C++20 协程