Reactor模型

Reactor模型

引言

概念

  • Reactor模型:又称为反应堆模型,它是一种基于事件驱动I/O多路复用的设计模式,常用于处理大量并发I/O事件,是一个高性能模型,它通过事件驱动的方式,高效地管理大量并发连接,实现了对事件的同步解耦和高效处理,广泛应用于网络服务器的开发中
  • 事件:事件就是类似欲I/O操作中的accept、connect、read、write等等不同的事件
  • 事件驱动:不再是检测发生的事件类型调用对应的函数,而是通过提前注册好事件以及对应的回调函数,当事件发生时调用其回调函数,由事件来主导处理
  • Reactor模型也分单Reactor单线程模型单Reactor多线程模型多Reactor多线程模型(主从Reactor多线程模型)单Reactor多进程模型(主从reactor多进程模型)等等
  • 本文只讲解单Reactor多线程模型和主从Reactor多线程模型,单Reactor单线程可参考Moon’s Blog

区别

模型 特点 优点 缺点
单Reactor单线程 Reactor、事件处理器都在同一线程中运行,适用于连接数较少、业务处理简单的场景 实现简单,数据无需在线程间传递,线程安全性高 无法利用多核CPU,性能有限;当事件处理器执行耗时操作时,会阻塞整个事件循环
单Reactor多线程 Reactor在主线程中运行,负责事件检测和分发,事件处理器在工作线程中运行,通常使用线程池 主线程专注于事件检测和分发,提高响应效率;工作线程并发处理事件,充分利用多核CPU 实现相对复杂;当事件量较大时,单Reactor线程可能会成为性能瓶颈
主从reactor多线程 主Reactor线程负责接受新连接,将连接分配给从Reactor线程;从Reactor线程各自运行事件循环,处理各自负责的连接的I/O事件 主线程和工作线程职责分离,性能更高;更适合高并发、大流量的场景 实现复杂度更高;多个Reactor线程会消耗更多的系统资源,可能会导致资源利用率下降

单Reactor多线程模型

概念

概念

  • 单Reactor多线程模型:在单Reactor单线程的模型增加了一个线程池,Reactor线程只接受网络I/O和连接请求,其他事件交给线程池的工作线程处理

具体流程

  1. 启动Reactor线程,将acceptor事件注册在I/O多路复用模型(select、epoll等)中
  2. Reactor线程监听客户端请求,处理连接请求,并将客户端事件(也就是handler)注册到I/O多路复用模型
  3. 当客户端事件发生时,将数据收发或者其他事件,放入任务队列(也就是交给工作线程)
  4. 最终将业务事件交给线程池中的工作线程处理

注意:Acceptor和handler组件可以抽象成一个事件类,因为他们其实都是事件的回调函数

单Reactor多线程模型

实现细节

  1. 实现一个基于非阻塞ET模式EPOLL的事件管理类(Reactor类)
  2. 将Acceptor和handler组件抽象成一个监听事件对象和业务处理事件对象,因此只需要实现一个事件类即可
  3. 将事件的属性信息作为成员封装成事件类
  4. 实现线程池并封装成类,将一个线程池作为对象放入Reactor类(用于将分发事件)
  5. 初始化监听事件对象(也就是Acceptor,可以像libevent库一样封装成类),将其注册在事件管理类(Reactor)中
  6. 启动Reactor事件循环,Acceptor将发起连接请求的客户端等信息初始化一个事件对象,然后注册到Reactor中,如果是断开连接请求,就将其从Reactor中删除
  7. 当客户端发送数据来时,事件被触发,Reactor将其事件对象中的回调函数,加入到任务队列中,分发给工作线程处理

组件实现

