swoole源码-socket

iuiuiuiu 发布于 03/01 22:46
阅读 113
收藏 0

swoole.h中与socket.c相关的内容

#ifdef SW_USE_IOCTL
#define swSetNonBlock(sock)   swoole_ioctl_set_block(sock, 1)
#define swSetBlock(sock)      swoole_ioctl_set_block(sock, 0)
#else
#define swSetNonBlock(sock)   swoole_fcntl_set_block(sock, 1)
#define swSetBlock(sock)      swoole_fcntl_set_block(sock, 0)
#endif

#if defined(HAVE_KQUEUE) || !defined(HAVE_SENDFILE)
int swoole_sendfile(int out_fd, int in_fd, off_t *offset, size_t size);
#else
#include <sys/sendfile.h>
#define swoole_sendfile(out_fd, in_fd, offset, limit)    sendfile(out_fd, in_fd, offset, limit)
#endif
//socket的类型
enum swSocket_type
{
    SW_SOCK_TCP          =  1,
    SW_SOCK_UDP          =  2,
    SW_SOCK_TCP6         =  3,
    SW_SOCK_UDP6         =  4,
    SW_SOCK_UNIX_DGRAM   =  5,  //unix sock dgram
    SW_SOCK_UNIX_STREAM  =  6,  //unix sock stream
};
int swSocket_listen(int type, char *host, int port, int backlog);
int swSocket_create(int type);
int swSocket_wait(int fd, int timeout_ms, int events);
void swSocket_clean(int fd, void *buf, int len);
int swSocket_sendto_blocking(int fd, void *__buf, size_t __n, int flag, struct sockaddr *__addr, socklen_t __addr_len);
int swSocket_udp_sendto(int server_sock, char *dst_ip, int dst_port, char *data, uint32_t len);
int swSocket_udp_sendto6(int server_sock, char *dst_ip, int dst_port, char *data, uint32_t len);
int swSocket_sendfile_sync(int sock, char *filename, double timeout);
int swSocket_write_blocking(int __fd, void *__data, int __len);

socket.c

#include "swoole.h"

#include <sys/stat.h>
#include <sys/poll.h>

//socket发送文件,同步
int swSocket_sendfile_sync(int sock, char *filename, double timeout)
{
    //超时
    int timeout_ms = timeout < 0 ? -1 : timeout * 1000;
    //打开文件,获取文件的fd
    int file_fd = open(filename, O_RDONLY);
    //打开失败
    if (file_fd < 0)
    {
        swWarn("open(%s) failed. Error: %s[%d]", filename, strerror(errno), errno);
        return SW_ERR;
    }
    //文件状态
    struct stat file_stat;
    //获取文件状态
    if (fstat(file_fd, &file_stat) < 0)
    {
        swWarn("fstat() failed. Error: %s[%d]", strerror(errno), errno);
        return SW_ERR;
    }

    int n, sendn;
    //文件偏移量
    off_t offset = 0;
    //文件大小
    size_t file_size = file_stat.st_size;

    while (offset < file_size)
    {
        //写文件
        if (swSocket_wait(sock, timeout_ms, SW_EVENT_WRITE) < 0)
        {
            return SW_ERR;
        }
        else
        {
            //剩余的发送数据
            sendn = (file_size - offset > SW_SENDFILE_TRUNK) ? SW_SENDFILE_TRUNK : file_size - offset;
            //调用系统的sendfile
            n = swoole_sendfile(sock, file_fd, &offset, sendn);
            //调用失败
            if (n <= 0)
            {
                swWarn("sendfile() failed. Error: %s[%d]", strerror(errno), errno);
                return SW_ERR;
            }
            else
            {
                continue;
            }
        }
    }
    return SW_OK;
}

//clear socket buffer.
void swSocket_clean(int fd, void *buf, int len)
{
    //不断的接受直到失败,非阻塞的接收
    while (recv(fd, buf, len, MSG_DONTWAIT) > 0)
        ;
}

