Linux网络编程

Linux C/C++网络编程

Socket编程

套接字概念

概念

  • socket(套接字)可以看成是用户进程与内核网络协议栈的编程接口

    image-20240420192138215

  • socket不仅可以用于本机的进程间通信,还可以用于网络上不同主机的进程间通信

  • socket(套接字)一定是成对出现的

  • 一个套接字其实就是由一个文件描述符指向的(原理图中的sfd和cfd表示的就是文件描述符),因此socket通信中也可以使用I/O方法和函数

  • 文件描述符

    1. 是一个非负整数,并且用于代表一个打开的文件
    2. 文件描述符默认产生3个分别为0:STDIN_FILENO(标准输入)、1:STDOUT_FILENO(标准输出)、2:STDERR_FILENO(标准错误)
    3. 遵循新建文件描述符时选择最小可选择的,因此在没有新建文件描述符的打开一个文件产生的是fd为3的文件描述符
    4. 总共最多1024个文件描述符。也就是1023为最后一个文件描述符

image-20240420190544388

字节序

概念

  • 字节序存储数据所遵循的一套规则
  • 字节序分为大端序和小端序
  • 当我们传输数据时如果双方的字节序的不同就会造成发送的数据和接收数据不一致,所以我们得去关注字节序问题
  • 字节序问题是一个历史遗留的问题,当年IBM也就是第一个数据存储公司,采用了大端序作为数据存储的规则,而Intel公司想要不一样导致了Intel使用了小端序的方法,因此导致了现在使用Intel、AMD等CPU的主机使用的是小端序存储,而网络协议族(TCP、IP等)使用大端序存储,导致网络通信上主机发送数据到网络上导致数据的不一致问题!
  • 网络字节序规定为大端序,也就是网络中存储采用的字节序方法
  • 主机字节序根据CPU决定,一般为小端序,Intel、AMD采用的是小端序,表示主机存储采用的字节序方法
  • 当主机字节序和网络字节序不同时,要传输数据就需要进行字节序的转换

字节序

  • 大端序高位字节存在低位地址,低位字节存在高位地址

    例如:有一个十六位进制数0x12345678,利用大端序来存储

    0x00000001(低位地址) 存 0x12(高位字节)

    0x00000002 存 0x34

    ….

  • 小端序高位字节存在高位地址,低位字节存在低位地址

    例如:有一个十六位进制数0x12345678,利用小端序来存储

    0x00000001(低位地址) 存 0x78(低位字节)

    0x00000002 存 0x56

    ….

字节序转换函数

  • uint32_t htonl(uint32_t hostlong)将主机字节序转网络字节序(也就是小端序转大端序),用来进行转换IP地址的字节序(针对IP协议),无符号32位整型数通常对应着ip地址,对于一般的点分10进数的ip地址(例如192.168.1.110)是string类型需要用atoi()转换为无符号整型,所以不推荐使用这个转换函数

    1
    2
    3
    4
    #include <arpa/inet.h>

    uint32_t htonl(uint32_t hostlong);
    hostlong->表示要转换的主机字节序形式的IP
  • uint16_t htons(uint16_t hostshort):也是将主机字节序转换为网络字节序,但是用来进行转换端口号的字节序(针对端口号),无符号16位整型数通常对应端口号

    1
    2
    3
    4
    #include <arpa/inet.h>

    uint16_t htonl(uint16_t hostshort);
    hostlong->表示要转换的主机字节序形式的端口
  • uint32_t ntohl(uint32_t netlong):将网络字节序转换为主机字节序(也就是大端序转换为小端序),用来进行转化IP地址的字节序

    1
    2
    3
    4
    #include <arpa/inet.h>

    uint32_t ntohl(uint32_t netlong);
    netlong->表示要转换的网络字节序形式的IP
  • uint16_t ntohs(uint16_t netshort):也是将网络字节序转换为主机字节序,但是用来进行转化端口号字节序的

    1
    2
    3
    4
    #include <arpa/inet.h>

    uint16_t ntohs(uint16_t netshort);
    netshort->表示要转换的网络字节序形式的端口号

注:

  • 使用以上函数需要包含头文件#include <arpa/inet.h>
  • h表示host(主机),n表示network(网络),l表示32为长整数,s表示16位短整数

IP地址转换函数

  • int inet_pton(int af,const char *src,void *dst):用于将点分十进制的IP地址转换为网络字节序的IP地址

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    #include <arpa/inet.h>

    int inet_pton(int af,const char *src,void *dst);

    //参数:
    af:表示IP协议(ipv4,ipv6),填AF_INET(IPv4)或AF_INET6(IPv6);
    src:传入,IP地址(点分十进制);
    dst:传出,转换后的网络字节序的IP地址;

    //返回值:
    1:转换成功;
    0:转换异常,说明src指向的不是一个有效的IP地址;
    -1:转换失败;
  • const char *inet_ntop(int af,const void *src,char *dst,socklen_t size):用于将网络字节序的IP地址转换为主机字节序的点分十进制IP地址(也就是string类型的)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    #include <arpa/inet.h>

    const char *inet_ntop(int af,const void *src,char *dst,socklen_t size);

    //参数:
    af:表示IP协议(ipv4,ipv6),填AF_INET(IPv4)或AF_INET6(IPv6);
    src:网络字节序IP地址;
    dst:传出,主机字节序IP地址(string);
    size:dst的大小;

    //返回值:
    dst:转换成功返回;
    NULL:转换失败返回;
  • struct hostent *gethostbyname(const char *name):用于将域名、主机名、字符串IP转换为网络字节序IP地址,用于客户端程序

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    #include <netdb.h>

    struct hostent *gethostbyname(const char *name);
    name->IP地址

    //返回一个hostent结构体
    struct hostent{
    char *h_name; //主机名
    char **h_aliases; //主机所有别名构成的字符串数组,同一IP可绑定多个域名
    short h_addrtype; //主机IP地址的类型
    short h_length; //主机IP地址长度,IPv4为4,IPv6为6
    char **h_addr_list; //主机的IP地址,以网络字节序存储
    };

    //返回值
    nullptr:转换失败;
    hostent结构体:转换成功


    //客户端通信使用
    #define h_addr h_addr_list[0]
    struct sockaddr_in s;
    //将转换后的网络字节序IP地址复制到sockaddr_in结构体的sin_addr成员中
    memcpy(&s.sin_addr,h->h_addr,h->h_length);
  • in_addr_t inet_addr(const char *cp):将字符串格式也就是点分十进制的IP地址转换为网络字节序IP地址

  • char *inet_ntoa(struct in_addr in)将网络字节序的IP地址转换为字符串格式IP地址,用于服务端解析客户段IP地址

sockaddr数据结构

概念

  • 在实现网络通信中的接口时,我们常常得使用他们给定的特定结构体作为参数进行传入,而sockaddr结构体就是最早的结构体

网络结构体

  • sockaddr结构体用于存放协议族、端口和地址信息的结构体,客户端和服务端中的接口函数需要使用这个结构体,但是这个结构体将端口和地址信息都用同一个变量进行存储

    1
    2
    3
    4
    struct sockaddr{
    sa_family_t sin_family; //协议族,一般填AF_INET,IPv6填AF_INET6
    char sa_daat[14]; //14字节,包含目标地址和端口号
    };
  • sockaddr_in结构体现在一般都使用这个结构体,他和sockaddr结构体的大小一样,只是将目标IP地址和端口号单独分出来,但是网络通信的接口都是用sockaddr作为参数,因此需要强转为sockaddr结构体

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    #include <netinet/in.h>

    struct sockaddr_in{
    sa_family_t sin_family; //协议族,一般填AF_INET,IPv6填AF_INET6
    in_port_t sin_port; //网络字节序的端口号
    struct in_addr sin_addr; //网络字节序的IP地址
    char sin_zero[8]; //未使用,为了保持与sockaddr一样的长度所添加的
    };

    struct in_addr{
    uint32_t s_addr; //网络字节序的IP地址,32位地址
    };

    //初始化
    struct sockaddr_in s;
    s.sin_family=AF_INET or AF_INET6;
    s.sin_port=htons(port); //将主机字节序端口转换为网络字节序再赋值
    //如果IP地址是整形的主机字节序IP地址
    s.sin_addr.s_addr=htonl(IP);
    //如果IP地址是整形的点分十进制IP地址
    int dst;
    inet_pton(AF_INET,IP,(void *)&dst);
    s.sin_addr.s_addr=dst;

    s.sin_addr.s_addr=inet_addr(IP);

    >>重点
    /*【重点】*/
    //无论IP地址啥格式直接使用下面这段代替
    //使用宏INADDR_ANY,会自动取出系统中有效的任意IP地址,是主机字节序形式的
    s.sin_addr.s_addr=htonl(INADDR_ANY); //将主机字节序转换为网络字节序


    //强转sockaddr
    (struct sockaddr *)&s;
  • sockaddr_in6:IPv6的sockaddr_in结构体

  • sockaddr_un:本地套接字所用

image-20240421163807676

网络套接字函数

socket模型创建流程

服务端流程

  1. socket()创建一个套接字s1
  2. 使用bind()函数绑定IP地址和port端口号
  3. 使用listen()设置同时与服务器建立连接的上限
  4. accept()阻塞监听客户端连接,如果连接成功,释放s1,让s1继续监听客户端连接,返回一个新的套接字s2
  5. 使用read()、recv()读取客户端发送过来的请求与数据
  6. 使用write()、send()回应客户端请求与数据
  7. read()函数读到0时终止与客户端的连接
  8. 使用close()关闭服务端连接

客户端流程

  1. socket()创建套接字s3
  2. 使用connect()将套接字s3绑定IP地址和port端口,并与客户端连接
  3. 使用write()、send()向服务端发送请求和数据
  4. 使用read()、recv()接收服务端的响应和回应
  5. 使用close()关闭连接

socket模型创建总体流程

image-20240421171012820

注:

  • 总体流程一共创建了3个socket套接字
  • read和write函数是对文件IO的API,send和recv是socket编程的API,因为socket实质也是一个文件描述符,所以也可以使用read和write进行发送以及接收数据

函数和方法

概念:网络通信所需要使用的函数和接口

函数和方法

  • int socket(int domain,int type,int protocol)创建一个套接字
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <sys/socket.h>

int socket(int domain,int type,int protocol);

//参数
domain:指定IP地址协议,取值为:AF_INET(IPv4)、AF_INET6(IPv6)、AF_UNIX(本地套接字);

type:选用的数据传输协议,取值为:SOCK_STREAM(流式传输)、SOCK_DGRAM(报式传输);

protocol:表示选用的协议当中代表协议,一般传0,流式传输协议代表协议为TCP,报式传输协议代表为UDP,也可以直接填IPPROTO_TCP或者IPPROTO_UDP;

//返回值
成功:新套接字所对应的文件描述符;

失败:-1 errno;
  • int bind(int sockfd,const struct sockaddr *addr,socklen_t addrlen)给socket套接字绑定一个地址结构(IP地址+端口号)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <sys/socket.h>

int bind(int sockfd,const struct sockaddr *addr,socklen_t addrlen);

//参数
sockfd:要绑定的socket的文件描述符,也就是socket()返回的值;

addr:传入参数,sockaddr结构体,用于传入要绑定的IP地址和端口号,但是我们一般使用sockaddr_in所以需要强转,(struct sockaddr *)&s来强转;

addrlen:地址结构体(addr)的大小;


//返回值
成功:0;

失败:-1,errno;
  • int listen(int sockfd,int backlog)设置同时与服务器建立连接的上限数(同时进行3次握手的客户端数量)
1
2
3
4
5
6
7
8
9
10
11
12
13
#include <sys/socket.h>

int listen(int sockfd,int backlog);

//参数
sockfd:要绑定的socket的文件描述符,也就是socket()返回的值;

backlog:上限数值,最大值128;

//返回值
成功:0;

失败:-1,errno;
  • 【重点】int accept(int sockfd,struct sockaddr *addr,socklen_t *addrlen):*阻塞等待客户端建立连接,成功返回一个与客户端连接的sockfd(socket文件描述符)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <sys/socket.h>

int accept(int sockfd,struct sockaddr *addr,socklen_t *addrlen);


//参数
sockfd:socket的文件描述符,也就是socket()返回的值;

addr:传出参数,sockaddr结构体,用于传出成功与服务器建立连接的那个客户端的地址结构(IP地址和端口号);

addrlen:传入传出参数,传入是地址结构体(addr)的大小,传出是客户端addr实际大小;


//返回值
成功:返回一个与客户端连接的sockfd;

失败:-1,errno;
  • int connect(int sockfd,const struct sockaddr *addr,socklen_t addrlen)客户端使用函数,使用现有的socket与服务器建立连接可以不需要使用bind函数绑定客户端地址结构,因为他会采用”隐式绑定”地址结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <sys/socket.h>

int connect(int sockfd,const struct sockaddr *addr,socklen_t addrlen);

//参数
sockfd:客户端的socket的文件描述符,也就是socket()返回的值;

addr:传入参数,要连接的服务器的地址结构(IP+port);

addrlen:服务器地址结构的大小;

//返回值
成功:0;

失败:-1,errno;
  • ssize_t read(int fd,void *buf,size_t count):原本是用于文件读取的操作,但是也可以用与socket的接收以及读取数据的操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <unistd.h>

ssize_t read(int fd,void *buf,size_t count);

//参数
fd:要操作的文件描述符;

buf:是一个指向读或写数据的缓冲区指针;

count:缓冲区长度;


//返回值
成功:返回读到的字节数,0,表示已经到达文件末尾;

