rt-thread中用消息队列实现广播功能的一种方法

长平狐 发布于 2013/03/19 18:19
阅读 659
收藏 0

前面几天在逛论坛时看见有人说RTT中没有广播机制,于是心血来潮,想自己动手写一个,于是有了此文.

1 广播机制分析

广播,这个词首先让我想到Android下的广播机制,其是基于Binder来实现的,然而RTT并不是Linux内核的东东,也没有Binder这样的IPC,RTT有自己的一套IPC机制,前面的文章中有说到信号,互斥量,事件,邮箱和消息队列,我们得从这方面动动脑筋.

再回来广播这个词,现实中,电视就是一个广播的例子,我们就以电视来举例,在现实的生活中,不同的人到电视台做节目,然后家家户户就可以在电视机中收到节目了.分析一个这个模型,不同的人到电视台做节目,这里将不同的人暂且叫做广播发送者(广播可以有多个发送方),电视台提供广播发送服务,这里就叫做服务好了,然后每家都可以打开电视机接收电视节目信号,这里每台的电视机就是电视台的一个终端.

因此,这里归结一个,在RTT中,要实现广播机制,首先,广播的发送者是线程,接收者也是线程,做为发送线程和接收线程中间就需要一个服务器,专门实现将发送线程传过来的消息转发给各个已经注册的接收线程.各个接收线程看成是这个服务器的终端.在实现这个功能,首先,作为广播消息的接收终端,那么它得有一定的缓冲能力,得保存一定条数的消息,其次,作为广播的服务器,得唤醒所有等待接收的接收线程.因此,我们得分别给服务器和客户端定义一种数据结构.

2 广播服务器和客户端的控制块

在rtdef.h头文件中,在消息队列宏内添加广播控制块的定义:

//广播服务器的控制块
struct rt_broadcast_server
{
	//客户端节点链表
    rt_list_t client_list;
};
typedef struct rt_broadcast_server *rt_broadcast_server_t;

//广播接客户端的控制块
struct rt_broadcast_client
{    //接收消息队列
     struct rt_messagequeue receive_mq;
	 //客户端节点
     rt_list_t node;
     //此广播客户端对应的服务器
     //rt_broadcast_server_t server;
};
typedef struct rt_broadcast_client *rt_broadcast_client_t;

由上述可见,广播服务器控制块只包含一个链表client_list,用来保存广播客户端. 而广播客户端的控制块定义中,包含了一个消息队列receive_mq,此队列用来缓存来自广播服务器的消息,除此之外,还有一链表节点,用来作链表操作.

3 广播的接口实现