//Wait socket can read or write.
int swSocket_wait(int fd, int timeout_ms, int events)
{
    /*
    struct pollfd 
    {
        //文件描述符
        int fd;
        //等待的需要测试事件
        short events;
        //实际发生了的事件,也就是返回结果
        short revents;
    };
    */
    struct pollfd event;
    event.fd = fd;
    event.events = 0;

    //读事件
    if (events & SW_EVENT_READ)
    {
        event.events |= POLLIN;
    }
    //写事件
    if (events & SW_EVENT_WRITE)
    {
        event.events |= POLLOUT;
    }
    while (1)
    {
        //int poll(struct pollfd fds[], nfds_t nfds, int timeout);
        //fds:是一个struct pollfd结构类型的数组,用于存放需要检测其状态的Socket描述符;
        //每当调用这个函数之后,系统不会清空这个数组,操作起来比较方便;
        //特别是对于socket连接比较多的情况下,在一定程度上可以提高处理的效率;
        //这一点与select()函数不同,调用select()函数之后,
        //select()函数会清空它所检测的socket描述符集合,
        //导致每次调用select()之前都必须把socket描述符重新加入到待检测的集合中;
        //因此,select()函数适合于只检测一个socket描述符的情况,
        //而poll()函数适合于大量socket描述符的情况;

        //nfds:nfds_t类型的参数,用于标记数组fds中的结构体元素的总数量;

        //timeout:是poll函数调用阻塞的时间,单位:毫秒;
        int ret = poll(&event, 1, timeout_ms);

        //返回值
        //>0:数组fds中准备好读、写或出错状态的那些socket描述符的总数量;
        //==0:数组fds中没有任何socket描述符准备好读、写,或出错;此时poll超时,超时时间是timeout毫秒;
        //换句话说,如果所检测的socket描述符上没有任何事件发生的话,
        //那么poll()函数会阻塞timeout所指定的毫秒时间长度之后返回,
        //如果timeout==0,那么poll() 函数立即返回而不阻塞,
        //如果timeout==INFTIM,那么poll() 函数会一直阻塞下去,
        //直到所检测的socket描述符上的感兴趣的事件发生是才返回,
        //如果感兴趣的事件永远不发生,那么poll()就会永远阻塞下去;
        //-1: poll函数调用失败,同时会自动设置全局变量errno

        //超时
        if (ret == 0)
        {
            return SW_ERR;
        }
        //调用失败
        else if (ret < 0 && errno != EINTR)
        {
            swWarn("poll() failed. Error: %s[%d]", strerror(errno), errno);
            return SW_ERR;
        }
        else
        {
            return SW_OK;
        }
    }
    return SW_OK;
}

//非阻塞的写入
int swSocket_write_blocking(int __fd, void *__data, int __len)
{
    int n = 0;
    int written = 0;

    while (written < __len)
    {
        n = write(__fd, __data + written, __len - written);
        if (n < 0)
        {
            if (errno == EINTR)
            {
                continue;
            }
            else if (errno == EAGAIN)
            {
                //调用poll进行非阻塞写入
                swSocket_wait(__fd, SW_WORKER_WAIT_TIMEOUT, SW_EVENT_WRITE);
                continue;
            }
            else
            {
                swSysError("write %d bytes failed.", __len);
                return SW_ERR;
            }
        }
        written += n;
    }

    return written;
}

//发送UDP报文IPV4
int swSocket_udp_sendto(int server_sock, char *dst_ip, int dst_port, char *data, uint32_t len)
{
    struct sockaddr_in addr;
    //地址转换
    if (inet_aton(dst_ip, &addr.sin_addr) == 0)
    {
        swWarn("ip[%s] is invalid.", dst_ip);
        return SW_ERR;
    }
    addr.sin_family = AF_INET;
    //转换端口
    addr.sin_port = htons(dst_port);
    return swSocket_sendto_blocking(server_sock, data, len, 0, (struct sockaddr *) &addr, sizeof(addr));
}

//发送UDP报文IPV6
int swSocket_udp_sendto6(int server_sock, char *dst_ip, int dst_port, char *data, uint32_t len)
{
    struct sockaddr_in6 addr;
    bzero(&addr, sizeof(addr));
    if (inet_pton(AF_INET6, dst_ip, &addr.sin6_addr) < 0)
    {
        swWarn("ip[%s] is invalid.", dst_ip);
        return SW_ERR;
    }
    addr.sin6_port = (uint16_t) htons(dst_port);
    addr.sin6_family = AF_INET6;
    return swSocket_sendto_blocking(server_sock, data, len, 0, (struct sockaddr *) &addr, sizeof(addr));
}