组件实现

  1. Reactor类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    //提前定义的事件类(将acceptor和handler可抽象成一个事件类)
    #define MAX_EVENTS 65535
    #define MAX_EPOLL_TIMEOUT_MSEC (35*60*1000)

    class Event;


    class EventBase {
    public:
    EventBase();
    ~EventBase();

    void add_event(Event* ev); //注册事件到epoll中
    void del_event(Event* ev); //删除事件到epoll中
    void dispatch(struct timeval *tv); // 等同于libevent的event_base_dispatch
    void loopbreak(); // 等同于libevent的event_base_loopbreak
    Event* timer_event_new(Callback cb,void *ctx,int flag,int timeout_ms); //创建并初始化新定时事件

    private:
    int epfd_; //epoll文件描述符
    Threadpool *tpool; //线程池
    bool shutdown_; //终止变量
    struct epoll_event events_[MAX_EVENTS]; //存储事件发生数组
    };
  2. 事件类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    class Event {
    public:
    using Callback=std::function<void(Event*,void*)>; //定义回调函数类型
    Event(EventBase* base, int fd, uint32_t events, Callback cb,void *ctx); //事件初始化
    ~Event();

    EventBase* getbase(); //获取其处于的Reactor类对象指针
    int getfd() const; //获取事件文件描述符
    uint32_t getevents() const; //获取事件类型
    void handle_cb(); //调用回调函数
    //判断事件是否是定时事件
    bool is_timer() const;

    private:
    EventBase* base_; //所处于的Reactor类对象指针
    int fd_; //事件文件描述符
    uint32_t events_; //事件类型
    Callback cb_; //事件发生要调用的回调函数
    void *ctx_; //回调函数参数
    bool is_timer_; //定时事件判断位
    };
  3. 线程池类:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    class Threadpool{
    public:
    Threadpool(int num):threads(std::vector<std::thread>(num)),min_thr_num(num),live_num(num){
    init();
    }
    ~Threadpool(){
    t_shutdown();
    }
    //初始化线程池
    void init();
    //销毁线程池并释放资源
    void t_shutdown();
    //各任务线程入口函数
    void t_task();
    //管理线程入口函数
    void adjust_task();
    //添加任务
    template<typename _Fn,typename... _Args>
    void add_task(_Fn&& fn,_Args&&... args){
    {
    auto f=std::bind(std::forward<_Fn>(fn),std::forward<_Args>(args)...);
    {
    std::unique_lock<std::mutex> lock(mx);
    if(shutdown) return;
    tasks.emplace(std::move(f));
    }
    task_cv.notify_one();
    }
    }
    private:
    std::thread adjust_thr; //管理线程
    std::vector<std::thread> threads; //线程数组
    std::queue<std::function<void()>> tasks; //任务队列
    std::mutex mx; //线程池锁
    std::condition_variable task_cv; //任务通知条件变量
    int min_thr_num=0; //线程池最小线程数
    int max_thr_num=0; //cpu核数的2n+1
    std::atomic<int> run_num=0; //线程池中正在执行任务的线程数
    std::atomic<int> live_num=0; //线程池空闲线程数
    std::atomic<int> exit_num=0; //线程池要销毁线程数
    bool shutdown=false; //线程池状态,true为运行,false为关闭
    };
  4. 接口定义:类似于libevent库中的接口提供,可以将以上类接口放入到私有成员,然后让用户使用以下列出的接口即可

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    //创建Reactor对象指针
    EventBase* event_base_new();

    // 开始事件循环
    void event_base_dispatch(EventBase* base,struct timeval *tv);

    // 停止事件循环
    void event_base_loopbreak(EventBase* base);

    // 创建新的事件
    Event* event_new(EventBase* base, int fd, uint32_t events, Event::Callback cb,void *ctx);

    // 添加事件
    void event_add(EventBase *base,Event* ev);

    // 删除事件
    void event_del(EventBase *base,Event* ev);

    // 释放事件
    void event_free(Event* ev);

    // 释放事件循环
    void event_base_free(EventBase* base);

    //提供创建定时事件
    Event *event_timer_new(EventBase *base,EventBase::Callback cb,void *ctx,int flag,int timeout_ms);

    //判断事件是否是定时事件
    bool is_event_timer(Event *ev);

注:线程池实现可参考本博客Moon’s Linux网络编程Moon’s C++多线程编程

具体实现