失败:-1,errno;
-1:如果read返回-1并且errno等于EAGIN或EWOULDBLOCK,说明不是read失败而是read以非阻塞读文件并且文件无数据


//创建缓冲区,可以使用BUFSIZ,BUFSIZ=4096
char buf[BUFSIZ];
  • ssize_t write(int fd,const void *buf,size_t count)用于进行文件写数据的操作,但是也可以用于socket通信的发送以及写数据的操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <unistd.h>

ssize_t write(int fd,const void *buf,size_t count);

//参数
fd:要操作的文件描述符;

buf:待写出数据的缓冲区;

count:数据大小;

//返回值
成功:返回写入的字节数;

失败:-1,errno;
  • ssize_t send(int sockfd,const void *buf,size_t len,int flags)用于网络套接字上发送数据和请求,只能用于TCP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <sys/socket.h>

ssize_t send(int sockfd,const void *buf,size_t len,int flags);

//参数
sockfd:要接收数据或请求的socket文件描述符;

buf:是一个指向读或写数据的缓冲区指针;

len:缓冲区长度;

flags:是一组标志参数,控制着函数的行为,一般都置为0;
//send()常用标志参数:
- MSG_DONTWAIT:非阻塞发送数据;
- MSG_MORE:告诉内核还有更多数据需要发送;
- MSG_NOSIGNAL:在发送数据时忽略SIGPIPE信号;

//返回值
成功:返回发送的字节数;

失败:-1,errno;
  • ssize_t recv(int sockfd,void *buf,size_t len,int flags)用于在网络套接字接收数据或请求,只能用于TCP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <sys/socket.h>

ssize_t recv(int sockfd,void *buf,size_t len,int flags);

//参数
sockfd:要发送数据或请求的socket文件描述符;

buf:是一个指向读或写数据的缓冲区指针;

len:缓冲区长度;

flags:是一组标志参数,控制着函数的行为,一般都置为0;
//recv()常用标志参数
- MSG_WAITALL:阻塞等待直到接收到指定长度的数据;
- MSG_PEEK:接收数据但不从接收缓冲区中删除数据;
- MSG_OOB:接收带外数据;

//返回值
成功:返回读到的字节数,0,表示连接已经关闭;

失败:-1,errno;
  • int shutdown(int sockfd,int how)跟close函数差不多的功能,但是shutdown不考虑描述符的引用计数,直接关闭描述符,并且可以选择关闭读端还是写端还是读写端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <sys/socket.h>

int shutdown(int sockfd, int how);


//参数
sockfd:套接字文件描述符;

how:允许为shutdown操作选择参数;
//选择
SHUT_RD(0):关闭sockfd上的读端,任何当前在套接字接收缓冲区的数据将会被无声丢弃掉;
SHUT_WR(1):关闭sockfd上的写端;
SHUT_RDWR(2):关闭sockfd的读写端;


//返回值
成功:0;

失败:-1,errno;

注:

  • 使用close中止一个连接,但它只是减少描述符的引用计数,并不直接关闭连接,只有当描述符的引用计数为0时才关闭连接.
  • shutdown不考虑描述符的引用计数,直接关闭描述符.
  • 例如当我们使用dup2进行文件描述符重定向时,使用close源文件描述符并不能阻止其他指向源文件的文件描述符操作,而shutdown会关闭全部.

C/S模型 -TCP

server的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <cstdio>
#include <iostream>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <ctype.h>
#include <netinet/in.h>
#include <stdlib.h>

// 封装报错函数
void sys_err(const char *str){
perror(str);
exit(1);
}

int main (int argc, char *argv[]) {
//定义需要的变量
int sfd,cfd,ret;
struct sockaddr_in serv_addr,cli_addr;
socklen_t cli_addrlen;
char buf[BUFSIZ],cli_IP[1024];

//创建服务器socket
sfd=socket(AF_INET,SOCK_STREAM,0);
if(sfd==-1){
sys_err("socket error");
}

//将服务端地址结构初始化,服务器绑定地址结构
serv_addr.sin_family=AF_INET;
serv_addr.sin_port=htons(5003);
serv_addr.sin_addr.s_addr=htonl(INADDR_ANY);
if(bind(sfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr))==-1){
sys_err("bind error");
}

//设置同时建立连接上限
if(listen(sfd, 128)==-1){
sys_err("listen error");
}

//阻塞等待客户端连接
cli_addrlen=sizeof cli_addr;
cfd=accept(sfd, (struct sockaddr *)&cli_addr,&cli_addrlen);

if(cfd==-1){
sys_err("accept error");
}
//输出客户端IP地址和端口号
std::cout<<"client ip:"<<inet_ntoa(cli_addr.sin_addr)<<" port:"<<ntohs(cli_addr.sin_port)<<std::endl; //以前的使用方法inet_ntoa
//或者:
//std::cout<<"client ip:"<<inet_ntop(AF_INET,&cli_addr.sin_addr.s_addr,cli_IP,sizeof(cli_IP))<<" port:"<<ntohs(cli_addr.sin_port)<<std::endl; //现在的使用方法inet_ntop

//开始发送数据以及接收数据操作
while (1) {
ret=read(cfd,buf,sizeof(buf));
if(ret==-1){
sys_err("read error");
}
int n=write(STDOUT_FILENO,buf,ret);
for(int i=0;i<ret;++i){
buf[i]=toupper(buf[i]);
}
n=write(cfd,buf,ret);
}

//关闭连接
close(cfd);
close(sfd);
return 0;
}

client的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <cstdio>
#include <iostream>


//封装报错函数
void sys_err(const char *str){
perror(str);
exit(1);
}

int main (int argc, char *argv[]) {
//定义所需变量
int cfd;
char buf[BUFSIZ];
//初始化服务端地址结构
struct sockaddr_in serv_addr;
serv_addr.sin_family=AF_INET;
serv_addr.sin_port=htons(5003);
// serv_addr.sin_addr.s_addr=htonl(INADDR_ANY);
serv_addr.sin_addr.s_addr=inet_addr("127.0.0.1"); //以前使用
//或者:
// inet_pton(AF_INET,"127.0.0.1",&serv_addr.sin_addr.s_addr); //现在使用的

//创建客户端socket
cfd=socket(AF_INET,SOCK_STREAM,0);
if(cfd==-1) sys_err("socket error");

//进行与服务端连接
if(connect(cfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr))==-1) sys_err("connect error");
int count=10;

//与服务端通信
while (--count) {
write(cfd,"hello",5);
int ret=read(cfd,buf,sizeof buf);
write(STDOUT_FILENO,buf,ret);
std::cout<<std::endl;
sleep(1);
}
//关闭客户端连接
close(cfd);
return 0;
}

TCP状态时序图

TCP状态转换图

TCP各阶段状态图

TCP各阶段状态图

出错处理函数封装

函数声明

wrap.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#ifndef _WRAP_H_
#define _WRAP_H_

#include <cstddef>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
void perr_exit(const char *s);
int Accept(int fd,struct sockaddr *sa,socklen_t *salenptr);
int Bind(int fd,const struct sockaddr *sa,socklen_t salen);
int Connect(int fd,const struct sockaddr *sa,socklen_t salen);
int Listen(int fd,int backlog);
int Socket(int family,int type,int protocol);
ssize_t Read(int fd,void *ptr,size_t nbytes);
ssize_t Write(int fd,const void *ptr,size_t nbytes);
int Close(int fd);
ssize_t Readn(int fd,void *vptr,size_t n);
ssize_t Writen(int fd,const void *vptr,size_t n);
static ssize_t my_read(int fd,char *ptr);
ssize_t Readline(int fd,void *vptr,size_t maxlen);

#endif

具体函数封装

wrap.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
#include <cerrno>
#include <cstddef>
#include <cstdlib>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <cstdio>
#include "wrap.h"


void perr_exit(const char *s){
perror(s);
exit(1);
}


int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr){
int n;
again:
if((n=accept(fd,sa,salenptr))<0){
//ECONNABORTED表示连接被本地软件意外中断
//EINTR表示一个被阻塞的系统调用(如read,write,accept,connect等)被signal打断了
if((errno==ECONNABORTED)||(errno==EINTR)) goto again;
else if (errno == EAGAIN || errno == EWOULDBLOCK)
return -1; // 暂时没有新的连接请求到达,返回-1
else perr_exit("accept error");
}
return n;
}



int Bind(int fd, const struct sockaddr *sa, socklen_t salen){
int n;

if((n=bind(fd,sa,salen))<0){
perr_exit("bind error");
}

return n;
}


int Connect(int fd, const struct sockaddr *sa, socklen_t salen){
int n;
if((n=connect(fd,sa,salen))<0){
perr_exit("connect error");
return -1;
}
return n;
}


int Listen(int fd, int backlog){
int n;

if((n=listen(fd,backlog))<0){
perr_exit("listen error");
return -1;
}

return n;
}


int Socket(int family, int type, int protocol){
int n;
if((n=socket(family,type,protocol))<0){
perr_exit("socket error");
return -1;
}

return n;
}


ssize_t Read(int fd, void *ptr, size_t nbytes){
ssize_t n;

again:
if((n=read(fd,ptr,nbytes))==-1){
if(errno==EINTR) goto again;
else return -1;
}

return n;
}


ssize_t Write(int fd, const void *ptr, size_t nbytes){
ssize_t n;

again:
if((n=write(fd,ptr,nbytes))==-1){
if(errno==EINTR) goto again;
else return -1;
}

return n;
}


int Close(int fd){
int n;
if((n=close(fd))==-1) perr_exit("close error");
return n;
}

//参数三:是应该读取的字节数
ssize_t Readn(int fd, void *vptr, size_t n){
size_t nleft;
ssize_t nread;
char *ptr;

ptr=(char *)vptr;
nleft=n;

while (nleft>0) {
if((nread=read(fd,ptr,nleft))<0){
if(errno==EINTR) nread=0;
else return -1;
}else if (nread==0) {
break;
}

nleft-=nread;
ptr+=nread;
}
return n-nleft;
}


ssize_t Writen(int fd, const void *vptr, size_t n){
size_t nleft;
ssize_t nwritten;
const char *ptr;

ptr=(char*)vptr;
nleft=n;
while (nleft>0) {
if((nwritten=write(fd,ptr,nleft))<=0){
if(nwritten<0&&errno==EINTR) nwritten=0;
else return -1;
}
nleft-=nwritten;
ptr+=nwritten;
}
return n;
}


static ssize_t my_read(int fd, char *ptr){
static int read_cnt;
static char *read_ptr;
static char read_buf[100];

if(read_cnt<=0){
again:
if((read_cnt=read(fd,read_buf,sizeof(read_buf)))<0){
if(errno==EINTR) goto again;
return -1;
}else if(read_cnt==0) return 0;
read_ptr=read_buf;
}
read_cnt--;
*ptr=*read_ptr++;
return 1;
}

//readline ---fgets
//传出参数 vptr
ssize_t Readline(int fd, void *vptr, size_t maxlen){
ssize_t n,rc;
char c,*ptr;
ptr=(char*)vptr;

for(n=1;n<maxlen;++n){
if((rc=my_read(fd, &c))==1){
*ptr++=c;
if(c=='\n') break;
}else if(rc==0){
*ptr=0;
return n-1;
}else {
return -1;
}
}
*ptr=0;
return n;
}

新client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <cstdio>
#include <string.h>
#include <strings.h>
#include "wrap.h"

#define SER_PORT 5005

int main (int argc, char *argv[]) {
//定义所需变量
int cfd,n;
struct sockaddr_in ser_addr;
char buf[BUFSIZ];
//初始化服务端地址结构
struct sockaddr_in serv_addr;
bzero(&serv_addr,sizeof(serv_addr));
serv_addr.sin_family=AF_INET;
serv_addr.sin_port=htons(SER_PORT);
// serv_addr.sin_addr.s_addr=htonl(INADDR_ANY);
serv_addr.sin_addr.s_addr=inet_addr("127.0.0.1");
//或者:
// inet_pton(AF_INET,"127.0.0.1",&serv_addr.sin_addr.s_addr);

//创建客户端socket
cfd=Socket(AF_INET,SOCK_STREAM,0);

//进行与服务端连接
Connect(cfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr));

//与服务端通信
while (fgets(buf,BUFSIZ,stdin)!=NULL) {
Write(cfd,buf,strlen(buf));
int ret=Read(cfd,buf,BUFSIZ);
if(ret==0){
printf("the other side has been closed.\n");
break;
}
Write(STDOUT_FILENO,buf,ret);
}
//关闭客户端连接
Close(cfd);
return 0;
}

新server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <cstdio>
#include <iostream>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <ctype.h>
#include <netinet/in.h>
#include "wrap.h"


int main (int argc, char *argv[]) {
//定义需要的变量
int sfd,cfd,ret;
struct sockaddr_in serv_addr,cli_addr;
socklen_t cli_addrlen;
char buf[BUFSIZ],cli_IP[1024];

//创建服务器socket
sfd=Socket(AF_INET,SOCK_STREAM,0);

//将服务端地址结构初始化,服务器绑定地址结构
serv_addr.sin_family=AF_INET;
serv_addr.sin_port=htons(5003);
serv_addr.sin_addr.s_addr=htonl(INADDR_ANY);
Bind(sfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr));

//设置同时建立连接上限
Listen(sfd,128);

//阻塞等待客户端连接
cli_addrlen=sizeof cli_addr;
cfd=Accept(sfd, (struct sockaddr *)&cli_addr,&cli_addrlen);

