select模型是socket中的一种多路IO复用模型之一,通过轮询的方式来完成多路访问控制。

一个很简单的例子来描述select模型:

幼儿园老师要照顾所有的小朋友,每天他都会轮流去问小朋友:“小朋友小朋友,你饿了吗?”

如果小朋友饿了,那么老师就给这个小朋友喂饭,否则就开始询问下一个朋友,一直循环下去直到放学。

同时,如果班级里有其他的同学来了,也把他加到询问队列。如果有哪个同学生病了,则把它踢出询问队列。

select模型的原理就是这样,把所有连接的客户端socket加入到一个集合中去,然后一直不断轮询,判断哪一个socket有数据到达,就读取数据。否则继续轮询下一个数据。

linux系统在编译的时候就固定了select模型文件描述符集合的大小为1024个,这个大小无法更改,因此,select模型只适用于并发量小于1024个的服务连接。

> grep "FD_SETSIZE" /usr/include/ -R
/usr/include/linux/posix_types.h:#define __FD_SETSIZE    1024

一、相关函数

#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

select用来判断文件描述符结合中是否有文件描述符发生了相应的事件,如果有事件则返回(这个策略可以改变)。

参数说明:

nfds: 监控的文件描述符集里最大文件描述符加1,因为此参数会告诉内核检测前多少个文件描述符的状态。

readfds:监控有读数据到达文件描述符集合,传入传出参数。

writefds:监控写数据到达文件描述符集合,传入传出参数。

exceptfds:监控异常发生达文件描述符集合,如带外数据到达异常,传入传出参数。

timeout:定时阻塞监控时间,3种情况:1.NULL,永远等下去;2.设置timeval,等待固定时间;3.设置timeval里时间均为0,检查描述字后立即返回

返回值: 文件描述符集合中有响应时间的文件描述符个数。

超时类型的结构体:

struct timeval {
    long tv_sec; /* seconds */
    long tv_usec; /* microseconds */
};

对监控描述符集合的操作:

void FD_CLR(int fd, fd_set *set); // 把文件描述符集合里的fd设置0
int FD_ISSET(int fd, fd_set *set); // 测试文件描述符集合里fd是否置1
void FD_SET(int fd, fd_set *set); // 把文件描述符集合里fd位置1
void FD_ZERO(fd_set *set); // 把文件描述符集合里所有位清0

通常情况下,我们需要一个客户端数组保存每一个客户端的状态,表示当前集合中的该元素是否已经有存在或者连接。然后还要有一个最大的文件描述符标志,用来轮询。

一旦select返回,我们要先判断是否为服务端的socket,如果是服务端的,说明有新的连接加入,把这个新连接加入到集合中去。否则则说明是客户端连接有响应,这时候需要一个一个去循环判断,看哪个socket有数据到达,然后处理。

二、示例代码

#include<stdio.h>
#include<ctype.h>
#include<sys/select.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<unistd.h>
#include<arpa/inet.h>
#include<stdlib.h>
#include<string.h>

const int MAX_CONN_NUM  = 1024;
const int MAX_BUFF_SIZE = 1024;
const int MAX_IP_LENGTH = 16;

int main(int argc, char** argv){
    if (argc != 2){
        printf("Usage: ./server port\n");
        return 0;
    }

    fd_set t_set, all_set;
    int serv_fd, clnt_fd, max_index, max_fd, port;
    struct sockaddr_in serv_addr, clnt_addr;
    socklen_t addr_len = sizeof(clnt_addr);
    char ip[MAX_IP_LENGTH], buff[MAX_BUFF_SIZE];
    int ret, i, n, client[FD_SETSIZE]; // FD_SETSIZE = 1024

    serv_fd = socket(AF_INET, SOCK_STREAM, 0);

    bzero(&serv_addr, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = INADDR_ANY;
    serv_addr.sin_port = htons(atoi(argv[1]));

    if (-1 == bind(serv_fd, (struct sockaddr*)&serv_addr, addr_len)){
        perror("bind error");
        return 0;
    }

    if (-1 == listen(serv_fd, MAX_CONN_NUM)){
        perror("listen error");
        return 0;
    }

    printf("server start and listen %s\n", argv[1]);

    // 初始化客户端文件描述符
    for (i = 0; i < FD_SETSIZE; i++)
        client[i] = -1;

    // 初始化数组下标和最大文件描述符
    max_index = -1;
    max_fd = serv_fd;

    // 把监控文件符清零
    FD_ZERO(&all_set);
    // 添加服务端socket到fd_sets
    FD_SET(serv_fd, &all_set);

    while(1){
        t_set = all_set;
        ret = select(max_fd + 1, &t_set, 0, 0, 0);
        if (ret == -1){
            perror("select error");
            break;
        }

        if (FD_ISSET(serv_fd, &t_set)){
            // 服务端socket响应,表示有新连接
            clnt_fd = accept(serv_fd, (struct sockaddr*)&clnt_addr, &addr_len);
            if (-1 == clnt_fd){
                perror("accept error");
                break;
            }
            // 判断连接数量是否超过限制
            for (i = 0; i < FD_SETSIZE; i++){
                if (client[i] < 0){
                    client[i] = clnt_fd;
                    break;
                }
            }

            if (i == FD_SETSIZE){
                printf("too many connetions\n");
                close(clnt_fd);
                continue;
            }

            inet_ntop(AF_INET, &clnt_addr.sin_addr, ip, sizeof(clnt_addr));
            port = clnt_addr.sin_port;

            printf("connect to [%s:%d]\n", ip, port);

            // 添加新的客户端到集合
            if (i > max_index) max_index = i;
            if (clnt_fd > max_fd) max_fd = clnt_fd;

            FD_SET(clnt_fd, &all_set);
            // 如果有响应的描述符只有一个,开始下一次select
            if (--ret == 0) continue;
        }

        for (i = 0; i <= max_index; i++){
            // 轮询所有的socket
            if ((clnt_fd = client[i]) < 0){
                continue;
            }
            // 如果有数据,正常接受字符
            if (FD_ISSET(clnt_fd, &t_set)){
                bzero(buff, MAX_BUFF_SIZE);
                n = read(clnt_fd, buff, MAX_BUFF_SIZE);
                if (-1 == n){
                    perror("read error");
                    continue;
                }else if (0 == n){
                    FD_CLR(clnt_fd, &all_set);
                    client[i] = -1;
                    printf("closed!\n");
                    sleep(1);
                    close(clnt_fd);
                    continue;
                }

                printf("recv %d data: %s\n", n, buff);

                for(int j = 0; j < n; j++){
                    buff[j] = toupper(buff[j]);
                }

                n = write(clnt_fd, buff, n);
                if (-1 == n){
                    perror("write error");
                    continue;
                }
            }
        }
    }
    close(serv_fd);
    return 0;
}
最后修改:2018 年 04 月 09 日
如果觉得我的文章对你有用,请随意赞赏