具体实现

  1. Reactor类EventBase

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    EventBase::EventBase() : epfd_(epoll_create1(0)), shutdown_(false) {
    //获取当前系统cpu数
    unsigned int cpu_cores = std::thread::hardware_concurrency();
    if (cpu_cores == 0) {
    cpu_cores = 4; // 默认值
    }
    tpool=new Threadpool(cpu_cores+1);
    if (epfd_ == -1) {
    perror("epoll_create1");
    exit(EXIT_FAILURE);
    }
    }

    //需要清理线程池和关闭epoll
    EventBase::~EventBase() {
    delete tpool;
    close(epfd_);
    }

    //注册事件(向epoll中添加)
    void EventBase::add_event(Event* event) {
    int fd = event->getfd();
    epoll_event ev;
    ev.data.ptr=event;
    ev.events = event->getevents();

    if (epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev) == -1) {
    perror("epoll_ctl: add");
    exit(EXIT_FAILURE);
    }
    }

    //删除事件(从epoll中删除)
    void EventBase::del_event(Event* event) {
    int fd = event->getfd();

    if (epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr) == -1) {
    perror("epoll_ctl: del");
    }

    }

    //事件循环,分发事件(将事件放入线程池中的任务队列)
    void EventBase::dispatch(struct timeval *tv) {
    //epoll_event epoll_events[MAX_EVENTS];
    int timeout=-1;
    if(tv!=nullptr){
    timeout = tv->tv_sec * 1000 + (tv->tv_usec + 999) / 1000;
    }
    if(timeout>MAX_EPOLL_TIMEOUT_MSEC){
    timeout=MAX_EPOLL_TIMEOUT_MSEC;
    }

    while (!shutdown_) {
    //根据timeout设置检测间隔
    int n = epoll_wait(epfd_, events_, MAX_EVENTS, timeout);
    if (n == -1) {
    perror("epoll_wait");
    break;
    }

    for (int i = 0; i < n; ++i) {
    auto ev=static_cast<Event*>(events_[i].data.ptr);
    if(ev){
    tpool->add_task([&]{
    ev->handle_cb();
    });
    }
    }
    }
    }

    //创建并初始化新定时事件
    Event* EventBase::timer_event_new(Callback cb,void *ctx,int flag,int timeout_ms){
    //使用Linux提供的定时器接口,可以结合epoll使用
    int fd=timerfd_create(CLOCK_MONOTONIC,TFD_NONBLOCK);
    if(-1==fd){
    perror("timerfd_create error");
    return nullptr;
    }
    itimerspec tvalue;
    //it_value设置定时器第一次到期时间
    tvalue.it_value.tv_sec=timeout_ms/1000; //初始化到期秒数
    tvalue.it_value.tv_nsec=(timeout_ms%1000)*1000000; //初始化到期纳秒数
    tvalue.it_interval={0,0};
    //it_interval设置定时器第一次之后每次到期的间隔时间,设置为0,定时器只会触发一次,非0为周期性触发
    if(flag){
    tvalue.it_interval.tv_sec=timeout_ms/1000; //间隔时间秒数
    tvalue.it_interval.tv_nsec=(timeout_ms%1000)*1000000; //间隔时间的纳秒数
    }

    if(timerfd_settime(fd,0,&tvalue,NULL)==-1){
    perror("timerfd_settime error");
    close(fd);
    return nullptr;
    }
    return new Event(this,fd,EPOLLIN,cb,ctx,true);
    }


    //终止事件循环
    void EventBase::loopbreak() {
    shutdown_ = false;
    }
  2. 事件类Event

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    Event::Event(EventBase* base, int fd, uint32_t events, Callback cb,void *ctx,bool istimer)
    : base_(base), fd_(fd), events_(events), cb_(cb),ctx_(ctx),is_timer_(istimer){
    //初始化就将事件放入对应的EventBase类对象中
    base_->add_event(this);
    }

    //析构自动调用删除事件操作
    Event::~Event() {
    base_->del_event(this);
    }

    //获取其EventBase类对象
    EventBase* Event::getbase(){
    return base_;
    }

    //获取事件文件描述符
    int Event::getfd() const {
    return fd_;
    }

    //获取事件的事件类型
    uint32_t Event::getevents() const {
    return events_;
    }

    //调用事件回调函数
    void Event::handle_cb() {
    if (cb_) {
    cb_(this,ctx_);
    }
    }

    //获取定时事件判断位
    bool Event::is_timer() const{
    return is_timer_;
    }
  3. 线程池类Threadpool

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    #include "Threadpool.h"
    #include <chrono>
    #include <mutex>
    #include <iostream>
    #include <unistd.h>




    void Threadpool::init(){
    unsigned int cpu_cores = std::thread::hardware_concurrency()/2;
    if(cpu_cores==0) cpu_cores=4;
    max_thr_num=2*cpu_cores+1;
    adjust_thr=std::thread([this]{
    this->adjust_task();
    });
    for(int i=0;i<min_thr_num;++i){
    threads.emplace_back([this]{
    this->t_task();
    });
    }
    }




    void Threadpool::t_task(){
    while (1) {
    std::unique_lock<std::mutex> lock(mx);
    task_cv.wait(lock,[this]{
    return !tasks.empty()||shutdown||exit_num>0;
    });
    if(exit_num>0){
    exit_num--;
    return;
    }
    if(shutdown&&tasks.empty()){
    return;
    }
    auto task=tasks.front();
    tasks.pop();
    lock.unlock();
    ++run_num;
    --live_num;
    task();
    ++live_num;
    --run_num;
    std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    }


    void Threadpool::t_shutdown(){
    {
    std::unique_lock<std::mutex> lock(mx);
    shutdown=true;
    }
    adjust_thr.detach();
    task_cv.notify_all();
    for(auto& t:threads){
    if(t.joinable()) t.join();
    }
    }



    void Threadpool::adjust_task(){
    while (!shutdown) {
    std::this_thread::sleep_for(std::chrono::seconds(DEFAULT_TIME));
    {
    int size=threads.size();
    if (tasks.size() > live_num && live_num < max_thr_num&&size<max_thr_num) {
    int add = 0;
    std::unique_lock<std::mutex> lock(mx);
    for (int i = size; i < max_thr_num && add < 10; ++i) {
    threads.emplace_back([this] {
    this->t_task();
    });
    add++;
    live_num++;
    }
    lock.unlock();
    std::cout << "线程池扩容成功!" << std::endl;
    }
    if (run_num * 2 < live_num && live_num > min_thr_num) {
    exit_num=live_num-min_thr_num>=10?10:live_num-min_thr_num;
    int x=exit_num;
    std::unique_lock<std::mutex> lock(mx);
    for (int i = 0; i < x; ++i) {
    task_cv.notify_one();
    live_num--;
    }
    lock.unlock();
    std::cout << "线程池瘦身成功!" << std::endl;
    }
    }
    }
    }
  4. 接口实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    // 创建新的事件循环
    EventBase* event_base_new() {
    return new EventBase();
    }

    // 开始事件循环
    void event_base_dispatch(EventBase* base,struct timeval *tv) {
    base->dispatch(tv);
    }

    // 停止事件循环
    void event_base_loopbreak(EventBase* base) {
    base->loopbreak();
    }

    // 创建新的事件
    Event* event_new(EventBase* base, int fd, uint32_t events, Event::Callback cb,void *ctx) {
    return new Event(base, fd, events, cb,ctx);
    }


    // 添加事件
    void event_add(EventBase *base,Event* ev) {
    // 在Event的构造函数中已经添加
    base->add_event(ev);
    }

    // 删除事件
    void event_del(EventBase *base,Event* ev) {
    // 在Event的析构函数中已经删除
    base->del_event(ev);
    }

    // 释放事件
    void event_free(Event* ev) {
    delete ev;
    }

    // 释放事件循环
    void event_base_free(EventBase* base) {
    delete base;
    }

    //提供创建定时事件接口
    Event *event_timer_new(EventBase *base,EventBase::Callback cb,void *ctx,int flag,int timeout_ms){
    return base->timer_event_new(cb,ctx,flag,timeout_ms);
    }

    //判断事件是否是定时事件
    bool is_event_timer(Event *ev){
    return ev->is_timer();
    }

使用示例

使用示例

使用以上实现的接口写一个回声服务器(我将其封装到了event.h文件中)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include "event.h"


void setnonblock(int fd){
fcntl(fd,F_SETFL, fcntl(fd,F_GETFL)|O_NONBLOCK);
}

void readCallback(Event *ev,void *args){
char buffer[1024];
int fd=ev->getfd();
while (true) {
ssize_t n = read(fd, buffer, sizeof(buffer));
if (n > 0) {
write(fd, buffer, n); // 回显
} else if (n == 0) {
std::cout << "Client disconnected: " << fd << std::endl;
event_free(ev);
close(fd);
break;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break; // 数据读取完毕
} else {
perror("read");
event_free(ev);
close(fd);
break;
}
}
}
}