//输出客户端IP地址和端口号
// std::cout<<"client ip:"<<inet_ntoa(cli_addr.sin_addr)<<" port:"<<ntohs(cli_addr.sin_port)<<std::endl;
//或者:
std::cout<<"client ip:"<<inet_ntop(AF_INET,&cli_addr.sin_addr.s_addr,cli_IP,sizeof(cli_IP))<<" port:"<<ntohs(cli_addr.sin_port)<<std::endl;
//开始发送数据以及接收数据操作
while (1) {
ret=Read(cfd,buf,sizeof(buf));
int n=Write(STDOUT_FILENO,buf,ret);
for(int i=0;i<ret;++i){
buf[i]=toupper(buf[i]);
}
n=Write(cfd,buf,ret);
}

//关闭连接
Close(cfd);
Close(sfd);
return 0;
}

端口复用

概念

  • 端口复用解决服务端先关闭导致服务端并未真正关闭,导致端口被占用需要等待2MSL(40s左右)才能重新启动同一端口的服务端程序问题,使端口复用!

  • 端口复用解决的具体问题产生原因:

    当我们每次先关闭服务端程序运行再关闭客户端的时候,我们会发现再次启动服务端程序会发现报错说端口正在被使用,这涉及到了TCP中的连接问题,在TCP的四次挥手中,TCP会提供客户端未收到服务端的FIN包的等待重传机制(这个时候服务端并未真正处于CLOSED状态),将会导致客户端不知道服务端是否收到导致ack包到底发不发的问题,因此TCP设置让客户端等待2MSL(40多s)后再继续重发机制!而这个2MSL中服务端是处于TIME_WAIT状态,并未真正关闭,因此我们需要等待2MSL时间才能成功启动服务端程序

函数原型

int setsockopt(int sockfd,int level,int optname,const void *optval,socklen_t optlen)套接字设置函数,可以设置套接字的选项以及属性(选项很多复杂),我们这里用来进行端口复用,这里只讨论端口复用如何使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <sys/socket.h>


int setsockopt(int sockfd,int level,int optname,const void *optval,socklen_t optlen);


//参数
sockfd:套接字文件描述符;

level:给套接字设置的选项的级别;
//端口复用选择
SOL_SOCKET;

optname:设置的选项;
//选择
SO_REUSEADDR:允许复用本地地址,数据类型为int;
SO_REUSEPORT:允许复用本地端口,数据类型为int;

optval:选项值设置;
1:端口复用;
0:不使用端口复用;

optlen:选项值长度;



//返回值
成功:0;

失败:-1,errno;


//端口复用使用示例
int opt=1; //设置端口复用
setsockopt(sfd,SOL_SOCKET,SO_REUSEPORT,(void *)&opt,sizeof(opt));

注:需要在socket函数和bind函数之间进行执行!

多进程并发服务器

实现思路

  1. 创建监听套接字sfd,Socket()
  2. 绑定地址结构sockaddr_in并强转成sockaddr,Bind()
  3. 设置监听上限,Listen()
  4. 父进程(pid>0)进行监听,创建cfd与子进程(pid==0),注册信号捕捉函数非阻塞回收子进程,父进程关闭通信套接字cfd,Accept(),fork(),close(cfd),SIGCHLD,waitpid()。
  5. 子进程接收客户端连接请求以及执行服务端代码,子进程关闭监听套接字sfd,close(sfd),read()

实现示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#include <cctype>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <strings.h>
#include <sys/wait.h>
#include "wrap.h"


#define SER_PORT 5005


void catch_child(int signum){
while(waitpid(0,NULL,WNOHANG)>0);
return;
}

int main (int argc, char *argv[]) {
int sfd,cfd,pid,ret,i;
struct sockaddr_in ser_addr,cli_addr;
socklen_t cli_addr_len;
char buf[BUFSIZ];

//将地址结构清0
// memset(&ser_addr,0,sizeof(ser_addr));
bzero(&ser_addr,sizeof(ser_addr));

ser_addr.sin_family=AF_INET;
ser_addr.sin_port=htons(SER_PORT);
ser_addr.sin_addr.s_addr=htonl(INADDR_ANY);

sfd=Socket(AF_INET,SOCK_STREAM,0);
Bind(sfd,(struct sockaddr *)&ser_addr,sizeof(ser_addr));

Listen(sfd, 128);

cli_addr_len=sizeof(cli_addr);
while (1) {
cfd=Accept(sfd,(struct sockaddr *)&cli_addr,&cli_addr_len);

//创建子进程
pid=fork();
if(pid<0){
perr_exit("fork error");
}else if (pid==0) { //子进程
close(sfd);
break;
}else{ //父进程
struct sigaction act;
act.sa_handler=catch_child;
sigemptyset(&act.sa_mask);
act.sa_flags=0;
ret=sigaction(SIGCHLD,&act,NULL);
if(ret!=0){
perr_exit("sigaction error");
}

close(cfd);
continue;
}
}
//子进程主体代码
if(pid==0){
for(;;){
ret=Read(cfd,buf,sizeof(buf));
if(ret==0){
close(cfd);
exit(1);
}
for(i=0;i<ret;++i){
buf[i]=toupper(buf[i]);
}
Write(cfd,buf,ret);
Write(STDOUT_FILENO,buf,ret);
}
}
return 0;
}

//编译
g++ process_server.cpp wrap.cpp -o process_server


//运行
./process_server
//开多个终端连接服务端
nc 127.0.0.1 5005

可以使用scp -r 本地代码目录 目标服务器用户名@公网IP地址:要存入的路径,将代码上传到服务器上测试

多线程并发服务器

实现思路

  1. 创建监听套接字sfd,Socket()
  2. 绑定地址结构sockaddr_in并强转成sockaddr,Bind()
  3. 设置监听上限,Listen()
  4. 父线程进行监听,创建子线程和cfd,使用线程分离非阻塞回收子线程(如果需要返回值,则需要创建一个专门子线程回收子线程,pthread_join回收子线程退出值),父线程关闭通信套接字cfd,Accept(),pthread_create(),pthread_detach,close(cfd),
  5. 子线程接收客户端请求并执行服务端代码,子线程关闭监听套接字sfd,close(sfd),read()

实现示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <netinet/in.h>
#include <strings.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <ctype.h>
#include <unistd.h>
#include "wrap.h"

#define SER_PORT 5005
#define MAXLINE 8192


//定义一个结构体,将地址结构跟cfd捆绑,也可以不使用
struct s_info {
struct sockaddr_in cli_addr;
int cfd;
};

void *do_work(void *arg){
int n,i;
struct s_info *ts=(struct s_info*)arg;
char buf[MAXLINE];
char str[INET_ADDRSTRLEN]; //INET_ADDRSTRLEN=16

while(1){
//读客户端
n=Read(ts->cfd,buf,MAXLINE);
//读完跳出循环,关闭cfd
if(n==0){
printf("the client %d closed ...\n",ts->cfd);
break;
}
//打印客户端信息
printf("recevied from %s at PORT %d\n",inet_ntoa(ts->cli_addr.sin_addr),ntohs((*ts).cli_addr.sin_port)); //以前使用的
// printf("recevied from %s at PORT %d\n",inet_ntop(AF_INET,&ts->cli_addr.sin_addr,str,sizeof(str)),ntohs((*ts).cli_addr.sin_port)); //现在使用的
for(i=0;i<n;++i){
buf[i]=toupper(buf[i]);
}
//写数据
Write(ts->cfd,buf,n);
Write(STDOUT_FILENO,buf,n);
}
Close(ts->cfd);
return (void*)0; //pthread_exit(0);
}


int main (int argc, char *argv[]) {
struct sockaddr_in ser_addr,cli_addr;
socklen_t cli_addr_len;
int sfd,cfd;
pthread_t tid;

struct s_info ts[256]; //创建结构体数组
int i=0;

sfd=Socket(AF_INET,SOCK_STREAM,0);
//地址结构清0
bzero(&ser_addr, sizeof(ser_addr));
ser_addr.sin_family=AF_INET;
ser_addr.sin_addr.s_addr=htonl(INADDR_ANY);
ser_addr.sin_port=htons(SER_PORT);

Bind(sfd,(struct sockaddr *)&ser_addr,sizeof(ser_addr));

Listen(sfd,128);

printf("Accepting client connect ...\n");
cli_addr_len=sizeof(cli_addr);
while (1) {
//阻塞监听客户端连接请求
cfd=Accept(sfd,(struct sockaddr *)&cli_addr,&cli_addr_len);
ts[i].cli_addr=cli_addr;
ts[i].cfd=cfd;
//创建子线程来执行服务端代码
pthread_create(&tid,NULL,do_work,(void *)&ts[i]);
//子线程分离,进行子线程回收,防止僵尸线程产生
pthread_detach(tid);
++i;
}
return 0;
}

多路I/O转接服务器

引言

  • 之前讲的服务端程序使用的方式就是阻塞模式或者非阻塞忙轮询模式,而这些模式的效率很低,因此我们引入了响应式的多路IO转接模式
  • 阻塞一直等待事件发生
  • 非阻塞忙轮询就是不阻塞,但是会一直循环查看事件是否发生
  • 多路I/O转接响应式,当事件发生就唤醒服务端
  • 多路IO转接服务器也叫做多任务IO服务器,这类服务器实现的主旨思想是,不再游应用程序自己监视客户端连接,取而代之由内核替应用程序监视文件

select模型

概念

  • select模型采用内核作为代理代替应用程序进行轮询,解决了accept监听阻塞等待问题,并没有解决客户端与服务端通信之间的阻塞
  • 在非阻塞忙轮询的模式中,我们的应用程序需要不断地轮询(polling)获得客户端的请求,而每次轮询就是一次系统调用,这样的效率不够好,而在select模型中,我们选择使用内核作为代理代替应用程序去轮询

select模型原理

  1. select作为内核代理,将会持有原先服务端的监听套接字sfd,来监听客户端,解放服务端程序
  2. 对于多个客户端连接套接字cfd(文件描述符),select会将其放入一个等待队列中
  3. 然后select调用的时候会遍历队列,如果客户端发送请求或数据,会调用服务端程序的回调事件(也就是服务端和客户端还是直接通信,select并未进行协助)
  4. 遍历结束之后,如果仍然没有一个可用的文件描述符(也就是客户端没有请求),select会让用户进程睡眠,直到等待资源可用的时候(也就是客户端有请求时)再唤醒用户进程并返回对应的文件描述符(调用应用进程的回调事件)

优点

  • 跨平台进行文件描述符监听,例如windows、macos、linux、Unix、类Unix、mipe系统都可以使用

缺点

  • 监听上限受文件描述符限制,最大1024个
  • select模型需要遍历所有文件描述符队列,没有监听或者事件发生的文件描述符也被遍历,导致效率不够理想
  • select的每次调用都需要重复的复制集合,而且在select内部需要将集合从用户区复制到内核区,等有事件来又需要将数据从内核区复制到用户区,导致select的性能受限

函数

  • int select(int nfds,fd_set *readfds,fd_set *writefds,fd_set *exceptfds,struct timeval *timeout):创建select
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <sys/select.h>

int select(int nfds,fd_set *readfds,fd_set *writefds,fd_set *exceptfds,struct timeval *timeout);

struct timeval{
long tv_sec; //秒
long tv_usec; //微妙
};


//参数
nfds:所监听的所有文件描述符当中最大的文件描述符+1,也就是等待队列长度;

readfds:传入传出参数,传入要监听读事件的文件描述符集合,传出刚刚传入的集合中发生读事件的文件描述符集合,没有传NULL;

writefds:传入传出参数,传入要监听写事件的文件描述符集合,传出刚刚传入的集合中发生写事件的文件描述符集合,没有传NULL;

exceptfds:传入传出参数,传入要监听异常事件的文件描述符集合,传出刚刚传入的集合中发生异常事件的文件描述符集合,没有传NULL;

timeout:定时阻塞监控时间;
//选择
NULL:阻塞监听;
>0:设置监听超时时长;
0:非阻塞监听,轮询;

//返回值
成功:返回满足对应监听事件的文件描述符总个数;

失败:-1,errno;
  • void FD_ZERO(fd_set *set)把文件描述符集合(监听集合)清空
1
2
3
4
5
6
#include <sys/select.h>

void FD_ZERO(fd_set *set);

//参数
set:要清空的文件描述符集合;
  • void FD_CLR(int fd,fd_set *set)把文件描述符集合某个文件描述符从监听集合中移除
1
2
3
4
5
6
7
8
#include <sys/select.h>

void FD_CLR(int fd,fd_set *set);

//参数
fd:要移除的文件描述符;

set:文件描述符集合;
  • void FD_SET(int fd,fd_set *set)将fd文件描述符添加到set监听集合中
1
2
3
4
5
6
7
8
#include <sys/select.h>

void FD_SET(int fd,fd_set *set);

//参数
fd:要添加的文件描述符;

set:文件描述符集合;
  • int FD_ISSET(int fd,fd_set *set)判断fd文件描述符是否存在于set监听集合中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <sys/select.h>

int FD_ISSET(int fd,fd_set *set);

//参数
fd:要判断的文件描述符;

set:文件描述符集合;


//返回值
>0:说明fd存在于set集合;

=0:说明fd不存在于set集合;

select实现多路IO转接服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#include <cctype>
#include <cstdio>
#include <cstdlib>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <sys/select.h>
#include <strings.h>
#include "wrap.h"

#define SER_PORT 5005


