基于消息机制的异步架构之消息队列

清华大佬耗费三个月吐血整理的几百G的资源,免费分享!....>>>

消息队列的头文件msgqueue.h

/*
 * msgqueue.h
 *
 */


#ifndef MSGQUEUE_H_
#define MSGQUEUE_H_


#include "conn.h"


#define MAX_MSG_LENGTH (1024)
typedef struct MSG {
char buf[MAX_MSG_LENGTH];
uint16 buf_len;
CONN* c;
struct MSG* next;


} MSG;


typedef struct MSG_QUEUE {
MSG *head;
MSG *tail;
int size;
} MSG_QUEUE;


MSG_QUEUE* create_msg_queue();


extern int push_msg(MSG_QUEUE *q,CONN* c);


int empty_msg_queue( MSG_QUEUE *q);


int  get_msg(MSG_QUEUE *q ,MSG* temp_msg);


#endif /* MSGQUEUE_H_ */


消息队列的实现文件 * msgqueue.c

/*
 * msgqueue.c
 *
 */


#include "msgqueue.h"


MSG_QUEUE* create_msg_queue()
{


MSG_QUEUE* qmsg=malloc(sizeof(MSG_QUEUE)); /*申请头尾指针结点*/
if(qmsg==NULL){
fprintf(stderr, "create_msg_queue malloc error, errno: %d %m\n", errno);
return NULL;
}
MSG* msg=malloc(sizeof(MSG)); /*申请链队头结点*/
if(msg==NULL){
fprintf(stderr, "create_head_msg malloc error, errno: %d %m\n", errno);
free(qmsg);
return NULL;
}


msg->next=NULL;
qmsg->head=qmsg->tail=msg;
qmsg->size=0;
return qmsg;
}




 int  push_msg(MSG_QUEUE *q,CONN* c)
{
if(q->size>10240){
fprintf(stderr, "push_msg error,msg_queue size is over %d ,errno: %d %m\n",q->size, errno);
return -1;
}


MSG *msg;
msg=malloc(sizeof(MSG)); /*申请新结点*/
if(msg==NULL){
fprintf(stderr, "create_new_msg malloc error, errno: %d %m\n", errno);
return -1;
}
memcpy(msg->buf,c->in_buf,c->in_buf_len);
msg->buf_len=c->in_buf_len;
msg->c=c;
msg->next=NULL;
q->tail->next=msg;
q->tail=msg;
q->size++;


return 0;
}


int empty_msg_queue( MSG_QUEUE *q)
{
if (q->head==q->tail)
return 0;
else
return 1;
}




int  get_msg(MSG_QUEUE *q ,MSG* temp_msg)
{
if (q->head==q->tail){
return -1;
}
MSG *temp=q->head->next;
    if(q->head->next == q->tail) //如果要出队的结点为最后一个结点,使q->rear指向头结点防止出现悬空的指针
        q->tail = q->head;


   // msg_buf = temp->buf; //将出队的数据元素存入*e
    temp_msg->c=temp->c;
    memcpy(temp_msg->buf,temp->buf,temp->buf_len);
    temp_msg->buf_len=temp->buf_len;


    q->head->next = temp->next; //使下一个结点成为队头,如果没有下一个结点则为NULL
q->size--;
//int buf_len=temp->buf_len;
    free(temp); //删除要出队的结点
return 0;


}