void acceptCallback(Event *ev,void *args) {
EventBase* base=ev->getbase();
int fd=ev->getfd();
while (true) {
sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int client_fd = accept(fd, (sockaddr*)&client_addr, &client_len);
if (client_fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break; // 没有更多的连接
} else {
perror("accept");
break;
}
}

std::cout << "New connection: " << client_fd << std::endl;
setnonblock(client_fd);
Event* client_event = event_new(base, client_fd, EPOLLIN|EPOLLET, readCallback,nullptr);
}
}


void time_hello_cb(Event *ev,void *args){
//用来取出定时器到期次数
uint64_t expirations;
ssize_t s = read(ev->getfd(), &expirations, sizeof(uint64_t));
if (s==-1) {
//对于非阻塞处理
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return;
} else {
perror("read");
exit(EXIT_FAILURE);
}
}
std::cout<<expirations<<":hello timer_event"<<std::endl;
}


void time_world_cb(Event *ev,void *args){
//用来取出定时器到期次数
uint64_t expirations;
ssize_t s = read(ev->getfd(), &expirations, sizeof(uint64_t));
if (s==-1) {
//对于非阻塞处理
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return;
} else {
perror("read");
exit(EXIT_FAILURE);
}
}
std::cout<<expirations<<":world timer_event"<<std::endl;
}

int main() {
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
setnonblock(listen_fd);
setreuse(listen_fd);
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(5005);
addr.sin_addr.s_addr = INADDR_ANY;

Bind(listen_fd, (sockaddr*)&addr, sizeof(addr));
listen(listen_fd, SOMAXCONN);

EventBase* base = event_base_new();

Event* listen_event = event_new(base, listen_fd, EPOLLIN|EPOLLET, acceptCallback,nullptr);
//5秒触发一次
Event* timer_event=event_timer_new(base,time_hello_cb,nullptr,1,5000);
//5s后只触发一次
Event* timer_ev=event_timer_new(base,time_world_cb,nullptr,0,5000);

//开启循环
struct timeval tv;
tv.tv_sec=1; //设置秒数
tv.tv_usec=0; //设置微妙
event_base_dispatch(base,&tv);

//释放
event_free(timer_event);
event_free(timer_ev);
event_free(listen_event);
event_base_free(base);

return 0;
}

主从Reactor多线程模型

概念

概念

  • 主从Reactor多线程模型:该模型是将职能划分给主Reactor和从Reactor(工作线程),主Reactor负责监听连接事件(不进行任何I/O处理和其他实质性事件),并将新连接对象和事件分发给从Reactor从Reactor负责处理和监听主Reactor分发的事件,这样能够最大限度提高接收连接的响应速度,避免被一些I/O或者其他事件操作所阻塞。
  • one loop one thread:这是一个常见的高性能网络编程的思想与架构(可见陈硕大佬的muduo库),意思就是一个事件循环(Reactor)对应着一个线程,也就是每一个线程都有一个属于自己的事件循环(Reactor),这个架构大大避免锁竞争、频繁上下文切换开销等问题,并充分利用多核 CPU 的性能
  • 本文也将借鉴muduo库,来实现主从Reactor多线程模型
1
2
3
4
5
6
7
8
9
10
11
12
one loop one thread:

+-----------------+
| 主线程(Acceptor)|
+-----------------+
|
+-------------+-------------+
| | |
+---------------+ +---------------+ +---------------+
| 工作线程1 | | 工作线程2 | | 工作线程3 |
| (Event Loop) | | (Event Loop) | | (Event Loop) |
+---------------+ +---------------+ +---------------+

具体架构

  1. 首先不管是主Reactor和从Reactor都是一个由事件循环类实现的
  2. 但是主Reactor不止有事件循环的职能,还需要有监听连接事件的类,也就是Acceptor类
  3. 因为one loop one thread,我们需要将从reactor与线程进行做一个封装,让主Reactor使用线程池更好管理,所以我们需要一个事件循环线程类
  4. 主Reactor还有事件分发的职能,因此应该需要主Reactor来管理从Reactor并将事件分发,因此我们需要一个事件循环线程池
  5. 最后还有个最基础的事件类(用于整合文件描述符、注册事件以及对应回调函数)

注:

  • 事件循环eventloop:也就是封装了select、poll或者epoll作为事件循环监听的基础,然后管理所监听的事件,监听事件触发,调用其对应的回调函数处理
  • Acceptor:用于封装监听套接字和其处理连接请求的回调函数的类
  • 事件循环线程loopthread:用于封装thread和事件循环的类
  • 事件循环线程池loopthreadpool:用于管理从Reactor(事件循环)的线程池,一个线程对应一个从Reactor

具体流程

  1. 将acceptor事件注册到主Reactor中
  2. 初始化事件循环线程池,各事件循环线程启动事件循环
  3. 启动主Reactor事件循环,循环监听acceptor事件的连接请求
  4. 触发acceptor事件,调用创建连接回调函数,将新连接分发给从Reactor
  5. 从Reactor将分发下来的新连接事件注册到事件循环中
  6. 当事件触发,从Reactor进行处理

组件实现