在ipc.c实现其接口:
//初始化广播服务器
rt_err_t rt_broadcast_server_init(rt_broadcast_server_t server)
{
    RT_ASSERT(server !=RT_NULL);

	//初始化客户端链表
    rt_list_init(&(server->client_list));

    return RT_EOK;
}
RTM_EXPORT(rt_broadcast_server_init);
//初始化广播客户端
rt_err_t rt_broadcast_client_init(rt_broadcast_client_t client,
                                    const char *name,
                                    void *msgpool,
                                    rt_size_t   msg_size,
                                    rt_size_t   pool_size,
                                    rt_uint8_t  flag)
{
    rt_err_t err;

    RT_ASSERT(client !=RT_NULL);
   	//使用传入的消息池初始化接收消息队列
    err =rt_mq_init(&client->receive_mq,name,msgpool,msg_size,pool_size,flag);
    if(err !=RT_EOK)
    {
        return err;
    }

	//初始化客户端节点
    rt_list_init(&client->node);
    return RT_EOK;    
}
RTM_EXPORT(rt_broadcast_client_init);
//给广播服务器注册客户端
rt_err_t rt_broadcast_client_regist(rt_broadcast_server_t server,rt_broadcast_client_t client)
{
    register rt_base_t temp;

    RT_ASSERT(server !=RT_NULL);
    RT_ASSERT(client !=RT_NULL);
   	//开关断
    temp = rt_hw_interrupt_disable();
	//将客户端节点注册到到服务器
    rt_list_insert_after(&server->client_list,&client->node);

    //开中断
    rt_hw_interrupt_enable(temp);
    return RT_EOK;
}
RTM_EXPORT(rt_broadcast_client_regist);
//卸载一个客户端
rt_err_t rt_broadcast_client_unregist(rt_broadcast_client_t client)
{
    register rt_base_t temp;

    RT_ASSERT(client !=RT_NULL);
    //关中断
    temp = rt_hw_interrupt_disable();
	//将客户端节点从服务器上移除
    rt_list_remove(&client->node);

    //开中断
    rt_hw_interrupt_enable(temp);
    return RT_EOK;
}
RTM_EXPORT(rt_broadcast_client_unregist);
//脱离广播客户端
rt_err_t rt_broadcast_client_detach(rt_broadcast_client_t client)
{
	rt_err_t err;

    RT_ASSERT(client !=RT_NULL);

	//扯载此客户端
	err =rt_broadcast_client_unregist(client);
	if(err !=RT_EOK)
	{
		return err;
	}
	//脱离此客户端的消息队列
	return rt_mq_detach(&client->receive_mq);
}
RTM_EXPORT(rt_broadcast_client_detach);
//脱离广播服务器
rt_err_t rt_broadcast_server_detach(rt_broadcast_server_t server)
{
	struct rt_list_node *n;
    rt_broadcast_client_t client;
    rt_err_t err;
    rt_err_t result =RT_EOK;

    RT_ASSERT(server != RT_NULL);
 

	//遍历服务服务器上已注册的客户端节点
	if (!rt_list_isempty(&server->client_list))
    {
        n = server->client_list.next;
        while (n != &server->client_list)
        {
             client =rt_list_entry(n,struct rt_broadcast_client,node);

			 //脱离此客户端
             err =rt_broadcast_client_detach(client);
             if(err !=RT_EOK)
             {
                result =err;
             } 
        }
    }

    return result;

}
RTM_EXPORT(rt_broadcast_server_detach); 
//接收广播 
rt_err_t rt_broadcast_recv(rt_broadcast_client_t client,void *buffer,rt_size_t size,rt_int32_t timeout)
{
    RT_ASSERT(client != RT_NULL);
    RT_ASSERT(buffer != RT_NULL);
    RT_ASSERT(size != 0);

    return rt_mq_recv(&client->receive_mq,buffer,size,timeout);
}
RTM_EXPORT(rt_broadcast_recv); 
//发送广播
rt_err_t rt_broadcast_send(rt_broadcast_server_t server, void *buffer, rt_size_t size)
{
    struct rt_list_node *n;
    rt_broadcast_client_t client;
    rt_err_t err;
    rt_err_t result =RT_EOK;

    RT_ASSERT(server != RT_NULL);
    RT_ASSERT(buffer != RT_NULL);
    RT_ASSERT(size != 0);
    
    //进入临界区,关调度器
    rt_enter_critical();
	//遍历服务器上所有注册的客户端节点,给每个客户端发送消息
    if (!rt_list_isempty(&server->client_list))
    {
        n = server->client_list.next;
		while (n != &server->client_list)
        {
             client =rt_list_entry(n,struct rt_broadcast_client,node);
             err =rt_mq_send(&client->receive_mq,buffer,size);
             if(err !=RT_EOK)
             {
                result =err;
             }
			 n = n->next; 
        }
    }

    //出临界区,再次使用调度器
    rt_exit_critical();

    return result;
}
RTM_EXPORT(rt_broadcast_send);
//紧急发送广播
rt_err_t rt_broadcast_ugent(rt_broadcast_server_t server, void *buffer, rt_size_t size)
{
    struct rt_list_node *n;
    rt_broadcast_client_t client;
    rt_err_t err;
    rt_err_t result =RT_EOK;

    RT_ASSERT(server != RT_NULL);
    RT_ASSERT(buffer != RT_NULL);
    RT_ASSERT(size != 0);
    
    //进入临界区,关调度器
    rt_enter_critical();

	//遍历服务器上所有注册的客户端节点,给每个客户端紧急发送消息
    if (!rt_list_isempty(&server->client_list))
    {
        n = server->client_list.next;
        while (n != &server->client_list)
        {
             client =rt_list_entry(n,struct rt_broadcast_client,node);
             err =rt_mq_urgent(&client->receive_mq,buffer,size);
             if(err !=RT_EOK)
             {
                result =err;
             }
			 n = n->next; 
        }
    }

    //出临界区,再次使用调度器
    rt_exit_critical();

    return result;
}
RTM_EXPORT(rt_broadcast_ugent);


