/* main.h 定义了三个数据结构 conn_queue_item,work_thread,dispatch_thread. conn_queue_item 只是存dispatch_thread accept 以后的描述符,然后 dispatch_thread 将conn_queue_item 存入某一个work_thread. work_thread 真正负责work的thread. dispatch_thread 监听9877端口,并且将accept后的fd传给work_thread. */ #ifndef MAINH #define MAINH

#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <event.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>
#include <errno.h>

typedef struct conn_queue_item CQ;

struct conn_queue_item {
    int sfd;
};

struct WORK_THREAD {
    pthread_t thread_id;
    struct event_base *base;
    struct event notify_event;
    int notify_receive_fd;
    int notify_send_fd;
    struct conn_queue_item cq;
};
typedef struct WORK_THREAD wk_thread;

struct DISPATCH_THREAD {
    pthread_t thread_id;
    struct event_base *base;
};
typedef struct DISPATCH_THREAD dh_thread;

#endif

主要的执行函数main.c
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <event.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>
#include <errno.h>
#include "main.h"

static struct event_base *main_base;

void call_accept(int fd, short event, void *arg)
{
    fputs("a socket has come\n", stdout);
    struct sockaddr_in cliaddr;
    socklen_t clilen;
    int connfd;
    connfd = accept(fd, (struct sockaddr *) &cliaddr, &clilen);
    dispatch_new_thread(connfd);
}



int main()
{

    int listenfd, connfd;
    struct sockaddr_in cliaddr, servaddr;
    socklen_t clilen;

    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9877);
    bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr));
    listen(listenfd, 10);

    struct event_base *mb;
    struct event ev;
    mb = event_init();
    event_set(&ev, listenfd, EV_READ | EV_PERSIST, call_accept, &ev);
    //event_base_set(main_base, &ev);
    event_add(&ev, NULL);
    printf("add the event\n");

    thread_init(10, mb);

    printf("block before accept\n");
    event_base_loop(mb, 0);

    return 0;
}

线程池模型. 每一个work_thread监听自己的notify_receive_fd READ 事件,然后dispatch_thread 往notify_receive_fd 写入一字节的数据.接着 work_thread
就处理从dispatch_thread 传送过来的fd 的请求
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <event.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <string.h>
#include <fcntl.h>
#include <pthread.h>
#include <errno.h>
#include "main.h"


static dh_thread dispatch_thread;

static wk_thread *threads;

static int last_thread = -1;

void dispatch_new_thread(int fd)
{
    int tid = (last_thread + 1) % 10;
    wk_thread *thread = threads + tid;

    thread->cq.sfd = fd;
    write(thread->notify_send_fd, "", 1);
}


void thread_libevent_process(int fd, short which, void *arg)
{
    wk_thread *work_thread = arg;
    char unuse[1];
    if (read(fd, unuse, 1) != 1) {
        fprintf(stderr, "Can't read from libevent\n");
    }
    char buf[100];
    int n;
    n = read(work_thread->cq.sfd, buf, 100);
    write(work_thread->cq.sfd, buf, n);
}

void setup_thread(wk_thread *work_thread)
{   
    work_thread->base = event_init();
    if (!work_thread->base) {
        fprintf(stdout, "Can't allocate event base\n");
        exit(1);
    }

    event_set(&work_thread->notify_event, work_thread->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, work_thread);
    event_base_set(work_thread->base, &work_thread->notify_event);
    if (event_add(&work_thread->notify_event, 0) == -1) {
        fprintf(stdout, "Can't add libevent notify pipe\n");
        exit(1);
    }
}

void worker_libevent(void *arg)
{
    wk_thread *work_thread = arg;
    event_base_loop(work_thread->base, 0);
}

void create_worker(void *(*func)(void *), void *arg)
{
    pthread_t thread;
    pthread_attr_t attr;
    int ret;
    pthread_attr_init(&attr);

    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stdout, "Can't create thread: %s\n", strerror(ret));
        exit(1);
    }
}


void thread_init(int t_num, struct event_base *main_base)
{
    dispatch_thread.base = main_base;
    dispatch_thread.thread_id = pthread_self();
    int i;
    threads = calloc(t_num, sizeof(wk_thread));
    if (!threads) {
        perror("Can't alloc so many thread\n");
        exit(1);
    }

    for (i = 0; i < t_num; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("can't pipe\n");
            exit(1);
        }
        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);
    }

    for (i = 0; i < t_num; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

}

<
Previous Post
Memcache threads analysis
>
Next Post
Memcache 内存分配