组件实现

  1. eveneloop(Reactor类)/事件循环类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    #ifndef _EVENTLOOP_H_
    #define _EVENTLOOP_H_


    #include "wrap.h"
    #include <atomic>
    #include <cstdlib>
    #include <iostream>
    #include <functional>
    #include <vector>
    #include <list>
    #include <unistd.h>
    #include <strings.h>
    #include <sys/epoll.h>
    #include <sys/eventfd.h>
    #include <sys/timerfd.h>


    #define MAX_EVENTS 65536


    namespace moon {

    class base_event;
    class event;
    class loopthread;

    //reactor类-->事件循环类
    class eventloop{
    public:
    using Callback=std::function<void()>;
    eventloop(loopthread* base=nullptr,int timeout=-1);
    ~eventloop();
    loopthread* getbaseloop();
    int getefd() const;
    int getevfd() const;
    //事件控制函数
    void add_event(event* event);
    void del_event(event* event);
    void mod_event(event* event);

    void loop();
    void loopbreak();

    void create_eventfd(); //创建通知文件描述符
    void read_eventfd(); //通知文件描述符回调函数
    void write_eventfd(); //唤醒通知事件,用于终止服务通知

    void add_pending_del(base_event* ev);


    private:
    int epfd_; //epoll文件描述符
    int eventfd_; //通知文件描述符
    int timeout_=-1;
    std::atomic<bool> shutdown_;
    std::list<event*> evlist_;
    std::vector<epoll_event> events_;
    std::vector<base_event*> delque_; //待删除事件队列
    loopthread *baseloop_;
    };
    }


    #endif
  2. event类/事件类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    #ifndef _EVENT_H_
    #define _EVENT_H_

    #include "wrap.h"
    #include <atomic>
    #include <cstdlib>
    #include <iostream>
    #include <functional>
    #include <list>
    #include <unistd.h>
    #include <strings.h>
    #include <sys/epoll.h>
    #include <sys/eventfd.h>
    #include <sys/timerfd.h>

    namespace moon{

    class eventloop;


    //事件类
    class event{
    public:
    // using Callback=std::function<void(event*)>;
    using Callback=std::function<void()>;
    event(eventloop *base,int fd,uint32_t events);
    ~event();
    int getfd() const; //获取事件文件描述符
    uint32_t getevents() const; //获取获取监听事件类型
    eventloop* getloop() const;
    //设置回调函数,不需要传null即可
    void setcb(const Callback &rcb,const Callback &wcb,const Callback &ecb);
    void setrcb(const Callback &rcb);
    void setwcb(const Callback &wcb);
    void setecb(const Callback &ecb);
    Callback getrcb();
    Callback getwcb();
    Callback getecb();
    void setrevents(const uint32_t revents); //设置触发事件类型
    void enable_events(uint32_t op); //添加监听事件类型
    void disable_events(uint32_t op); //取消监听事件类型
    void update_ep(); //更新监听事件
    void handle_cb(); //处理事件回调函数

    bool readable();
    bool writeable();
    void enable_read();
    void disable_read();
    void enable_write();
    void disable_write();
    void enable_ET();
    void disable_ET();
    void reset_events();
    void del_listen(); //取消监听
    void enable_listen(); //开启监听
    void disable_cb();
    void close(){
    del_listen();
    }
    private:
    eventloop *loop_;
    int fd_;
    uint32_t events_; //监听事件
    uint32_t revents_; //触发事件
    Callback readcb_; //读事件回调函数
    Callback writecb_; //写事件回调函数
    Callback eventcb_; //出了读写事件的其他事件触发回调函数,用作错误事件回调
    };

    }


    #endif
  3. loopthread类/事件循环线程类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    #ifndef _LOOPTHREAD_H_
    #define _LOOPTHREAD_H_


    #include <condition_variable>
    #include <mutex>
    #include <thread>



    #define MAX_EPOLL_TIMEOUT_MSEC (35*60*1000)

    namespace moon {

    class eventloop;

    class loopthread{
    public:
    loopthread(int timeout=-1)
    :loop_(nullptr),t_(std::thread(&loopthread::_init_,this)),timeout_(timeout){
    if(timeout_>MAX_EPOLL_TIMEOUT_MSEC){
    timeout_=MAX_EPOLL_TIMEOUT_MSEC;
    }
    }
    ~loopthread();
    void _init_(); //初始化
    eventloop* getloop(); //获取loop_
    void join(){
    if(t_.joinable()) t_.join();
    }
    private:
    eventloop *loop_;
    std::mutex mx_;
    std::condition_variable cv_;
    std::thread t_;
    int timeout_=-1; //设置epoll间隔检测时间ms
    };

    }


    #endif // !_LOOPTHREAD_H_
  4. looptpool类/事件循环线程池类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    #ifndef _LOOPTPOOL_H_
    #define _LOOPTPOOL_H_



    #include <thread>
    #include <vector>
    #include <set>



    namespace moon {

    class eventloop;
    class loopthread;


    class looptpool{
    public:
    looptpool(eventloop *base);
    ~looptpool();
    void create_pool(int n,int timeout=-1); //初始化线程池
    eventloop* ev_dispatch(); //分发事件
    looptpool(const looptpool&)=delete;
    looptpool& operator=(const looptpool&)=delete;
    void addloop(); //添加事件循环
    void stop(); //终止运行
    private:
    void init_pool(int timeout=-1);
    //设置线程池线程数
    void settnum(int n){
    t_num=n;
    }

    private:
    eventloop *baseloop_;
    std::vector<eventloop*> loadvec_;
    int next_=0;
    int t_num=0;
    int timeout_=-1;
    };

    }




    #endif
  5. acceptor类/监听者类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    #ifndef _ACCEPTOR_H_
    #define _ACCEPTOR_H_

    #include <functional>


    namespace moon {

    class eventloop;
    class event;

    //监听连接类
    class acceptor{
    public:
    using Callback=std::function<void(int)>;
    acceptor(int port,eventloop *base);
    ~acceptor();
    void listen(); //开始监听
    void stop(); //停止监听
    void init_sock(int port); //建立监听套接字
    void setcb(const Callback &accept_cb); //设置回调函数
    void handle_accept(); //acceptor事件回调函数,用来接收连接
    private:
    int lfd_;
    eventloop *loop_;
    event *ev_;
    Callback cb_; //连接后回调函数
    bool shutdown_;
    };

    }

    #endif
  6. 工具头文件/wrap.h实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    #ifndef _WRAP_H_
    #define _WRAP_H_

    #include <netinet/in.h>
    #include <cstddef>
    #include <cstdlib>
    #include <netinet/tcp.h>
    #include <fcntl.h>
    #include <cstdio>

    void perr_exit(const char *s);
    void settcpnodelay(int fd);
    void setreuse(int fd);
    void setnonblock(int fd);
    int Bind(int fd, const struct sockaddr *sa, socklen_t salen);
    int Listen(int fd, int backlog);
    int Socket(int family, int type, int protocol);

    #endif

