1
回答
关于多线程写文件的问题
终于搞明白,存储TCO原来是这样算的>>>   

     本人是Linux C初学者,现在需要写一个实时读取加速网卡数据并将其负载写盘的程序。我现在写的程序实现4个线程同时从网卡接收数据流,并为每一个流建立一个文件写入其负载。目前这种模式已经跑通。但是随着数据量加大,这种模式一般写到1000个文件左右,文件指针就会耗尽,程序报fopen failed!

请各位高手给想想办法,改变一下程序结构,让这个程序既能实时按每个流一个文件的形式落下负载,同时又不把文件指针耗尽。代码如下:

/*略去头文件*/

typedef struct receive_thread_info {
    pthread_t tid;
    int sid;
    unsigned int file_start_num;
    unsigned int file_finish_num;
} receive_thread_info_t;

/*
 * create a file
 */
static FILE* get_file(receive_thread_info_t *info_p)
{
    char file_name[16];
    FILE* fp;
    memset(file_name, 0, 16);
    sprintf(file_name, "%d_%d", info_p->sid, info_p->file_start_num);
    strcat(file_name, ".txt");
 //为每个流建立一个文件
    fp = fopen(file_name, "wb");
    if (fp == NULL) {
        printf("%s open faild!\n", file_name);
        exit(1); 
    }
    return fp;
}

/*
 * write packet to the file
 */
static void write_pkt(struct pktinfo *pktinfo_ptr)
{
    struct tcp_stream *tcp = pktinfo_ptr->pdata;
    FILE *fp = (FILE *)tcp->appinfo;
    struct half_stream *half_p = pag_gethalfstream(pktinfo_ptr);
    if (half_p == NULL) {
        return;
    }
   //写入负载
    if (half_p->count_new > 0) {
        fwrite(half_p->data, half_p->count_new, 1 , fp);
        fflush(fp);
    }
}

static void deal_stream(struct pktinfo *pktinfo_ptr, receive_thread_info_t *info_p)
{
    struct tcp_stream *tcp;
    FILE *fp;
  
    tcp = pktinfo_ptr->pdata;
    if (tcp->appinfo != NULL) {
        write_pkt(pktinfo_ptr);           
    } else {
            fp = get_file(info_p);
            info_p->file_start_num++;
            tcp->appinfo = (void *)fp;
            write_pkt(pktinfo_ptr);
    }
   
    if (pag_isstreamclosed(pktinfo_ptr)) {
        if (tcp->appinfo != NULL) {
            fp = (FILE *)tcp->appinfo;
            fclose(fp);
            info_p->file_finish_num++;
            tcp->appinfo = NULL;
        }
        pag_delstream(pktinfo_ptr->pdata);
    }
}

/*
 *receive thread, get stream infomation
 */
void * receive_pthread(void *arg)
{
    receive_thread_info_t *info_p = (receive_thread_info_t *)arg;
    struct pktinfo *pktinfo_p = NULL;

   
    while (1) {
            /* 从网卡的一个数据流队列中读取数据流信息,可能是流也可能是包信息 */
            pktinfo_p = pag_getstream(info_p->sid);
             /* 判断:如果是udp 包则不处理 */
            if (pktinfo_p == NULL || pktinfo_p->ipproto != 0x06) {
                usleep(10);
                continue;
            }
            /* 对每个tcp包进行处理 */
            deal_stream(pktinfo_p, info_p);
    }
    return NULL;
}

int main(int argc, char **argv)
{
    int i, ret;
    void *value_ptr;
    int sid_counter = 4;
    if ((ret = pag_open()) == -1){
     printf("pag_open error\n");
     exit(1);
    }
    pag_setlinknum(500);
    receive_thread_info_t * info_p = (receive_thread_info_t *)calloc(sid_counter,
            sizeof(receive_thread_info_t));
    if (info_p == NULL) {
        printf("calloc error\n");
        exit(1);
    }
     /* 建立线程 */
    for (i = 0; i < sid_counter; i++) {
        info_p[i].file_start_num = 0;
        info_p[i].file_finish_num = 0;
        info_p[i].sid = i;
        pthread_create(&info_p[i].tid,
            NULL, receive_pthread, info_p + i);
    }

    for (i = 0; i < sid_counter; i++) {
     pthread_join(info_p[i].tid, &value_ptr);
    }
    pag_close();
    free(info_p);
    exit(1);
}

举报
colrain
发帖于6年前 1回/2K+阅
顶部