int main (int argc, char *argv[]) {
int sfd,ret,cfd,n;
int mfd=0; //最大文件描述符
sockaddr_in ser_addr,cli_addr;
socklen_t cli_addr_len;
char buf[BUFSIZ];

sfd=Socket(AF_INET,SOCK_STREAM,0);

bzero(&ser_addr,sizeof(ser_addr));
ser_addr.sin_family=AF_INET;
ser_addr.sin_addr.s_addr=htonl(INADDR_ANY);
ser_addr.sin_port=htons(SER_PORT);

int opt=1;
ret=setsockopt(sfd,SOL_SOCKET,SO_REUSEPORT,&opt,sizeof(opt));
if(ret!=0){
perr_exit("setsockopt error");
}
Bind(sfd,(struct sockaddr *)&ser_addr,sizeof(ser_addr));

Listen(sfd,128);

//构造select监听文件描述符集合
fd_set rset,allset; //rset读事件文件描述符集合,allset用来暂存

mfd=sfd;
FD_ZERO(&allset); //集合清空;
FD_SET(sfd,&allset); //添加sfd到read集合;

while(1){
rset=allset; //每次循环都更新设置select监控集合
int nready=select(mfd+1,&rset,NULL,NULL,NULL);
if(nready<0){
perr_exit("select error");
}
//说明有新的客户端连接请求
if(FD_ISSET(sfd, &rset)){
cli_addr_len=sizeof(cli_addr);
cfd=Accept(sfd,(struct sockaddr *)&cli_addr,&cli_addr_len); //不会阻塞
FD_SET(cfd, &allset); //向监控集合添加新客户端的文件描述符
if(mfd<cfd){ //更新最大文件描述符
mfd=cfd;
}
if(0==--nready){ //只有sfd有事件(也就是客户端文件描述符没有事件),后续的for不需要执行
continue;
}
//等价于
// if(1==nready) continue;
}
//检测哪个客户端需要发送数据
for(int i=sfd+1;i<=mfd;++i){
if(FD_ISSET(i, &rset)){
//当客户端关闭连接时,服务端也关闭对应连接
if((n=Read(i,buf,sizeof(buf)))==0){
Close(i);
FD_CLR(i, &allset);
}else if(n==-1){
perr_exit("read error");
}
for(int j=0;j<n;++j){
buf[j]=toupper(buf[j]);
}
Write(i,buf,n);
Write(STDOUT_FILENO,buf,n);
}
}
}
Close(sfd);
return 0;
}

优化

  • 对于select模型需要遍历不必遍历的文件描述符而造成的效率丢失,我们可以使用一个自定义数组用于存储客户端的文件描述符来提高效率
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include <arpa/inet.h>
#include <cctype>
#include <cstdio>
#include <cstdlib>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <sys/select.h>
#include <strings.h>
#include "wrap.h"

#define SER_PORT 5005


int main (int argc, char *argv[]) {
int sfd,ret,cfd,clifd,n;
int mfd=0; //最大文件描述符
sockaddr_in ser_addr,cli_addr;
socklen_t cli_addr_len;
char buf[BUFSIZ];

sfd=Socket(AF_INET,SOCK_STREAM,0);

bzero(&ser_addr,sizeof(ser_addr));
ser_addr.sin_family=AF_INET;
ser_addr.sin_addr.s_addr=htonl(INADDR_ANY);
ser_addr.sin_port=htons(SER_PORT);

int opt=1;
ret=setsockopt(sfd,SOL_SOCKET,SO_REUSEPORT,&opt,sizeof(opt));
if(ret!=0){
perr_exit("setsockopt error");
}
Bind(sfd,(struct sockaddr *)&ser_addr,sizeof(ser_addr));

Listen(sfd,128);

//构造select监听文件描述符集合
fd_set rset,allset; //rset读事件文件描述符集合,allset用来暂存

mfd=sfd;
int maxi=-1,cli[FD_SETSIZE]; //自定义数组cli,FD_SETSIZE默认为1024
for(int i=0;i<FD_SETSIZE;++i){
cli[i]=-1;
}
FD_ZERO(&allset); //集合清空;
FD_SET(sfd,&allset); //添加sfd到read集合;

while(1){
rset=allset; //每次循环都更新设置select监控集合
int nready=select(mfd+1,&rset,NULL,NULL,NULL);
if(nready<0){
perr_exit("select error");
}
//说明有新的客户端连接请求
if(FD_ISSET(sfd, &rset)){
cli_addr_len=sizeof(cli_addr);
cfd=Accept(sfd,(struct sockaddr *)&cli_addr,&cli_addr_len); //不会阻塞
printf("recevied from %s at PROT %d\n",inet_ntoa(cli_addr.sin_addr),ntohs(cli_addr.sin_port));
int k;
for(k=0;k<FD_SETSIZE;++k){
if(cli[k]<0){ //找cli[]中没有使用的位置
cli[k]=cfd; //保存accept返回的文件描述符到cli
break;
}
}
if(k==FD_SETSIZE){
fputs("too many clients\n",stderr);
exit(1);
}
FD_SET(cfd, &allset); //向监控集合添加新客户端的文件描述符
if(mfd<cfd){ //更新最大文件描述符
mfd=cfd;
}

if(k>maxi){
maxi=k; //保证maxi存的总是cli[]最后一个元素下标
}


if(--nready<=0){ //只有sfd有事件(也就是客户端文件描述符没有事件),后续的for不需要执行
continue;
}
//等价于
// if(1==nready) continue;
}
//检测哪个客户端需要发送数据
for(int i=0;i<=maxi;++i){
if((clifd=cli[i])<0) continue;
if(FD_ISSET(clifd, &rset)){
//当客户端关闭连接时,服务端也关闭对应连接
if((n=Read(clifd,buf,sizeof(buf)))==0){
Close(clifd);
FD_CLR(clifd, &allset);
cli[i]=-1;
}else if(n==-1){
perr_exit("read error");
}
for(int j=0;j<n;++j){
buf[j]=toupper(buf[j]);
}
Write(clifd,buf,n);
Write(STDOUT_FILENO,buf,n);
//将满足事件发生的文件描述符的事件处理完后就可以跳出for循环,继续监听事件发生
if(--nready<=0) break; //跳出for循环
}
}
}
Close(sfd);
return 0;
}

poll模型

概念

  • poll本质上是对select的改进,但是由于一些原因,poll并没有解决性能问题,只是在参数层面做了优化和解除了通过宏定义来设置的限制。因此poll只能说是一个半成品

函数原型

int poll(struct pollfd *fds,nfds_t nfds,int timeout)创建poll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

struct pollfd {
int fd; //待监听的文件描述符
short events; //待监听的事件(读、写、异常)
//events取值:读POLLIN、写POLLOUT、异常POLLERR
short revents; //返回监听事件触发事件次数,传入要传0,如果传出非0就监听事件触发
};

//参数
fds:要监听的文件描述符[数组];

nfds:监听数组的实际有效监听个数;

timeout:监听超时时长;
//以毫秒为单位
-1:阻塞等待;
0:立即返回,非阻塞;
>0:等待指定毫秒数,如当前系统时间精度不够毫秒,向上取值;


//返回值
成功:返回满足对应监听事件的文件描述符总个数;

失败:-1,errno;

优点

  • 自带数组结构
  • 可以将监听事件集合和返回事件集合分离,无需每次调用时,重新设定监听事件
  • 可以拓展监听上限,超出1024上限

缺点

  • 不能跨平台,只能在Linux系统下使用
  • 无法直接定位满足监听事件的文件描述符,需要挨个轮询

poll实现多路IO转接服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include "wrap.h"
#include <arpa/inet.h>
#include <cerrno>
#include <strings.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <cctype>
#include <cstdio>
#include <poll.h>

#define SER_PORT 5005
#define OPENMAX 1024

int main (int argc, char *argv[]) {
int sfd,cfd,clifd,ret,n;
sockaddr_in ser_addr,cli_addr;
socklen_t cli_addr_len;
char buf[BUFSIZ];
struct pollfd cli[OPENMAX];

sfd=Socket(AF_INET,SOCK_STREAM,0);

bzero(&ser_addr, sizeof(ser_addr));
ser_addr.sin_family=AF_INET;
ser_addr.sin_addr.s_addr=htonl(INADDR_ANY);
ser_addr.sin_port=htons(SER_PORT);


int opt=1;
ret=setsockopt(sfd,SOL_SOCKET,SO_REUSEPORT,&opt,sizeof(opt));
if(ret<0){
perr_exit("setsockopt error");
}

Bind(sfd,(struct sockaddr *)&ser_addr,sizeof(ser_addr));

Listen(sfd,128);

cli[0].fd=sfd; //将服务端监听文件描述符加入监听数组
cli[0].events=POLLIN; //监听事件是读事件
// cli[0].revents=0; //默认为0
cli_addr_len=sizeof(cli_addr);
for(int i=1;i<OPENMAX;++i){
cli[i].fd=-1;
}
int maxi=0;
while (1) {
int nready=poll(cli,maxi+1,-1); //阻塞监听是否有客户端链接请求
if(nready<0){
perr_exit("poll error");
}
//sfd有读事件就绪
if(cli[0].revents&POLLIN){
cfd=Accept(sfd,(struct sockaddr *)&cli_addr,&cli_addr_len);
printf("received from %s at PORT %d\n",inet_ntoa(cli_addr.sin_addr),ntohs(cli_addr.sin_port));
int k;
//找到cli[]中空闲位置,存放新客户端的cfd
for(k=1;k<OPENMAX;++k){
if(cli[k].fd<0){
cli[k].fd=cfd;
break;
}
}
//达到来最大客户端数
if(k==OPENMAX){
perr_exit("too many clients");
}
//设置新客户端的监听事件
cli[k].events=POLLIN;
//更新cli[]中最大元素下标
if(k>maxi){
maxi=k;
}
//没有事件发生,继续回到poll阻塞监听
if(--nready<=0) continue;
}
//查看是否有客户端事件就绪
for(int i=1;i<=maxi;++i){
if((clifd=cli[i].fd)<0) continue;
if(cli[i].revents&POLLIN){
if((n=Read(clifd,buf,sizeof(buf)))<0){
//受到RST标志,客户端异常退出
if(errno==ECONNRESET){
printf("client[%d] aborted connection\n",i);
Close(clifd);
cli[i].fd=-1; //将其移除监听数组
}else perr_exit("read error");
}else if (n==0) { //客户端先关闭连接
printf("client[%d] closed connection\n",i);
Close(clifd);
cli[i].fd=-1;
}else {
for(int j=0;j<n;++j){
buf[j]=toupper(buf[j]);
}
Writen(clifd,buf,n);
}
if(--nready<=0) break;
}
}
}
Close(sfd);
return 0;
}

epoll模型

概念

  • epoll复用了等待队列,并且添加了就绪列表,当监听事件发生时,进行回传通知,而事件通知的时机又提供了水平触发(LT)和边缘触发(ET)

  • epoll既提供了select/poll的IO事件的水平触发(LT),也提供了边缘触发(ET)机制,这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的使用,提供应用程序效率

  • 等待队列采用红黑树进行存储被监听的客户端文件描述符

  • 就绪列表使用双向链表进行存储事件触发就绪的文件描述符

  • epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率

    原因

    1. 因为它会复用文件描述符集合(等待队列)来传递结果而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合
    2. 无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的文件描述符集合即可

ET和LT模式

  • 边缘触发(ET):全名edge trigger,只有新事件触发,或是timeout时epoll_wait才会回传,不管缓冲区是否还有数据只支持非阻塞socket,使用fcntl函数设置非阻塞套接字

    优点:开销小,不用过度处理消息

    缺点:编码难度大,需要注意事件无法一次性处理完,导致下次触发事件却处理着上次事件剩下的数据(遗漏事件处理)

  • 水平触发(LT):默认采用模式,全名level trigger,epoll_wait在事件状态未变更前都会回传,只要缓冲区有数据都会触发

    优点:编码难度较简单,不会遗漏事件处理

    缺点:开销较大

注意

  • epoll的本质就是监听文件描述符,因此epoll不仅可以用来监听套接字,也可用于管道、fifo、mmap映射等

突破1024文件描述符限制

  • 可以使用cat命令查看当前计算机可以打开的socket描述符上限,受硬件影响
1
cat /proc/sys/fs/file-max
  • 使用ulimit命令查看当前用户下的进程,默认打开文件描述符上限,缺省为1024
1
ulimit -a
  • 修改配置文件修改上限值
1
2
3
4
5
6
sudo nvim /etc/security/limits.conf

#在文件尾部#end of file之前写下以下配置,soft为软限制,hard为硬限制
#soft为默认值,可以使用命令修改,但是不能超过hard值
* soft nofile 65536
* hard nofile 100000

工作流程

  1. epoll维护一个等待队列(红黑树)和一个就绪列表(链表)
  2. 然后需要监听的socket文件描述符就添加到等待队列中
  3. 当哪个客户端socket有事件触发,就将其加入到就绪列表中
  4. 然后回传通知给服务端,服务端直接遍历就绪列表中,挨个处理即可

函数原型

  • int epoll_create(int size)创建epoll文件描述符,也就是指向红黑树根节点的文件描述符
1
2
3
4
5
6
7
8
9
10
11
#include <sys/epoll.h>

int epoll_create(int size);

//参数
size:创建的红黑树的监听节点数量.(仅供内核参考);

//返回值
成功:epoll文件描述符,也就是指向红黑树根节点的文件描述符;

失败:-1,errno;
  • int epoll_ctl(int epfd,int op,int fd,struct epoll_event *event)用来控制红黑树
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <sys/epoll.h>

int epoll_ctl(int epfd, int op, int fd,struct epoll_event *event);

struct epoll_event{
uint32_t events; //事件类型
epoll_data_t data; //数据
};