4 广播测试

下面用一个测试代码来演示广播测试的结果,在这个测试代码中,有三个接收线程用来接收广播的消息,有三个发送线程分别以三个不同的间隔时间来通过服务器来发送广播消息.

测试代码如下:

broadcast_test.h:

#ifndef _BROADCAST_TEST_H_
#define _BROADCAST_TEST_H_
#include <rtthread.h>
//消息结构体定义
struct MSG
{
	rt_uint8_t head;//消息头
	char msg_body[64];//消息体
};

int broadcast_test(void);
#endif

broadcast_test.c:

#include "broadcast_test.h"

//广播服务器
static struct rt_broadcast_server s_broadcast_server;

//客户端1
static struct rt_broadcast_client s_client_1; //广播客户端1
static rt_uint8_t s_listener1_mq_buffer[2048];
static struct rt_thread s_client_entry1;//接收线程1
static rt_int8_t s_client_entry1_stack[2048];//接收线程的线程栈
//广播接收测试线程1
static void broadcast_thread_entry_rev1(void* parameter)
{
	struct MSG msg;
	while(1)
	{
		rt_memset(&msg,0,sizeof(struct MSG));
		if(RT_EOK ==rt_broadcast_recv(&s_client_1,&msg,sizeof(struct MSG),RT_WAITING_FOREVER))
		{
			rt_kprintf("<-client1 recv msg:%02x  %s\n",msg.head,msg.msg_body);
		}
	}
}
//客户端2
static struct rt_broadcast_client s_client_2; //广播客户端2
static rt_uint8_t s_listener2_mq_buffer[2048];
static struct rt_thread s_client_entry2;//接收线程2
static rt_int8_t s_client_entry2_stack[2048];//接收线程2的线程栈
//广播接收测试线程2
static void broadcast_thread_entry_rev2(void* parameter)
{
	struct MSG msg;
	while(1)
	{
		rt_memset(&msg,0,sizeof(struct MSG));
		if(RT_EOK ==rt_broadcast_recv(&s_client_2,&msg,sizeof(struct MSG),RT_WAITING_FOREVER))
		{
			rt_kprintf("<-client2 recv msg:%02x  %s\n",msg.head,msg.msg_body);
		}
	}
}
//客户端3
static struct rt_broadcast_client s_client_3; //广播客户端3
static rt_uint8_t s_listener3_mq_buffer[2048];
static struct rt_thread s_client_entry3;//接收线程3
static rt_int8_t s_client_entry3_stack[2048];//接收线程3的线程栈
//广播接收测试线程3
static void broadcast_thread_entry_rev3(void* parameter)
{
	struct MSG msg;
	while(1)
	{
		rt_memset(&msg,0,sizeof(struct MSG));
		if(RT_EOK ==rt_broadcast_recv(&s_client_3,&msg,sizeof(struct MSG),RT_WAITING_FOREVER))
		{
			rt_kprintf("<-client3 recv msg:%02x  %s\n",msg.head,msg.msg_body);
		}
	}
}