具体实现

具体实现

  1. eventloop类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    #include "eventloop.h"
    #include "event.h"
    #include <algorithm>

    using namespace moon;

    eventloop::eventloop(loopthread* base,int timeout)
    :baseloop_(base),timeout_(timeout),shutdown_(false),events_(MAX_EVENTS){
    epfd_=epoll_create1(0);
    if(-1==epfd_){
    perror("epoll_create1");
    exit(EXIT_FAILURE);
    }
    create_eventfd();
    }


    eventloop::~eventloop(){
    loopbreak();
    for(auto &ev : evlist_){
    delete ev;
    }
    for(auto &ev:delque_){
    delete ev;
    }
    evlist_.clear();
    delque_.clear();
    close(epfd_);
    close(eventfd_);
    }



    loopthread* eventloop::getbaseloop(){
    if(!baseloop_) return nullptr;
    return baseloop_;
    }


    int eventloop::getefd() const{
    return epfd_;
    }


    int eventloop::getevfd() const{
    return eventfd_;
    }



    void eventloop::add_event(event* event){
    if(std::find(evlist_.begin(), evlist_.end(),event)!=evlist_.end()){
    mod_event(event);
    return;
    }
    int fd=event->getfd();
    struct epoll_event ev;
    ev.data.ptr=event;
    //ev.data.fd=fd;
    ev.events=event->getevents();

    if(epoll_ctl(epfd_,EPOLL_CTL_ADD,fd,&ev)==-1){
    perror("epoll_ctl add error");
    }
    evlist_.emplace_back(event);
    }


    void eventloop::del_event(event* event){
    if(std::find(evlist_.begin(), evlist_.end(),event)==evlist_.end()) return;
    int fd=event->getfd();

    if(epoll_ctl(epfd_,EPOLL_CTL_DEL,fd,nullptr)==-1){
    perror("epoll_ctl del error");
    }
    evlist_.remove(event);
    }


    void eventloop::mod_event(event* event){
    if(std::find(evlist_.begin(), evlist_.end(),event)==evlist_.end()) return;
    int fd=event->getfd();
    struct epoll_event ev;
    ev.data.ptr=event;
    ev.events=event->getevents();

    if(epoll_ctl(epfd_,EPOLL_CTL_MOD,fd,&ev)==-1){
    perror("epoll_ctl modify error");
    }
    }



    void eventloop::loop(){
    while (!shutdown_) {
    int n=epoll_wait(epfd_,events_.data(),MAX_EVENTS,timeout_);
    if(-1==n){
    if(errno==EINTR) continue;
    perror("epoll_wait");
    break;
    }
    for(int i=0;i<n;++i){
    auto ev=static_cast<event*>(events_[i].data.ptr);
    ev->setrevents(events_[i].events);
    ev->handle_cb();
    }
    if(n==events_.size()){
    events_.resize(events_.size() * 2);
    }
    if(!delque_.empty()){
    for(auto ev : delque_){
    delete ev;
    }
    delque_.clear();
    }
    }
    }


    //终止事件循环
    void eventloop::loopbreak(){
    if(shutdown_) return;
    write_eventfd(); //唤醒
    }



    void eventloop::create_eventfd(){
    eventfd_=eventfd(0,EFD_CLOEXEC | EFD_NONBLOCK);
    if(eventfd_<0){
    perror("eventfd create error");
    exit(EXIT_FAILURE);
    }
    event *ev=new event(this,eventfd_,EPOLLIN);
    ev->setcb(std::bind(&eventloop::read_eventfd,this),NULL,NULL);
    add_event(ev);
    }


    void eventloop::read_eventfd(){
    uint64_t opt=1;
    ssize_t n=read(eventfd_,&opt,sizeof(opt));
    shutdown_=true;
    if(n<0){
    if(errno==EINTR || errno == EAGAIN){
    return;
    }
    perror("eventfd read error");
    exit(EXIT_FAILURE);
    }
    }



    void eventloop::write_eventfd(){
    uint64_t opt=1;
    ssize_t n=write(eventfd_,&opt,sizeof(opt));
    if(n<0){
    if(errno==EINTR){
    return;
    }
    perror("eventfd write error");
    exit(EXIT_FAILURE);
    }
    }


    void eventloop::add_pending_del(base_event *ev) {
    delque_.emplace_back(ev);
    }
  2. event类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    #include "event.h"
    #include "eventloop.h"



    using namespace moon;

    event::event(eventloop *base,int fd,uint32_t events)
    :loop_(base),fd_(fd),events_(events){
    }

    event::~event(){
    }

    int event::getfd() const{
    return fd_;
    }

    uint32_t event::getevents() const{
    return events_;
    }


    eventloop* event::getloop() const{
    return loop_;
    }


    void event::setcb(const Callback &rcb,const Callback &wcb,const Callback &ecb){
    readcb_=rcb;
    writecb_=wcb;
    eventcb_=ecb;
    }



    void event::setrcb(const Callback &rcb){
    readcb_=rcb;
    }


    void event::setwcb(const Callback &wcb){
    writecb_=wcb;
    }


    void event::setecb(const Callback &ecb){
    eventcb_=ecb;
    }

    void event::setrevents(const uint32_t revents){
    revents_=revents;
    }

    void event::update_ep(){
    loop_->mod_event(this);
    }

    void event::enable_events(uint32_t op){
    events_|=op;
    update_ep();
    }

    void event::disable_events(uint32_t op){
    events_&=~op;
    update_ep();
    }



    bool event::readable(){
    return (events_&EPOLLIN);
    }


    bool event::writeable(){
    return (events_&EPOLLOUT);
    }


    void event::enable_read(){
    enable_events(EPOLLIN);
    }


    void event::disable_read(){
    disable_events(EPOLLIN);
    }


    void event::enable_write(){
    enable_events(EPOLLOUT);
    }


    void event::disable_write(){
    disable_events(EPOLLOUT);
    }


    void event::enable_ET(){
    enable_events(EPOLLET);
    }


    void event::disable_ET(){
    disable_events(EPOLLET);
    }


    void event::reset_events(){
    events_=0;
    update_ep();
    }


    void event::del_listen(){
    loop_->del_event(this);
    }


    void event::enable_listen(){
    loop_->add_event(this);
    }



    void event::disable_cb() {
    readcb_= nullptr;
    writecb_= nullptr;
    eventcb_= nullptr;
    }

    void event::handle_cb(){
    if((revents_ & EPOLLIN) || (revents_ & EPOLLRDHUP) || (revents_ & EPOLLPRI))
    {
    if(readcb_) readcb_();
    }
    if(revents_&EPOLLOUT){
    if(writecb_) writecb_();
    }
    if(revents_ & (EPOLLERR|EPOLLHUP)){
    if(eventcb_) eventcb_();
    }
    }



    event::Callback event::getrcb() {
    return readcb_;
    }


    event::Callback event::getwcb() {
    return writecb_;
    }


    event::Callback event::getecb() {
    return eventcb_;
    }
  3. loopthread类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    #include "loopthread.h"
    #include "eventloop.h"

    using namespace moon;


    loopthread::~loopthread(){
    if(loop_){
    loop_->loopbreak();
    if(t_.joinable()) t_.join();
    delete loop_;
    }else{
    if(t_.joinable()) t_.join();
    }
    }



    void loopthread::_init_(){
    eventloop *loop=new eventloop(this,timeout_);
    {
    std::unique_lock<std::mutex> lock(mx_);
    loop_=loop;
    cv_.notify_all();
    }
    loop_->loop();
    }



    eventloop* loopthread::getloop(){
    eventloop *ep=nullptr;
    {
    std::unique_lock<std::mutex> lock(mx_);
    cv_.wait(lock,[&](){
    return loop_!=nullptr;
    });
    ep=loop_;
    }
    return ep;
    }
  4. looptpool类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    #include "looptpool.h"
    #include "eventloop.h"
    #include "loopthread.h"

    using namespace moon;


    looptpool::looptpool(eventloop *base)
    :baseloop_(base){
    };


    looptpool::~looptpool(){
    stop();
    for(auto &t:loadvec_){
    delete t->getbaseloop();
    }
    loadvec_.clear();
    t_num=0;
    }



    void looptpool::init_pool(int timeout){
    timeout_=timeout;
    for(int i=0;i<t_num;++i){
    loopthread* lt=new loopthread(timeout);
    loadvec_.emplace_back(lt->getloop());
    }
    }



    void looptpool::create_pool(int n,int timeout){
    settnum(n);
    init_pool(timeout);
    }


    eventloop* looptpool::ev_dispatch(){
    if(t_num==0) return baseloop_;
    next_=(next_+1)%t_num;
    return loadvec_[next_];
    }





    void looptpool::addloop(){
    loopthread* lt=new loopthread(timeout_);
    loadvec_.emplace_back(lt->getloop());
    ++t_num;
    }




    void looptpool::stop() {
    for(auto& ep : loadvec_) {
    ep->loopbreak();
    ep->getbaseloop()->join();
    }
    }
  5. acceptor类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    #include "acceptor.h"
    #include "event.h"
    #include "eventloop.h"
    #include "wrap.h"

    using namespace moon;

    acceptor::acceptor(int port,eventloop *base)
    :loop_(base),shutdown_(true){
    if(port!=-1)
    init_sock(port);
    }


    acceptor::~acceptor(){
    stop();
    delete ev_;
    close(lfd_);
    }




    void acceptor::init_sock(int port){
    int fd=Socket(AF_INET,SOCK_STREAM,0);
    setnonblock(fd);
    setreuse(fd);

    struct sockaddr_in ser_addr;
    bzero(&ser_addr,sizeof(ser_addr));
    ser_addr.sin_family=AF_INET;
    ser_addr.sin_addr.s_addr=INADDR_ANY;
    ser_addr.sin_port=htons(port);

    Bind(fd,(struct sockaddr*)&ser_addr,sizeof(ser_addr));
    Listen(fd,SOMAXCONN);

    lfd_=fd;
    }



    void acceptor::setcb(const Callback &accept_cb){
    cb_=accept_cb;
    }


    void acceptor::handle_accept(){
    struct sockaddr_in cli_addr;
    socklen_t cli_len = sizeof(cli_addr);
    bzero(&cli_addr,cli_len);
    while(true){
    int cfd=accept(lfd_,(struct sockaddr*)&cli_addr,&cli_len);
    if (cfd == -1) {
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
    break; // 没有更多的连接
    } else {
    perror("accept");
    break;
    }
    }
    setnonblock(cfd);
    settcpnodelay(cfd);
    if(cb_) cb_(cfd);
    }

    }


    void acceptor::listen(){
    if(!shutdown_) return;
    ev_=new event(loop_,lfd_,EPOLLIN|EPOLLET);
    ev_->setcb(std::bind(&acceptor::handle_accept,this),NULL,NULL);
    loop_->add_event(ev_);
    shutdown_=false;
    }


    void acceptor::stop(){
    if(shutdown_) return;
    ev_->del_listen();
    shutdown_=true;
    }
  6. wrap.cpp

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    #include "wrap.h"

    void perr_exit(const char *s){
    perror(s);
    exit(1);
    }

    void settcpnodelay(int fd){
    int opt=1;
    int ret=setsockopt(fd,IPPROTO_TCP,TCP_NODELAY,&opt,sizeof(opt));
    if(-1==ret){
    perr_exit("setsockopt tcp nodelay error");
    }
    }

    void setreuse(int fd){
    int opt=1;
    int ret=setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
    if(-1==ret){
    perr_exit("setsockopt ipreuse error");
    }
    ret=setsockopt(fd,SOL_SOCKET,SO_REUSEPORT,&opt,sizeof(opt));
    if(-1==ret){
    perr_exit("setsockopt portreuse error");
    }
    }

    void setnonblock(int fd){
    fcntl(fd,F_SETFL, fcntl(fd,F_GETFL)|O_NONBLOCK);
    }

    int Bind(int fd, const struct sockaddr *sa, socklen_t salen){
    int n;

    if((n=bind(fd,sa,salen))<0){
    perr_exit("bind error");
    }

    return n;
    }

    int Listen(int fd, int backlog){
    int n;

    if((n=listen(fd,backlog))<0){
    perr_exit("listen error");
    return -1;
    }

    return n;
    }

    int Socket(int family, int type, int protocol){
    int n;
    if((n=socket(family,type,protocol))<0){
    perr_exit("socket error");
    return -1;
    }

    return n;
    }