struct union epoll_data{
void *ptr; //Reactor模型使用
int fd; //对应监听事件的fd
uint32_t u32; //不用
uint64_t u64; //不用
}epoll_data_t;


//参数
epfd:epoll文件描述符;

op:对该监听红黑树所做的操作参数;
//选择:
EPOLL_CTL_ADD:添加一个节点到红黑树上;
EPOLL_CTL_MOD:修改红黑树上节点的监听事件;
EPOLL_CTL_DEL:将fd从红黑树上移除;

fd:待监听文件描述符;

event:要设定监听事件参数,本质为epoll_event结构体;
//成员:
events:传事件类型,默认为LT机制,要想使用ET就需要或上EPOLLET;
//读EPOLLIN、写EPOLLOUT、异常EPOLLERR

data:存储用户数据,epoll_data_t 联合体;


//返回值
成功:0;

失败:-1,errno;
  • int epoll_wait(int epfd,struct epoll_event *events,int maxevents,int timeout)监听等待队列的客户端socket是否有监听事件触发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <sys/epoll.h>

int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

struct epoll_event{
uint32_t events; //事件类型
epoll_data_t data; //数据
};

struct union epoll_data{
void *ptr; //指定与fd相关的用户数据
int fd; //对应监听事件的fd
uint32_t u32; //不用
uint64_t u64; //不用
}epoll_data_t;


//参数
epfd:epoll文件描述符;

events:传出参数,传出监听事件被触发或者就绪的结构体列表,【数组】;

maxevents:events数组元素的总个数;

timeout:设置超时事件;
-1:阻塞;
0:非阻塞;
>0:设定的超时时长(单位毫秒);


//返回值
成功:返回监听事件就绪的文件描述符个数,可以用作循环上限;
超时:0;
失败:-1,errno;

epoll实现多路IO转接服务器

  • LT模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#include "wrap.h"
#include <arpa/inet.h>
#include <strings.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <cctype>
#include <cstdio>



#define SER_PORT 5005
#define OPEN_MAX 5000


int main (int argc, char *argv[]) {
int sfd,cfd,clifd,i,epfd;
int n,ret,num=0;
char buf[BUFSIZ];
sockaddr_in ser_addr,cli_addr;
socklen_t cli_addr_len;
epoll_event tep,ep[OPEN_MAX]; //tep:epolL_ctl参数,ep[]:epoll_wait参数

sfd=Socket(AF_INET,SOCK_STREAM,0);

bzero(&ser_addr, sizeof(ser_addr));
ser_addr.sin_family=AF_INET;
ser_addr.sin_port=htons(SER_PORT);
ser_addr.sin_addr.s_addr=htonl(INADDR_ANY);

int opt=1;
ret=setsockopt(sfd,SOL_SOCKET,SO_REUSEPORT,&opt,sizeof(opt));
if(ret<0){
perr_exit("setsockopt error");
}
Bind(sfd,(struct sockaddr *)&ser_addr,sizeof(ser_addr));

Listen(sfd,128);

epfd=epoll_create(OPEN_MAX); //创建epoll模型
if(epfd==-1) perr_exit("epoll_create error");

tep.events=EPOLLIN; //指定sfd监听事件为读
tep.data.fd=sfd;
ret=epoll_ctl(epfd,EPOLL_CTL_ADD,sfd,&tep); //将sfd结构体节点加入到监听红黑树中
if(ret==-1) perr_exit("epoll_ctl error");

cli_addr_len=sizeof(cli_addr);
while (1) {
//epoll为server阻塞监听事件,ep为epoll_event类型数组,OPEN_MAX为数组容量,-1表永久阻塞
int nready=epoll_wait(epfd,ep,OPEN_MAX,-1);
if(nready==-1){
perr_exit("epoll_wait error");
}
for(int i=0;i<nready;++i){
//如果不是读事件继续循环
if(!(ep[i].events&EPOLLIN)) continue;
//判断就绪事件的是不是监听文件描述符sfd
if(ep[i].data.fd==sfd){
cfd=Accept(sfd,(struct sockaddr *)&cli_addr,&cli_addr_len);
printf("received from %s at PORT %d\n",inet_ntoa(cli_addr.sin_addr),ntohs(cli_addr.sin_port));
printf("cfd %d---client %d\n",cfd,++num);

tep.events=EPOLLIN;
tep.data.fd=cfd;
ret=epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&tep);
if(ret==-1) perr_exit("EPOLL_CTL_ADD error");
}else{ //不是sfd,是客户端文件描述符
clifd=ep[i].data.fd;
n=Read(clifd,buf,sizeof(buf));
if(n==0){ //读到0说明客户端先关闭连接
ret=epoll_ctl(epfd,EPOLL_CTL_DEL,clifd,NULL); //删除监听节点
if(ret==-1) perr_exit("EPOLL_CTL_DEL error");
Close(clifd);
printf("client[%d] closed connection\n",clifd);
}else if (n<0) { //出错
perror("read n<0 error");
ret=epoll_ctl(epfd,EPOLL_CTL_DEL,clifd,NULL); //删除监听节点
if(ret==-1) perr_exit("EPOLL_CTL_DEL error");
Close(clifd);
}else {
for(i=0;i<n;++i){
buf[i]=toupper(buf[i]);
}
Write(clifd,buf,n);
Write(STDOUT_FILENO,buf,n);
}
}
}
}
Close(sfd);
Close(epfd);
return 0;
}
  • 非阻塞ET模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#include "wrap.h"
#include <cctype>
#include <cerrno>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <strings.h>
#include <cstdio>

#define SER_PORT 5005
#define OPEN_MAX 5000


int main (int argc, char *argv[]) {
int sfd,cfd,clifd,efd;
int n,ret,num=0,flag;
char buf[BUFSIZ];
sockaddr_in ser_addr,cli_addr;
socklen_t cli_addr_len;
epoll_event tep,ep[OPEN_MAX];

// sfd=Socket(AF_INET,SOCK_STREAM,0);
//设置非阻塞也可以直接在socket函数操作
sfd=Socket(AF_INET,SOCK_STREAM|SOCK_NONBLOCK,0);

//设置非阻塞
/* flag=fcntl(sfd, F_GETFL);
flag|=O_NONBLOCK;
fcntl(sfd,F_SETFL,flag); */
// fcntl(sfd,F_SETFL,fcntl(sfd,F_GETFL)|O_NONBLOCK);

bzero(&ser_addr, sizeof(ser_addr));
ser_addr.sin_family=AF_INET;
ser_addr.sin_port=htons(SER_PORT);
ser_addr.sin_addr.s_addr=htonl(INADDR_ANY);

int opt=1;
ret=setsockopt(sfd, SOL_SOCKET,SO_REUSEADDR, &opt,sizeof(opt));
if(ret==-1){
perr_exit("setsockopt error");
}

Bind(sfd,(struct sockaddr *)&ser_addr,sizeof(ser_addr));

Listen(sfd,128);
efd=epoll_create(OPEN_MAX);
if(efd==-1) perr_exit("epoll_create error");

tep.events=EPOLLIN|EPOLLET;
tep.data.fd=sfd;
ret=epoll_ctl(efd,EPOLL_CTL_ADD,sfd,&tep);
if(ret==-1) perr_exit("EPOLL_CTL_ADD error");

cli_addr_len=sizeof(cli_addr);
while (1) {
int nready=epoll_wait(efd,ep,OPEN_MAX,-1);
if(nready==-1) perr_exit("epoll_wait error");
for(int i=0;i<nready;++i){
if(!(ep[i].events&EPOLLIN)) continue;
if(ep[i].data.fd==sfd){
// 多个连接同时到达,服务器的 TCP 就绪队列瞬间积累多个就绪
// 连接,由于是边缘触发模式,epoll 只会通知一次,accept 只处理一个连接,导致 TCP 就绪队列中剩下的连接都得不到处理。
// 解决办法是用 while 循环抱住 accept 调用,处理完 TCP 就绪队列中的所有连接后再退出循环,或者将监听套接字更改为LT模式
while((cfd=Accept(sfd,(struct sockaddr *)&cli_addr,&cli_addr_len))>0){
printf("recevied from %s at port %d\n",inet_ntoa(cli_addr.sin_addr),ntohs(cli_addr.sin_port));
printf("cfd %d --------client %d\n",cfd,++num);
tep.events=EPOLLIN|EPOLLET;
tep.data.fd=cfd;
/* flag=fcntl(cfd,F_GETFL);
flag|=O_NONBLOCK;
fcntl(cfd,F_SETFL,flag); */
fcntl(cfd,F_SETFL,fcntl(cfd,F_GETFL)|O_NONBLOCK);

ret=epoll_ctl(efd,EPOLL_CTL_ADD,cfd,&tep);
if(ret==-1) perr_exit("EPOLL_CTL_ADD error");
}
}else {
clifd=ep[i].data.fd;
while (1) {
n=Read(clifd,buf,sizeof(buf));
if(n==0){
ret=epoll_ctl(efd,EPOLL_CTL_DEL,clifd,NULL);
if(ret==-1) perr_exit("EPOLL_CTL_DEL error");
Close(clifd);
printf("client[%d] closed connection\n",clifd);
break;
}else if (n<0) {
//非阻塞模式下,errno等于EAGAIN或者EWOULDBLOCK只是代表无数据读取并不是报错
if(errno==EAGAIN||errno==EWOULDBLOCK) break;
printf("read n<0 error\n");
ret=epoll_ctl(efd,EPOLL_CTL_DEL,clifd,NULL);
if(ret==-1) perr_exit("EPOLL_CTL_DEL error");
Close(clifd);
break;
}
else {
for(i=0;i<n;++i){
buf[i]=toupper(buf[i]);
}
Write(clifd,buf,n);
Write(STDOUT_FILENO,buf,n);
}
}
}
}
}
Close(sfd);
Close(efd);
return 0;
}

非阻塞ET注意问题:

  • 掉连接问题:当我们的监听套接字设定为非阻塞时,多个连接同时到达,服务器的 TCP 就绪队列瞬间积累多个就绪连接,由于是边缘触发模式,epoll 只会通知一次,accept 只处理一个连接,导致 TCP 就绪队列中剩下的连接都得不到处理,这样就是我们所说的掉连接

    解决方法使用while循环抱住accept调用,使得一次accept处理完就绪队列中的就绪连接再结束,或者在设置监听套接字阻塞以及事件上使用LT模式

  • 事件丢失/未处理完问题:使用非阻塞模式的ET模式,容易因为网络问题、数据断传或者服务端读取字节小于数据,导致一部分数据得等到下个新事件就绪才能获取到,这样就会导致最后一个事件并没有被真正处理,导致事件丢弃/未处理完问题

    解决办法使用while循环抱住read函数,使得缓冲区可读就一直读下去,使得一次读取干净,相对应的写事件也是一样的处理方式

Reactor模型

概念

  • Reactor模型:又称epoll反应堆模型,是基于epoll的ET模式实现的,这个模型即为非阻塞IO+IO复用的同步IO模型
  • 操作流程:通过IO多路复用检测多条连接的IO是否就绪,如果有IO就绪,再通过非阻塞IO去操作具体IO
  • Reactor模式是一个事件驱动机制模型,他逆转了事件处理的流程,不再是检测发生事件类型来调用对应api处理,而是应用程序在Reactor上提前注册好回调函数(使用泛型指针ptr来注册),如果相应事件发送,reactor就直接调用应用程序注册的接口
  • 注意:Reactor模型还分几种,我们这里先讲述单reactor单线程模型,所以以下内容都只是单reactor单线程模型的内容,但是基本思想一致

Reactor模型弃用epoll_data中的fd设置fd,而是使用成员ptr

  • Reactor模型不再使用epoll_data(联合体)中的fd来设置fd,而是可以使用泛型指针ptr来进行设定
  • 因为ptr是泛型指针,并且epoll_data是联合体,里面成员共用一个地址空间,所以我们可以自定义结构体里面包含fd和回调函数等等成员,这样我们就可以通过一个ptr操作多个成员以及使用结构体

reactor架构方案如下图:

区别

  • select、poll、epoll只解决了IO是否就绪问题,也就是IO多路复用,并没有解决具体IO操作问题
  • reactor使用非阻塞IO+IO多路复用,就既解决了IO是否就绪问题,也解决了具体IO操作问题

工作流程

  1. 配置服务端配置,socket、bind、listen,设置非阻塞后
  2. 使用epoll_create创建监听红黑树
  3. epoll_ctl添加监听套接字lfd
  4. 循环epoll_wait监听,有事件发生,返回触发事件数组
  5. 当监听套接字lfd有事件发生,创建新客户端cfd,并添加到红黑树上
  6. 当cfd有读事件触发,执行read(),将cfd从监听红黑树摘下
  7. 重新添加一个写事件就绪的cfd(刚刚摘下的cfd)到监听红黑树上,需要设定事件为写事件(EPOLLOUT),以及注册好回调函数(使用ptr)
  8. 等待epoll_wait返回,也就是cfd可写,然后将数据write回去
  9. 将cfd从红黑树摘下,将事件改为读事件EPOLLIN,将其重新放回红黑树,然后继续监听
  10. 剩下就是重复之前步骤