//发送器1
static struct rt_thread s_send_entry1;//发送线程1
static rt_int8_t s_send1_entry_stack[2048];//发送线程1的线程栈
//发送线程1
static void broadcast_thread_entry_send1(void *parameter)
{
	struct MSG msg;
	rt_uint8_t temp =0;
	rt_err_t err;

	while(1)
	{
		rt_memset(&msg,0,sizeof(struct MSG));
		msg.head =temp ++;
		strncpy(msg.msg_body,"entry_send1 msg",sizeof(msg.msg_body));
		err =rt_broadcast_send(&s_broadcast_server,&msg,sizeof(struct MSG)); 
		if(RT_EOK ==err)
		{
			rt_kprintf("->send1 msg:%02x %s\n",msg.head,msg.msg_body);
		}
		else
		{
			rt_kprintf("send1 msg failed:%d\n",err);
		}

		rt_thread_delay(500);
	}
}

//发送器2
static struct rt_thread s_send_entry2;//发送线程2
static rt_int8_t s_send2_entry_stack[2048];//发送线程2的线程栈
//发送线程2
static void broadcast_thread_entry_send2(void *parameter)
{
	struct MSG msg;
	rt_uint8_t temp =0;
	rt_err_t err;

	while(1)
	{
		rt_memset(&msg,0,sizeof(struct MSG));
		msg.head =temp ++;
		strncpy(msg.msg_body,"entry_send2 msg",sizeof(msg.msg_body));
		err =rt_broadcast_send(&s_broadcast_server,&msg,sizeof(struct MSG)); 
		if(RT_EOK ==err)
		{
			rt_kprintf("->send2 msg:%02x %s\n",msg.head,msg.msg_body);
		}
		else
		{
			rt_kprintf("send2 msg failed:%d\n",err);
		}

		rt_thread_delay(300);
	}
}

//发送器3
static struct rt_thread s_send_entry3;//发送线程3
static rt_int8_t s_send3_entry_stack[2048];//发送线程3的线程栈
//发送线程3
static void broadcast_thread_entry_send3(void *parameter)
{
	struct MSG msg;
	rt_uint8_t temp =0;
	rt_err_t err;

	while(1)
	{
		rt_memset(&msg,0,sizeof(struct MSG));
		msg.head =temp ++;
		strncpy(msg.msg_body,"entry_send3 msg",sizeof(msg.msg_body));
		err =rt_broadcast_send(&s_broadcast_server,&msg,sizeof(struct MSG)); 
		if(RT_EOK ==err)
		{
			rt_kprintf("->send3 msg:%02x %s\n",msg.head,msg.msg_body);
		}
		else
		{
			rt_kprintf("send3 msg failed:%d\n",err);
		}

		rt_thread_delay(400);
	}
}