//非阻塞的发送
int swSocket_sendto_blocking(int fd, void *__buf, size_t __n, int flag, struct sockaddr *__addr, socklen_t __addr_len)
{
    int n = 0;

    while (1)
    {
        n = sendto(fd, __buf, __n, flag, __addr, __addr_len);
        if (n >= 0)
        {
            break;
        }
        else
        {
            if (errno == EINTR)
            {
                continue;
            }
            else if (errno == EAGAIN)
            {
                swSocket_wait(fd, 1000, SW_EVENT_WRITE);
                continue;
            }
            else
            {
                break;
            }
        }
    }

    return n;
}

//创建socket
int swSocket_create(int type)
{
    //域
    int _domain;
    //类型
    int _type;

    switch (type)
    {
    case SW_SOCK_TCP:
        _domain = PF_INET;
        _type = SOCK_STREAM;
        break;
    case SW_SOCK_TCP6:
        _domain = PF_INET6;
        _type = SOCK_STREAM;
        break;
    case SW_SOCK_UDP:
        _domain = PF_INET;
        _type = SOCK_DGRAM;
        break;
    case SW_SOCK_UDP6:
        _domain = PF_INET6;
        _type = SOCK_DGRAM;
        break;
    case SW_SOCK_UNIX_DGRAM:
        _domain = PF_UNIX;
        _type = SOCK_DGRAM;
        break;
    case SW_SOCK_UNIX_STREAM:
        _domain = PF_UNIX;
        _type = SOCK_STREAM;
        break;
    default:
        return SW_ERR;
    }
    //创建socket,调用系统的socket
    return socket(_domain, _type, 0);
}

//监听套接字
int swSocket_listen(int type, char *host, int port, int backlog)
{
    int sock;
    int option;
    int ret;

    //IPV4的地址
    struct sockaddr_in addr_in4;
    //IPV6的地址
    struct sockaddr_in6 addr_in6;
    //UNIX内部的通信
    struct sockaddr_un addr_un;

    //创建socket
    sock = swSocket_create(type);
    //创建失败
    if (sock < 0)
    {
        swWarn("create socket failed. Error: %s[%d]", strerror(errno), errno);
        return SW_ERR;
    }
    //reuse
    option = 1;
    //设置socket
    setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(int));

    //unix socket
    if (type == SW_SOCK_UNIX_DGRAM || type == SW_SOCK_UNIX_STREAM)
    {
        bzero(&addr_un, sizeof(addr_un));
        unlink(host);
        addr_un.sun_family = AF_UNIX;
        strcpy(addr_un.sun_path, host);
        ret = bind(sock, (struct sockaddr*) &addr_un, sizeof(addr_un));
    }
    //IPv6
    else if (type > SW_SOCK_UDP)
    {
        bzero(&addr_in6, sizeof(addr_in6));
        inet_pton(AF_INET6, host, &(addr_in6.sin6_addr));
        addr_in6.sin6_port = htons(port);
        addr_in6.sin6_family = AF_INET6;
        ret = bind(sock, (struct sockaddr *) &addr_in6, sizeof(addr_in6));
    }
    //IPv4
    else
    {
        bzero(&addr_in4, sizeof(addr_in4));
        inet_pton(AF_INET, host, &(addr_in4.sin_addr));
        addr_in4.sin_port = htons(port);
        addr_in4.sin_family = AF_INET;
        ret = bind(sock, (struct sockaddr *) &addr_in4, sizeof(addr_in4));
    }
    //bind failed
    if (ret < 0)
    {
        swWarn("bind(%s:%d) failed. Error: %s [%d]", host, port, strerror(errno), errno);
        return SW_ERR;
    }
    //如果是UDP或者UNIX socket的话直接返回即可,不用继续监听
    if (type == SW_SOCK_UDP || type == SW_SOCK_UDP6 || type == SW_SOCK_UNIX_DGRAM)
    {
        return sock;
    }
    //listen stream socket
    ret = listen(sock, backlog);
    if (ret < 0)
    {
        swWarn("listen(%d) failed. Error: %s[%d]", backlog, strerror(errno), errno);
        return SW_ERR;
    }
    swSetNonBlock(sock);
    return sock;
}

你是不是多少有了解一点,但是你却对这个不精啊!可以加xv:phpyasi520 交流学习,分享tp,laravel,swoole,swoft微服务、SQL性能优化,分布式、高并发等教程,各种大牛都是1-78年PHP开发者,每天还有11年的架构师做课程讲解,助你进阶中高级PHP程序员,增值涨薪!

 

加载中
返回顶部
顶部