代码结构

  • myevent_s结构体:替换掉event_data结构体中的ptr,用来设置事件参数包括文件描述符,监听事件类型以及回调函数等等.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    //描述就绪文件描述符相关信息
    struct myevent_s{
    int fd; //要监听的文件描述符
    int events; //对应的监听事件
    void *arg; //泛型参数
    void (*call_back)(int fd,int events,void *arg); //回调函数
    int status; //判断是否监听参数,1:监听,0:不在监听
    char buf[BUFLEN]; //缓冲区
    int len; //缓冲区长度
    long last_active; //记录每次加入红黑树g_efd的时间,用于超时设定
    };
  • initlistensocket函数创建和初始化监听套接字lfd

    1
    2
    3
    4
    5
    void initlistensocket(int efd,short port);

    efd:epoll文件描述符,指向监听红黑树的根节点;

    port:端口号,绑定监听套接字lfd上的服务端端口;
  • eventset函数初始化事件结构体myevent_s,设置回调函数

    1
    2
    3
    4
    5
    void eventset(struct myevent_s *ev,int fd,void(*call_back)(int,int,void*),void *arg);

    ev:就绪文件结构体,用来描述就绪文件描述符的信息结构体,自定义结构体,事件参数结构体event_data的ptr指向的结构体;

    fd:要监听的文件描述符;
  • eventadd函数将fd以及对应事件结构体添加到监听红黑树,设置监听事件。

    1
    2
    3
    4
    5
    6
    7
    void eventadd(int efd,int events,struct myevent_s *ev);

    efd:epoll文件描述符,指向监听红黑树的根节点;

    events:设置对应监听事件的参数;

    ev:就绪文件结构体,用来描述就绪文件描述符的信息结构体,自定义结构体,事件参数结构体event_data的ptr指向的结构体;
  • eventdel函数从epoll监听的红黑树删除文件描述符

    1
    2
    3
    4
    5
    void eventdel(int efd,struct myevent_s *ev);

    efd:epoll文件描述符;

    ev:要删除的文件描述符的就绪文件结构体(里面包含了要删除的文件描述符);
  • acceptconn函数监听套接字lfd的回调函数,用于服务端创建与客户端的连接

    1
    2
    3
    4
    5
    6
    7
    void acceptconn(int lfd,int events,void *arg);

    fd:lfd;

    eventds:要监听的事件类型;

    arg:用来传入lfd对应的就绪结构体myevent_s参数
  • recvdata函数客户端套接字cfd的读就绪事件的回调函数,服务端接收客户端请求和数据

    1
    2
    3
    4
    5
    6
    7
    void recvdata(int fd,int events,void *arg);

    fd:cfd;

    events:监听事件(这个地方是读事件);

    arg:用来传入cfd对应的就绪结构体myevent_s参数;
  • senddata函数客户端套接字cfd的写就绪事件的回调函数,服务端的向客户端发送数据

    1
    2
    3
    4
    5
    6
    7
    void senddata(int fd,int events,void *arg);

    fd:cfd;

    events:监听事件(这个地方是写事件);

    arg:用来传入cfd对应的就绪文件结构体myevent_s参数;

单reactor单线程模型示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
#include "wrap.h"
#include <cerrno>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <fcntl.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <cstdio>
#include <stdlib.h>

#define MAX_EVENTS 1000
#define BUFLEN 4096
#define SER_PORT 5005
}
}
// 退出前释放所有资源
//回调函数
void acceptconn(int lfd,int events,void *arg);
void recvdata(int fd,int events,void *arg);
void senddata(int fd,int events,void *arg);

//描述就绪文件描述符相关信息
struct myevent_s{
int fd; //要监听的文件描述符
int events; //对应的监听事件
void *arg; //泛型参数
void (*call_back)(int fd,int events,void *arg); //回调函数
int status; //判断是否监听参数,1:监听,0:不在监听
char buf[BUFSIZ]; //缓冲区
int len; //缓冲区长度
long last_active; //记录每次加入红黑树g_efd的时间,用于超时设定
};


int g_efd; //全局变量,epoll文件描述符,指向监听红黑树根部
struct myevent_s g_events[MAX_EVENTS+1]; //自定义结构体类型数组,用于存储监听的文件描述符


//将就绪文件结构体myevent_s成员变量初始化
void eventset(struct myevent_s *ev,int fd,void(*call_back)(int,int,void *),void *arg){
ev->fd=fd;
ev->call_back=call_back;
ev->arg=arg;
//当是监听套接字或者当是写事件套接字结束后清0,如果是读事件后清0要不然将无法将数据写回客户端
if(fd==g_events[MAX_EVENTS].fd||ev->events==EPOLLOUT){
memset(ev->buf,0,sizeof(ev->buf));
ev->len=0;
}
ev->events=0;
ev->status=0;
ev->last_active=time(NULL); //获取当前时间

return;
}

//添加文件描述符到红黑树上
void eventadd(int efd,int events,struct myevent_s *ev){
//初始化结构体,因为包含联合体data所以以下初始化
struct epoll_event epv={0,{0}};
int op;
epv.data.ptr=ev;
epv.events=ev->events=events;
//将其加入红黑树efd,并将status置为1
if(ev->status==0){
op=EPOLL_CTL_ADD;
ev->status=1;
}

if(epoll_ctl(g_efd,op,ev->fd,&epv)<0){
printf("event add failed [fd=%d],events[%d]\n",ev->fd,events);
}else {
printf("event add OK [fd=%d],op=%d,events[%0X]\n",ev->fd,op,events);
}

return;
}


//从红黑树上摘除文件描述符
void eventdel(int efd,struct myevent_s *ev){
//初始化结构体,因为包含联合体data所以配置服务端配置,socket、bind、listen,设置非阻塞后以下初始化
struct epoll_event epv={0,{0}};
//判断是否在红黑树上
if(ev->status!=1) return;

epv.data.ptr=NULL;
ev->status=0; //修改状态
epoll_ctl(efd,EPOLL_CTL_DEL,ev->fd,&epv); //从红黑树efd上将ev->fd摘除

return;
}


//监听套接字lfd回调函数,当有新客户端连接时,调用该函数与客户端建立连接
void acceptconn(int lfd,int events,void *arg){
struct sockaddr_in cin; //客户端地址结构
socklen_t len=sizeof(cin);
int cfd,i;
//使用错误封装函数Accept
// cfd=Accept(lfd,(struct sockaddr *)&cin,&len);
//源码写法
if((cfd=accept(lfd,(struct sockaddr *)&cin,&len))==-1){
if(errno!=EAGAIN&&errno!=EINTR){
/*暂时不做出错处理*/
}
//打印出错信息
printf("%s:accept,%s\n",__func__,strerror(errno)); //宏__func__表示当前函数名

}
do {
//从g_events中找到可用的空闲位置
for(i=0;i<MAX_EVENTS;++i){
if(g_events[i].status==0) break; //找到了空闲位置
}
//g_events没有空闲位置
if(i==MAX_EVENTS){
printf("%s:max connect limit[%d]\n",__func__,MAX_EVENTS);
break;
}
int flag=0;
//设置cfd非阻塞
if((flag=fcntl(cfd,F_SETFL,(fcntl(cfd,F_GETFL)|O_NONBLOCK)))<0){
printf("%s:fcntl nonblocking failed,%s\n",__func__,strerror(errno));
break;
}

//给cfd设置一个myevent_s结构体,回调函数设置为recvdata
eventset(&g_events[i],cfd,recvdata,&g_events[i]);
eventadd(g_efd,EPOLLIN,&g_events[i]);
}while (0);

printf("new connect [%s:%d][time:%ld],pos[%d]\n",inet_ntoa(cin.sin_addr),ntohs(cin.sin_port),g_events[i].last_active,i);

return;
}



void recvdata(int fd,int events,void *arg){
struct myevent_s *ev=(struct myevent_s *)arg;
//读文件描述符,数据存入myevent_s成员buf中
int len=recv(fd,ev->buf,sizeof(ev->buf),0);

eventdel(g_efd,ev); //将该节点从红黑树上摘除

if(len>0){
ev->len=len;
ev->buf[len]='\0'; //手动添加字符串结束标记
printf("C[%d]:%s\n",fd,ev->buf);

eventset(ev,fd,senddata,ev); //设置该fd对应的回调函数为senddata
eventadd(g_efd,EPOLLOUT,ev); //将fd加入红黑树g_efd,监听其写事件
}else if (len==0) {
close(ev->fd);
//ev-g_events地址相减得到偏移元素位置
printf("[fd=%d] pos[%ld],closed\n",fd,ev-g_events);
}else {
close(ev->fd);
printf("recv[fd=%d] error[%d]:%s\n",fd,errno,strerror(errno));
}

return;
}



void senddata(int fd,int events,void *arg){
struct myevent_s *ev=(struct myevent_s *)arg;
int len=send(fd,ev->buf,ev->len,0); //直接将数据回写给客户端,不作处理,形成回声服务器

eventdel(g_efd,ev); //从红黑树g_efd中摘除

if(len>0){
printf("send[fd=%d],[%d]%s\n",fd,len,ev->buf);
eventset(ev,fd,recvdata,ev); //将该fd的回调函数改为recvdata
eventadd(g_efd,EPOLLIN,ev); //重新添加到红黑树上,设为监听读事件
}else{
close(ev->fd); //关闭连接
printf("send[fd=%d] error %s\n",fd,strerror(errno));
}

return;
}



//创建socket,初始化lfd
void initlistensocket(int efd,short port){
struct sockaddr_in sin;
//设置非阻塞
int lfd=Socket(AF_INET,SOCK_STREAM|SOCK_NONBLOCK,0);
/* int lfd=Socket(AF_INET,SOCK_STREAM,0);
fcntl(lfd, F_SETFL,(fcntl(lfd,F_GETFL))|O_NONBLOCK); */
int opt=1;
int ret=setsockopt(lfd,SOL_SOCKET,SO_REUSEADDR,(void *)&opt,sizeof(opt));
if(ret!=0){
perr_exit("setsockopt error");
}
bzero(&sin,sizeof(sin));
sin.sin_family=AF_INET;
sin.sin_port=htons(port);
sin.sin_addr.s_addr=htonl(INADDR_ANY);

Bind(lfd,(struct sockaddr *)&sin,sizeof(sin));

Listen(lfd,128);

//设置最后一个元素为监听套接字
eventset(&g_events[MAX_EVENTS],lfd,acceptconn,&g_events[MAX_EVENTS]);

//将监听套接字(使用myevent_s结构体)加入到监听红黑树中
eventadd(efd,EPOLLIN,&g_events[MAX_EVENTS]);

return;
}


int main (int argc, char *argv[]) {
unsigned short port=SER_PORT;

//使用用户指定端口,如未指定,使用默认端口
if(argc==2){
port=atoi(argv[1]);
}
//创建红黑树,返回给全局g_efd
g_efd=epoll_create(MAX_EVENTS+1);
if(g_efd<=0){
printf("create efd in %s err %s\n",__func__,strerror(errno));
}

//创建socket,初始化lfd
initlistensocket(g_efd, port);

struct epoll_event events[MAX_EVENTS+1]; //保存已经满足就绪事件的文件描述符数组
printf("server running:port[%d]\n",port);
int checkpos=0,i;
//超时验证,每次测试100个,不测试lfd,当客户端60s内没有和服务端通信,则客户端关闭连接
while (1) {
long now=time(NULL);
//一次循环检测100个,使用checkpos控制检测对象
for(i=0;i<100;++i,++checkpos){
if(checkpos==MAX_EVENTS) checkpos=0;
//不在红黑树上
if(g_events[checkpos].status!=1) continue;

//客户端不活跃时间,也就是没跟服务端通信的时间
long duration=now-g_events[checkpos].last_active;
//因为如果客户端与服务端通信,则都会调用eventset将last_active进行更新,所以不会产生冤枉断开客户端
if(duration>=60){
//超时关闭连接
Close(g_events[checkpos].fd);
printf("[fd=%d] timout\n",g_events[checkpos].fd);
eventdel(g_efd,&g_events[checkpos]);
}
}
//监听红黑树g_efd,将满足的事件的文件描述符加入到events数组中,非阻塞模式,1s没有事件满足,返回0
int nfd=epoll_wait(g_efd,events,MAX_EVENTS+1,1000);
if(nfd<0){
printf("epoll_wait error,exit\n");
break;
}

for(i=0;i<nfd;++i){
//使用自定义结构体myevent_s类型指针,接收联合体data的void *ptr成员
struct myevent_s *ev=(struct myevent_s *)events[i].data.ptr;

//读事件就绪
if((events[i].events&EPOLLIN)&&(ev->events&EPOLLIN)){
ev->call_back(ev->fd,events[i].events,ev->arg);
}
if((events[i].events&EPOLLOUT)&&(ev->events&EPOLLOUT)){
ev->call_back(ev->fd,events[i].events,ev->arg);
}
}
}
// 退出前释放所有资源
return 0;
}

线程池

引言与分析

  • 在多进程、多线程并发以及多路io转接服务器中,我们说多路IO转接服务器的效率较高,是因为多进程、多线程服务器会为客户端的到来而创建进程和线程,然后为客户端的离开将进程、线程销毁回收掉,而频繁这样操作将会大大消耗系统资源,而多路IO复用不需要,它只有一个进程,很多事情也交给了内核处理
  • 对于以上问题,我们可以避免多次的创建进程和线程,我们可以在程序开始时就一次性创建多个线程或进程来处理。
  • 这样就引出了以下的线程池

概念

  • 线程池:顾名思义就是一个放线程的池子(线程聚集的地方),里面有很多线程,用于管理程序的线程创建与销毁回收的,会在程序开始时创建一堆线程。
  • 对于多线程并发服务器中,我们使用的是类似于生产者消费者模型,线程池的作用就是用于处理客户端发送的数据,当有一个客户端发送数据来时,线程池就会为此分配一个线程来处理