使用示例

使用示例:使用这些组件实现一个简单的主从reactor多线程模型的tcp回声服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#include "eventloop.h"
#include "event.h"
#include "acceptor.h"
#include "looptpool.h"
#include "wrap.h"
#include <iostream>
#include <unistd.h>
#include <cstring>

using namespace moon;


void handle_read(event* conn_event){
char buf[1024];
int fd = conn_event->getfd();
while (true) {
ssize_t n = read(fd, buf, sizeof(buf));
if (n > 0) {
// 回显数据给客户端
ssize_t sent = 0;
while (sent < n) {
ssize_t m = write(fd, buf + sent, n - sent);
if (m > 0) {
sent += m;
} else if (m == -1 && errno == EAGAIN) {
// 写缓冲区已满,稍后再试
break;
} else {
// 发生错误,关闭连接
perror("write error");
conn_event->del_listen();
delete conn_event;
close(fd);
return;
}
}
} else if (n == 0) {
// 客户端关闭连接
std::cout << "Client disconnected, fd: " << fd << std::endl;
conn_event->del_listen();
delete conn_event;
close(fd);
return;
} else {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 数据读取完毕
break;
} else {
// 发生错误,关闭连接
perror("read error");
conn_event->del_listen();
delete conn_event;
close(fd);
return;
}
}
}
}