//广播测试函数
int broadcast_test(void)
{
	rt_err_t err;

	rt_broadcast_server_init(&s_broadcast_server);//初始化广播服务器

	//初始化广播客户端1
	err =rt_broadcast_client_init(&s_client_1,
								"client1",
								&s_listener1_mq_buffer[0],
								sizeof(struct MSG),
								sizeof(s_listener1_mq_buffer),
								RT_IPC_FLAG_FIFO);
	if(err !=RT_EOK)
	{
		rt_kprintf("rt_broadcast_client1_init failed:%d\n",err);
		return err;
	}
	//注册广播客户端1
	rt_broadcast_client_regist(&s_broadcast_server,&s_client_1);
	//初始化接收线程1
	err = rt_thread_init(&s_client_entry1,
								"client1",
								broadcast_thread_entry_rev1, RT_NULL,
								s_client_entry1_stack,
								sizeof(s_client_entry1_stack),								
								5, 20);

	if (err == RT_EOK)
	{
		rt_thread_startup(&s_client_entry1);
		rt_kprintf("broadcast_thread_entry_rev1 startup ok!\n");
	}
	else
	{
		rt_kprintf("broadcast_thread_entry_rev1 startup failed!\n");
	}


	//初始化广播客户端2
	err =rt_broadcast_client_init(&s_client_2,
								"client2",
								&s_listener2_mq_buffer[0],
								sizeof(struct MSG),
								sizeof(s_listener2_mq_buffer),
								RT_IPC_FLAG_FIFO);
	if(err !=RT_EOK)
	{
		rt_kprintf("rt_broadcast_client2_init failed:%d\n",err);
		return err;
	}
	//注册广播客户端2
	rt_broadcast_client_regist(&s_broadcast_server,&s_client_2);
	//初始化接收线程2
	err = rt_thread_init(&s_client_entry2,
								"client2",
								broadcast_thread_entry_rev2, RT_NULL,
								s_client_entry2_stack,
								sizeof(s_client_entry2_stack),								
								5, 20);

	if (err == RT_EOK)
	{
		rt_thread_startup(&s_client_entry2);
		rt_kprintf("broadcast_thread_entry_rev2 startup ok!\n");
	}
	else
	{
		rt_kprintf("broadcast_thread_entry_rev2 startup failed!\n");
	}

	//初始化广播客户端3
	err =rt_broadcast_client_init(&s_client_3,
								"client3",
								&s_listener3_mq_buffer[0],
								sizeof(struct MSG),
								sizeof(s_listener3_mq_buffer),
								RT_IPC_FLAG_FIFO);
	if(err !=RT_EOK)
	{
		rt_kprintf("rt_broadcast_client3_init failed:%d\n",err);
		return err;
	}
	//注册广播客户端3
	rt_broadcast_client_regist(&s_broadcast_server,&s_client_3);
	//初始化接收线程3
	err = rt_thread_init(&s_client_entry3,
								"client3",
								broadcast_thread_entry_rev3, RT_NULL,
								s_client_entry3_stack,
								sizeof(s_client_entry3_stack),								
								5, 20);

	if (err == RT_EOK)
	{
		rt_thread_startup(&s_client_entry3);
		rt_kprintf("broadcast_thread_entry_rev3 startup ok!\n");
	}
	else
	{
		rt_kprintf("broadcast_thread_entry_rev3 startup failed!\n");
	}

	//初始化发送线程1
	err =rt_thread_init(&s_send_entry1,"send1",
								broadcast_thread_entry_send1, RT_NULL,
								s_send1_entry_stack,sizeof(s_send1_entry_stack), 5, 20);

	if (err ==RT_EOK)
	{
		rt_thread_startup(&s_send_entry1);
		rt_kprintf("broadcast_thread_entry_send1 startup ok!\n");
	}
	else
	{
		rt_kprintf("broadcast_thread_entry_send1 startup failed!\n");
	}

	//初始化发送线程2
	err =rt_thread_init(&s_send_entry2,"send2",
								broadcast_thread_entry_send2, RT_NULL,
								s_send2_entry_stack,sizeof(s_send2_entry_stack), 5, 20);

	if (err ==RT_EOK)
	{
		rt_thread_startup(&s_send_entry2);
		rt_kprintf("broadcast_thread_entry_send2 startup ok!\n");
	}
	else
	{
		rt_kprintf("broadcast_thread_entry_send2 startup failed!\n");
	}

	//初始化发送线程3
	err =rt_thread_init(&s_send_entry3,"send3",
								broadcast_thread_entry_send3, RT_NULL,
								s_send3_entry_stack,sizeof(s_send3_entry_stack), 5, 20);

	if (err ==RT_EOK)
	{
		rt_thread_startup(&s_send_entry3);
		rt_kprintf("broadcast_thread_entry_send3 startup ok!\n");
	}
	else
	{
		rt_kprintf("broadcast_thread_entry_send3 startup failed!\n");
	}

	return 0;
}


测试结果:



原文链接:http://blog.csdn.net/flydream0/article/details/8614849
加载中
返回顶部
顶部