实现前提分析

  • 初始线程数:当我们要为服务端起始创建线程池,我们需要对具体服务功能分析出初始线程数,并设定初始线程池中线程数
  • 线程池扩容机制:为了解决高峰期线程不够以及不及时补给问题,我们需要设置线程池的扩容机制,我们可以根据线程池空闲线程live_num跟正在运行的线程数busy_num的比例来进行扩容,例如当我的线程池为38时,当busy_num/live_num大于等于80%时就进行扩容指定个数的线程
  • 线程池最大线程数有了扩容机制,我们也不能无限的扩容,所以得设置线程池最大线程数,防止扩容越界
  • 线程池瘦身机制:有了扩容机制相对应的也得有瘦身机制,当高峰期过去时,我们就不需要这么多的线程来处理工作了,就得进行减少线程池中的线程数,因此我依然可以根据线程池空闲线程live_num跟正在运行的线程数busy_num的比例来进行瘦身,例如当busy_num/live_numx小于等于20%时就进行减少指定个数的线程
  • 管理线程对于以上扩容、瘦身操作,我们也不应该让服务器进行处理增加服务器压力,因此我们需要创建一个管理线程来专门进行管理线程池,让他进行扩容、瘦身和销毁等操作
  • :我们使用线程池的线程进行处理数据,对于任务队列进行读写处理等操作,为了保证数据的正确性,所以我们使用锁来进行线程同步
  • 条件变量:对于生产者消费者模型,我们线程应该当且仅当任务队列有任务才进行竞争处理,在没有数据时,我们不应该使用阻塞的方式去等待,因此我们需要使用条件变量来进行非阻塞等待,当任务队列中有任务要处理再将线程唤醒

具体实现

  1. 程序运行开始时,先创建线程池并初始化线程池结构体成员变量,为线程池结构体部分成员分配空间,锁、条件变量的初始化以及为处理任务线程、管理线程绑定执行主体函数
  2. 当线程池初始化后,会创建最小线程池大小的线程数,然后阻塞等待条件变量(队列不为空时,就会有任务来的条件变量)的唤醒
  3. 任务到来,如果任务队列满了,就将会阻塞等待队列不为满的条件变量的唤醒否则将其添加到线程池中的任务队列中,然后将阻塞在队列不为空的条件变量上的线程唤醒
  4. 抢到任务线程从任务队列中取出任务后(任务队列出队),然后就可以通知阻塞等待队列不为满的线程(如果任务队列满了的时候,没满不会影响),告诉他们可以添加任务了,然后抢到任务的线程将执行任务
  5. 任务处理结束后,则线程自行退出
  6. 管理线程将会循环检查线程池中的忙线程数、存活线程数(空闲线程数)、最大线程数、最小线程数和任务队列任务数之间的关系,判断是否执行扩容和瘦身
  7. 判断扩容算法,任务数大于最小线程池个数,且存活的线程数少于最大线程个数时,如:30>=10&&40<100
  8. 判断瘦身算法,忙线程*2小于存活的线程数,并且存活的线程数大于最小线程数时
  9. 当程序结束运行前,将线程池销毁,并将分配的空间资源进行释放

注意:

  • 单线程池中,内核是不会自动帮忙处理任务的回调函数,因此得手动在处理任务线程的执行主体函数手动添加处理

示例代码

  • 结构体定义

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    //各子线程任务结构体
    struct threadpool_task_t {
    void *(*function)(void *); //回调函数
    void *arg; //回调函数参数
    };


    //描述线程池相关信息
    struct threadpool_t {
    pthread_mutex_t lock; //用于锁住本结构体
    pthread_mutex_t thread_cnt; //记录忙状态线程个数的锁 ---busy_thr_num

    pthread_cond_t quque_not_full; //当任务队列满时,添加任务的线程阻塞,等待此条件变量
    pthread_cond_t quque_not_empty; //任务队列不为空时,通知等待任务的线程

    pthread_t *threads; //存放线程池中每个线程的tid,数组
    pthread_t adjust_tid; //管理线程tid
    threadpool_task_t *task_queue; //任务队列(数组首地址)

    int min_thr_num; //线程池最小线程数
    int max_thr_num; //线程池最大线程数
    int live_thr_num; //当前存活线程个数
    int busy_thr_num; //忙状态线程个数
    int wait_exit_thr_num; //要销毁的线程个数

    int queue_front; //task_queue队头下标
    int queue_rear; //task_queue队尾下标
    int queue_size; //task_queue队中实际任务数
    int queue_max_size; //task_queue队列可容纳任务数上限

    int shutdown; //标志位,线程池使用状态,true或false
    };
  • 函数定义

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    //创建线程池
    threadpool_t *threadpool_create(int min_thr_num,int max_thr_num,int queue_max_size);

    //向线程池添加任务,借助回调函数处理数据
    int threadpool_add(threadpool_t *pool,void*(*function)(void *arg),void *arg);

    //销毁线程池
    int threadpool_destroy(threadpool_t *pool);

    void *threadpool_thread(void *threadpool);
    void *adjust_thread(void *threadpool);

    // int is_thread_alive(pthread_t tid);
    int threadpool_free(threadpool_t *pool);
  • 具体代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    282
    283
    284
    285
    286
    287
    288
    289
    290
    291
    292
    293
    294
    #include <cstddef>
    #include <cstdlib>
    #include <cstring>
    #include <pthread.h>
    #include <unistd.h>
    #include <stdio.h>


    #define DEFAULT_TIME 10
    #define DEFAULT_THREAD_VARY 10
    #define true 1
    #define false 0


    threadpool_t *threadpool_create(int min_thr_num,int max_thr_num,int queue_max_size){
    int i;
    //线程池结构体
    threadpool_t *pool=NULL;

    do{
    if((pool=(threadpool_t *)malloc(sizeof(threadpool_t)))==NULL){
    printf("malloc threadpool fail\n");
    break;
    }

    pool->min_thr_num=min_thr_num;suan f
    pool->max_thr_num=max_thr_num;
    pool->busy_thr_num=0;
    pool->live_thr_num=min_thr_num; //活着的线程数初值=最小线程数
    pool->wait_exit_thr_num=0;
    pool->queue_size=0;
    pool->queue_max_size=queue_max_size;
    pool->queue_front=0;
    pool->queue_rear=0;
    pool->shutdown=false;

    //根据最大线程数上限数,给工作线程数组开辟空间,并清零
    pool->threads=(pthread_t *)malloc(sizeof(pthread_t)*max_thr_num);
    if(pool->threads==NULL){
    printf("malloc threads fail\n");
    break;
    }
    memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);
    suan f
    //给任务队列开辟空间
    pool->task_queue=(threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
    if(pool->task_queue==NULL){
    printf("malloc task_queue fail\n");
    break;
    }

    if(pthread_mutex_init(&(pool->lock),NULL)!=0
    || pthread_mutex_init(&pool->thread_cnt,NULL)!=0
    || pthread_cond_init(&pool->quque_not_empty,NULL)!=0
    || pthread_cond_init(&(pool->quque_not_full), NULL)!=0){
    printf("init the lock or cond fail\n");
    break;
    }

    for(i=0;i<min_thr_num;++i){
    //pool指向当前线程池
    pthread_create(&(pool->threads[i]),NULL,threadpool_thread,(void *)pool);
    printf("start thread 0x%x...\n",(unsigned int)pool->threads[i]);
    }
    //创建管理者线程
    pthread_create(&(pool->adjust_tid),NULL,adjust_thread,(void *)pool);
    return pool;
    }while(0);
    //前面代码调用失败时,释放pool存储空间
    threadpool_free(pool);
    return NULL;
    }


    //向线程池添加一个任务
    int threadpool_add(threadpool_t *pool,void*(*function)(void *arg),void *arg){
    pthread_mutex_lock(&pool->lock);
    //==为真,队列已经满了,调wait阻塞
    while ((pool->queue_size==pool->queue_max_size)&&(!pool->shutdown)) {
    pthread_cond_wait(&pool->quque_not_full,&pool->lock);
    }

    if(pool->shutdown){
    pthread_cond_broadcast(&pool->quque_not_empty);
    pthread_mutex_unlock(&pool->lock);
    return 0;
    }
    //清空工作线程调用的回调函数的参数arg
    if(pool->task_queue[pool->queue_rear].arg!=NULL){
    pool->task_queue[pool->queue_rear].arg=NULL;
    }
    //添加任务到任务队列中
    pool->task_queue[pool->queue_rear].function=function;
    pool->task_queue[pool->queue_rear].arg=arg;
    pool->queue_rear=(pool->queue_rear+1)%pool->queue4_max_size; //队尾指针移动,模拟环形
    pool->queue_size++; //任务队列中实际任务数增加

    //添加完任务后,队列不为空,唤醒线程池中等待处理任务诉的线程
    pthread_cond_signal(&pool->quque_not_empty);
    pthread_mutex_unlock(&pool->lock);

    return 0;
    }



    void *threadpool_thread(void *threadpool){
    threadpool_t *pool=(threadpool_t*) threadpool;
    threadpool_task_t task;

    while (true) {
    //刚创建出线程,等待任务队列出现任务,否则阻塞等待任务队列里有任务后再唤醒接收任务
    pthread_mutex_lock(&pool->lock);
    //queue_size==0 说明没有任务,调wait阻塞在条件变量上,若有任务,跳过该while
    while ((pool->queue_size==0)&&(!pool->shutdown)) {
    printf("thread 0x%x is waiting\n",(unsigned int)pthread_self());
    pthread_cond_wait(&(pool->quque_not_empty),&(pool->lock));
    //被唤醒说明有任务需要分配线程处理,空闲线程减少
    if(pool->wait_exit_thr_num>0){
    pool->wait_exit_thr_num--;
    //如果线程池线程个数大于最小值时可以结束当前线程
    if(pool->live_thr_num>pool->min_thr_num){
    printf("thread 0x%x is exiting\n",(unsigned int)pthread_self());
    pool->live_thr_num--;
    pthread_mutex_unlock(&pool->lock);

    pthread_exit(NULL);
    }
    }
    }
    //如果指定了true,表示线程池关闭,则需要将线程池里的每个线程关闭,自行退出处理--销毁线程池
    if(pool->shutdown){
    pthread_mutex_unlock(&pool->lock);
    printf("thread 0x%x is exiting\n",(unsigned int)pthread_self());
    pthread_detach(pthread_self());
    pthread_exit(NULL); //线程自行结束
    }

    task.function=pool->task_queue[pool->queue_front].function;
    task.arg=pool->task_queue[pool->queue_front].arg;
    //出队,模拟环形队列
    pool->queue_front=(pool->queue_front+1)%pool->queue_max_size;
    pool->queue_size--;

    //通知可以有新的任务添加进来
    pthread_cond_broadcast(&pool->quque_not_full);

    //任务取出后,立即将线程池锁释放
    pthread_mutex_unlock(&pool->lock);

    //执行任务
    printf("thread 0x%x start working\n",(unsigned int)pthread_self());
    pthread_mutex_lock(&pool->thread_cnt); //忙线程数变量锁
    pool->busy_thr_num++; //忙线程数+1
    pthread_mutex_unlock(&pool->thread_cnt);
    //模拟处理任务
    task.function(task.arg); //执行回调函数任务
    // (*task.function)(task.arg);

    //任务结束处理
    printf("thread 0x%x end working\n",(unsigned int)pthread_self());
    pthread_mutex_lock(&pool->thread_cnt);
    pool->busy_thr_num--; //处理掉一个任务,忙线程数--
    pthread_mutex_unlock(&pool->thread_cnt);
    }
    pthread_exit(NULL);
    return NULL;
    }

    //管理线程
    void *adjust_thread(void *threadpool){
    int i;
    threadpool_t *pool=(threadpool_t *)threadpool;

    while (!pool->shutdown) {
    //定时对线程池管理
    sleep(DEFAULT_TIME);

    pthread_mutex_lock(&pool->lock);
    int queue_size=pool->queue_size; //任务数
    int live_thr_num=pool->live_thr_num; //存活线程数
    pthread_mutex_unlock(&pool->lock);

    pthread_mutex_lock(&pool->thread_cnt);
    int busy_thr_num=pool->busy_thr_num; //忙线程数量
    pthread_mutex_unlock(&pool->thread_cnt);

    //创建新线程算法:任务数大于最小线程池个数,且存活的线程数少于最大线程个数时,如:30>=10&&40<100
    if(queue_size>live_thr_num&&live_thr_num<pool->max_thr_num){
    pthread_mutex_lock(&pool->lock);
    int add=0;

    //一次增加DEFAULT_THREAD个线程
    for(i=0;i<pool->max_thr_num&&add<DEFAULT_THREAD_VARY
    &&pool->live_thr_num<pool->max_thr_num;++i){
    if(pool->threads[i]==0){
    pthread_create(&pool->threads[i],NULL,threadpool_thread,(void *)pool);
    add++;
    pool->live_thr_num++;
    }
    }

    pthread_mutex_unlock(&pool->lock);
    }

    //销毁多余的空闲线程,算法:忙线程*2小于存活的线程数,并且存活的线程数大于最小线程数时
    if((busy_thr_num*2)<live_thr_num&&live_thr_num>pool->min_thr_num){
    //一次销毁DEFAULT_THREAD个线程,随机10个即可
    pthread_mutex_lock(&pool->lock);
    pool->wait_exit_thr_num=DEFAULT_THREAD_VARY; //要销毁的线程数设置为10
    pthread_mutex_unlock(&pool->lock);

    for(i=0;i<DEFAULT_THREAD_VARY;++i){
    //通知处在空闲状态的线程,他们会自行终止
    pthread_cond_signal(&pool->quque_not_empty);
    }
    }
    }
    return NULL;
    }

    //销毁线程池
    int threadpool_destroy(threadpool_t *pool){
    int i;
    if(pool==NULL){
    return -1;
    }
    pool->shutdown=true;

    //先销毁管理线程
    pthread_join(pool->adjust_tid,NULL);

    for(i=0;i<pool->live_thr_num;++i){
    //通知所有空闲线程
    pthread_cond_broadcast(&pool->quque_not_empty);
    }
    for(i=0;i<pool->live_thr_num;++i){
    pthread_join(pool->threads[i],NULL);
    }
    threadpool_free(pool);

    return 0;
    }


    //释放线程池空间
    int threadpool_free(threadpool_t *pool){
    if(pool==NULL){
    return -1;
    }

    if(pool->task_queue){
    free(pool->task_queue);
    }
    if(pool->threads){
    free(pool->threads);
    pthread_mutex_lock(&pool->lock);
    pthread_mutex_destroy(&pool->lock);
    pthread_mutex_lock(&pool->thread_cnt);
    pthread_mutex_destroy(&pool->thread_cnt);
    pthread_cond_destroy(&pool->quque_not_empty);
    pthread_cond_destroy(&pool->quque_not_full);
    }
    free(pool);
    pool=NULL;
    return 0;
    }

    //模拟线程池中的线程,处理业务
    void *process(void *arg){
    printf("thread 0x%x working on task %d\n",(unsigned int)pthread_self(),*(int *)arg);
    sleep(1);
    printf("task %d is end\n",*(int *)arg);

    return NULL;
    }


    int main (int argc, char *argv[]) {
    //创建线程池
    threadpool_t *thp=threadpool_create(3,10,100);
    printf("pool inited\n");

    int num[100],i;
    for(i=0;i<100;++i){
    num[i]=i;
    printf("add task %d\n",i);
    //向线程池中添加任务
    threadpool_add(thp,process,(void *)&num[i]);
    }
    sleep(30); //模拟等待子线程完成任务
    threadpool_destroy(thp); //销毁线程池
    return 0;
    }