int main() {
// 定义监听端口
int port = 5005; // 可以根据需要更改端口号
// 定义工作线程数量
int thread_num = 4; // 根据需要调整线程数量

// 创建主事件循环(主 Reactor)
eventloop *main_loop = new moon::eventloop();

// 创建监听器(acceptor),设置监听端口和主事件循环
acceptor *acceptor = new moon::acceptor(port, main_loop);

// 创建事件循环线程池(从 Reactor)
looptpool *loop_pool = new moon::looptpool(main_loop);
loop_pool->create_pool(thread_num);

// 设置 acceptor 的回调函数,当有新连接时调用
acceptor->setcb([loop_pool](int fd) {
// 从线程池中获取一个工作事件循环
eventloop *worker_loop = loop_pool->ev_dispatch();

// 在工作事件循环中为客户端连接创建一个事件
event *conn_event = new event(worker_loop, fd, EPOLLIN | EPOLLET);

// 设置读事件的回调函数,实现回声功能
conn_event->setrcb(std::bind(handle_read,conn_event));

// 将事件添加到工作事件循环中
worker_loop->add_event(conn_event);
});

// 开始监听
acceptor->listen();

// 运行主事件循环
main_loop->loop();

// 清理资源
delete acceptor;
delete loop_pool;
delete main_loop;

return 0;
}

MoonNet网络库

MoonNetMoonNet 是一个专注于基于 Reactor 的高性能服务端网络库,提供基于主从 Reactor 多线程模型的服务端模块。它利用 Linux 的 epoll 机制,结合多线程和事件驱动设计,提供高效、可扩展的网络通信能力。MoonNet 支持 TCP 和 UDP 协议,并内置信号处理和定时器功能,适用于构建高并发、低延迟的服务器应用。

关于:MoonNet是我实现的高性能专注于linux服务端的网络库,该库将扩展以上讲的所有内容和代码,可供大家学习,如果感兴趣和喜欢的可以给我点个star

地址https://github.com/MoonforDream/MoonNet


Reactor模型
https://moonfordream.github.io/posts/Reactor模型/
作者
Moon
发布于
2024年10月27日
许可协议