UDP服务器

概念

引言

  • UDP无连接的、不可靠的数据报传递协议,对于不稳定的网络层,采取完全不弥补的通信方式,默认还原网络状态

  • 优点

    1. 传输速度快
    2. 效率高
    3. 开销小
  • 缺点:数据流量、速度、顺序不稳定

  • 使用场景对时效性要求较高场合,稳定性其次

  • 在socket编程中,我们讲了TCP的服务器实现,相对应的UDP服务器的实现也是有必要学的,因为UDP的特性也就铸就着他的应用场景,例如:游戏、视频会议、视频电话

概念

  • UDP通信服务器基于UDP协议实现的通信服务器,根据UDP的特点,将部分的不需要的函数去除(accept和connect函数去除)
  • 使用场景对时效性要求较高场合,稳定性其次
  • 对于大厂来说,当使用TCP和各种模型搭建的服务器的速度很难再提高了,毕竟TCP的特性已经表明了,但是对于UDP,我们可以在应用层使用数据校验协议来弥补UDP的不足。

函数原型

  • ssize_t recvfrom(int sockfd,void *buffer,size_t len,int flags,struct sockaddr *src_addr,socklen_t *addrlen)适用于UDP的接收数据函数,替换tcp中的recv

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    #include <sys/socket.h>

    ssize_t recvfrom(int sockfd,void *buffer,size_t len,int flags,struct sockaddr *src_addr,socklen_t *addrlen);


    //参数
    sockfd:接收数据或请求的套接字文件描述符,数据接收地;

    buffer:缓冲区;

    len:缓冲区长度;

    flags:是一组标志参数,控制着函数的行为,一般都置为0;

    src_addr:传出参数,数据起始地地址结构;

    addrlen:数据起始地地址结构结构体长度;


    //返回值
    成功:返回接收到的字节数,0,表示连接已经关闭;

    失败:-1,errno;
  • ssize_t sendto(int sockfd,const void *buf,size_t len,int flags,const struct sockaddr *dest_addr,socklen_t addrlen)适用于UDP的发送数据的函数,代替TCP中的send

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    #include <sys/socket.h>

    ssize_t sendto(int sockfd,const void *buf,size_t len,int flags,const struct sockaddr *dest_addr,socklen_t addrlen);


    //参数
    sockfd:发送数据的套接字文件描述符,数据起始地;

    buf:要发送的数据,也就是缓冲区;

    len:缓冲区长度;

    flags:是一组标志参数,控制着函数的行为,一般都置为0;

    dest_addr:传入参数,数据目的地地址结构;

    addrlen:数据目的地地址结构体长度;


    //返回值
    成功:返回发送的字节数;

    失败:-1,errno;

server

实现思路

  1. socket建立服务端套接字文件描述符sockfd,使用报式协议
  2. bind绑定服务端地址结构到服务端套接字上
  3. 因为UDP没有三次握手,所以取消掉listen(可有可无),connect以及accept,直接进行收发数据操作
  4. 使用recvfrom接收数据
  5. 使用sendto发送数据
  6. 服务端结束运行close关闭服务端套接字

注意:注意UDP使用的是报式协议,TCP使用的是流式协议,所以socket函数的第二个参数应该为SOCK_DGRAM

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include "wrap.h"
#include <cctype>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstdio>
#include <strings.h>

#define SER_PORT 5005


int main (int argc, char *argv[]) {
struct sockaddr_in ser_addr,cli_addr;
socklen_t cli_addr_len;
int sockfd;
char buf[BUFSIZ];
//注意UDP使用的是报式协议,TCP使用的是流式协议
sockfd=Socket(AF_INET,SOCK_DGRAM,0);

bzero(&ser_addr,sizeof(ser_addr));
ser_addr.sin_family=AF_INET;
ser_addr.sin_port=htons(SER_PORT);
ser_addr.sin_addr.s_addr=htonl(INADDR_ANY);

Bind(sockfd,(struct sockaddr *)&ser_addr,sizeof(ser_addr));

printf("Accepting connections ... \n");
cli_addr_len=sizeof(cli_addr);
while (1) {
int n=recvfrom(sockfd,buf,BUFSIZ,0,(struct sockaddr *)&cli_addr,&cli_addr_len);
if(n==-1){
perror("recvfrom error");
}
printf("recvied from %s at port %d\n",inet_ntoa(cli_addr.sin_addr),ntohs(cli_addr.sin_port));
for(int i=0;i<n;++i){
buf[i]=toupper(buf[i]);
}
n=sendto(sockfd,buf,n,0,(struct sockaddr *)&cli_addr,cli_addr_len);
if(n==-1){
perror("sendto error");
}
}
close(sockfd);
return 0;
}

client

实现思路

  1. socket创建客户端套接字cfd,使用报式协议
  2. 初始化服务端地址结构
  3. 与服务端直接通信,不需要连接
  4. 使用sendto发送数据
  5. 使用recvfrom接收数据
  6. close关闭客户端套接字

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include "wrap.h"
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <strings.h>
#include <cstdio>


#define SER_PORT 5005



int main (int argc, char *argv[]) {
sockaddr_in ser_addr;
char buf[BUFSIZ];
socklen_t ser_addr_len;
int cfd=Socket(AF_INET,SOCK_DGRAM,0);

bzero(&ser_addr, sizeof(ser_addr));
ser_addr.sin_family=AF_INET;
ser_addr.sin_port=htons(SER_PORT);
ser_addr.sin_addr.s_addr=htonl(INADDR_ANY);

ser_addr_len=sizeof(ser_addr);
while (fgets(buf,BUFSIZ,stdin)!=NULL) {
int n=sendto(cfd,buf,strlen(buf),0,(sockaddr *)&ser_addr,ser_addr_len);
if(n==-1)jade-sable-f2b637.netlify.app{
perror("sendto error");
}
n=recvfrom(cfd,buf,n,0,(sockaddr *)&ser_addr,&ser_addr_len);
if(n==-1){
perror("recvfrom error");
}
Write(STDOUT_FILENO,buf,n);
}MySQL

close(cfd);
return 0;
}

本地套接字

概念

  • 本地套接字(domain socket)本地进程间通信的一种实现方式,除了本地套接字以外,其他技术,如管道、共享信息队列等也是进程间通信的常用方法,因为本地套接字开发便捷,接受度高,所以普遍适用于同一台主机上进程间通信的各种场景

  • 利用本地套接字可完成可靠字节流和数据报两种协议。

  • 本地套接字地址结构为sockaddr_un

    image-20240530221419825

sockaddr_un数据结构

1
2
3
4
5
6
#include <sys/un.h>

struct sockaddr_un{
sa_family_t sun_family; //协议族,本地套接字填AF_UNIX/AF_LOCAL
char sun_path[UNIX_PATH_MAX]; //socket通信文件名(含路径)
};

注意:因为sockaddr_un跟sockaddr的数据结构不同,因此我们不能再用sizeof求解地址结构长度,而是需要分别计算sun_family和sun_path的长度然后加起来即可

函数原型

  • int socket(int domain,int type,int protocol):创建套接字函数,将domain参数设定为AF_UNIX/AF_LOCAL即可创建本地套接字

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    #include <sys/socket.h>

    int socket(int domain, int type, int protocol);

    //参数
    domain:AF_UNIX/AF_LOCAL;

    type:SOCK_STREAM/SOCK_DGRAM;

    protocol:表示选用的协议当中代表协议,一般传0,流式传输协议代表协议为TCP,报式传输协议代表为UDP,也可以直接填IPPROTO_TCP或者IPPROTO_UDP;

    //返回值
    成功:新套接字所对应的文件描述符;

    失败:-1 errno;
  • size_t offsetof(type,member):宏函数,用于求解type结构体中的member成员变量的地址与结构体首地址的偏移量

    1
    2
    3
    #include <stddef.h>

    size_t offsetof(type, member);

sockaddr_un长度求解

1
int len=offsetof(struct sockaddr_un,sun_path)+strlen(sun_path);

服务端示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include "wrap.h"
#include <cstddef>
#include <cstring>fluid美化
#include <strings.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <ctype.h>
#include <cstdio>


#define SER_ADDR "ser.socket"

int main (int argc, char *argv[]) {
int lfd,cfd,len,size,i;
struct sockaddr_un ser_addr,cli_addr;
char buf[BUFSIZ];

lfd=Socket(AF_UNIX,SOCK_STREAM,0);

bzero(&ser_addr, sizeof(ser_addr));
ser_addr.sun_family=AF_UNIX;
strcpy(ser_addr.sun_path,SER_ADDR);

//sockaddr_un地址结构长度
len=offsetof(struct sockaddr_un, sun_path)+strlen(ser_addr.sun_path);

//确保bind之前ser.socket文件不存在,bind会创建该文件
unlink(SER_ADDR);
Bind(lfd,(struct sockaddr *)&ser_addr,len);

Listen(lfd,128);

printf("Accept ...\n");
len=sizeof(cli_addr);
while (1) {
cfd=Accept(lfd,(struct sockaddr *)&cli_addr,(socklen_t *)&len);
len-=offsetof(struct sockaddr_un, sun_path); //得到通信文件名长度
cli_addr.sun_path[len]='\0'; //确保打印时,没有乱码出现

printf("client bind filename %s\n",cli_addr.sun_path);

while ((size=Read(cfd,buf,sizeof(buf)))>0) {
for(i=0;i<size;++i){
buf[i]=toupper(buf[i]);
}
Write(cfd,buf,size);
}
Close(cfd);
}
Close(lfd);
return 0;
}

注意:

为了保证bind函数调用成功,我们需要在bind之前使用unlink将socket通信的文件删除掉,防止因为同名而调用失败

客户端示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include "wrap.h"
#include <cstddef>
#include <cstring>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <cstdio>
#include <strings.h>


#define SER_ADDR "ser.socket"
#define CLI_ADDR "cli.socket"

int main (int argc, char *argv[]) {
int cfd,len;
struct sockaddr_un ser_addr,cli_addr;
char buf[BUFSIZ];

cfd=Socket(AF_UNIX,SOCK_STREAM,0);
//不能依赖于隐式绑定,因为需要socket通信文件名
bzero(&cli_addr,sizeof(cli_addr));
cli_addr.sun_family=AF_UNIX;
strcpy(cli_addr.sun_path, CLI_ADDR);

len=offsetof(struct sockaddr_un, sun_path)+strlen(cli_addr.sun_path);

unlink(CLI_ADDR);
Bind(cfd,(struct sockaddr *)&cli_addr,len);


bzero(&ser_addr,sizeof(ser_addr));
ser_addr.sun_family=AF_UNIX;
strcpy(ser_addr.sun_path, SER_ADDR);

len=offsetof(struct sockaddr_un, sun_path)+strlen(ser_addr.sun_path);

//跟服务端连接,所以需要初始化服务器的地址结构
Connect(cfd,(struct sockaddr *)&ser_addr,len);

while (fgets(buf,sizeof(buf),stdin)!=NULL) {
Write(cfd,buf,strlen(buf));
len=Read(cfd,buf,sizeof(buf));
Write(STDOUT_FILENO,buf,len);
}

Close(cfd);
return 0;
}

注意:

  • 不能依赖于隐式绑定,因为需要socket通信文件名,所以要绑定客户端地址结构
  • 客户端还需连接服务端然后通信,所以客户端需要初始化服务端地址结构,然后进行connect连接

Linux网络编程
https://moonfordream.github.io/posts/Linux网络编程/
作者
Moon
发布于
2024年5月20日
许可协议