NET下RabbitMQ实践[实战篇]

长平狐 发布于 2012/11/06 18:42
阅读 397
收藏 1
     之前的文章中,介绍了如何将RabbitMQ以WCF方式进行发布。今天就介绍一下我们产品中如何使用RabbitMQ的!
     
     在Discuz!NT企业版中,提供了对HTTP错误日志的记录功能,这一点对企业版非常重要,另外存储错误日志使用了MongoDB,理由很简单,MongoDB的添加操作飞快,即使数量过亿之后插入速度依旧不减。
     
     在开始正文之前,先说明一下本文的代码分析顺序,即:程序入口==》RabbitMQ客户端===>RabbitMQ服务端。好了,闲话少说,开始正文!    
     
     首先是程序入口,也就是WCF+RabbitMQ客户端实现:    
     
     因为Discuz!NT使用了HttpModule方式来接管HTTP链接请求,而在.NET的HttpModule模板中,可以通过如下方法来接管程序运行时发生的ERROR,如下:    
    
context.Error  +=   new  EventHandler(Application_OnError);
    
     而“记录错误日志"的功能入口就在这里:    
   
public   void  Application_OnError(Object sender, EventArgs e)
    {
        
string  requestUrl  =  DNTRequest.GetUrl();
        HttpApplication application 
=  (HttpApplication)sender;
        HttpContext context 
=  application.Context;

#if  EntLib
        
if  (RabbitMQConfigs.GetConfig()  !=   null   &&  RabbitMQConfigs.GetConfig().HttpModuleErrLog.Enable) // 当开启errlog错误日志记录功能时
        {
            RabbitMQClientHelper.GetHttpModuleErrLogClient().AsyncAddLog(
new  HttpModuleErrLogData(LogLevel.High, context.Server.GetLastError().ToString())); // 异步方式
            
// RabbitMQHelper.GetHttpModuleErrLogClient().AddLog(new HttpModuleErrLogData(LogLevel.High, "wrong message infomation!")); // 同步方式
             return ;
        }
#endif
        ...
     }


     
     当然从代码可以看出,记录日志的工作基本是通过配置文件控制的,即“HttpModuleErrLog.Enable”。
     
     而RabbitMQClientHelper是一个封装类,主要用于反射生成IHttpModuleErrlogClient接口实例,该实例就是“基于WCF发布的RabbitMQ”的客户端访问对象。
    
///   <summary>
///  RabbitMQ
///   </summary>
public   class  RabbitMQClientHelper
{
    
static  IHttpModuleErrlogClient ihttpModuleErrLogClient;

    
private   static   object  lockHelper  =   new   object ();

    
public   static  IHttpModuleErrlogClient GetHttpModuleErrLogClient()
    {
        
if  (ihttpModuleErrLogClient  ==   null )
        {
            
lock  (lockHelper)
            {
                
if  (ihttpModuleErrLogClient  ==   null )
                {
                    
try
                    {
                        
if  (RabbitMQConfigs.GetConfig().HttpModuleErrLog.Enable)
                        {
                            ihttpModuleErrLogClient 
=  (IHttpModuleErrlogClient)Activator.CreateInstance(Type.GetType(
                                  
" Discuz.EntLib.RabbitMQ.Client.HttpModuleErrLogClient, Discuz.EntLib.RabbitMQ.Client " false true ));
                        }
                    }
                    
catch
                    {
                        
throw   new  Exception( " 请检查 Discuz.EntLib.RabbitMQ.dll 文件是否被放置到了bin目录下! " );
                    }
                }
            }
        }
        
return  ihttpModuleErrLogClient;
    }
}
    
    可以看出它反射的是Discuz.EntLib.RabbitMQ.dll文件的HttpModuleErrLogClient对象(注:使用反射的原因主要是解决企业版代码与普遍版代码在项目引用上的相互依赖),下面就是其接口和具体要求实现:       
  
     ///   <summary>
    
///  IHttpModuleErrlogClient客户端接口类,用于反射实例化绑定
    
///   </summary>
     public   interface  IHttpModuleErrlogClient
    {
        
void  AddLog(HttpModuleErrLogData httpModuleErrLogData);

        
void  AsyncAddLog(HttpModuleErrLogData httpModuleErrLogData);
    }
    
    
public   class  HttpModuleErrLogClient : IHttpModuleErrlogClient
    {
        
public   void  AddLog(HttpModuleErrLogData httpModuleErrLogData)
        {
            
try
            {
                
// ((RabbitMQBinding)binding).OneWayOnly = true;
                ChannelFactory < IHttpModuleErrLogService >  m_factory  =   new  ChannelFactory < IHttpModuleErrLogService > (GetBinding(),  " soap.amqp:///HttpModuleErrLogService " );
                m_factory.Open();
                IHttpModuleErrLogService m_client 
=  m_factory.CreateChannel();
                m_client.AddLog(httpModuleErrLogData);
                ((IClientChannel)m_client).Close();
                m_factory.Close();
            }
            
catch  (System.Exception e)
            {
                
string  msg  =  e.Message;
            }
        }

        
private   delegate   void  delegateAddLog(HttpModuleErrLogData httpModuleErrLogData);

        
public   void  AsyncAddLog(HttpModuleErrLogData httpModuleErrLogData)
        {
            delegateAddLog AddLog_aysncallback 
=   new  delegateAddLog(AddLog);
            AddLog_aysncallback.BeginInvoke(httpModuleErrLogData, 
null null );
        }

        
public  Binding GetBinding()
        {
            
return   new  RabbitMQBinding(RabbitMQConfigs.GetConfig().HttpModuleErrLog.RabbitMQAddress);
        }
    }
   
    可以看出,AddLog方法与上一篇中的客户端内容基本上没什么太大差别,只不过它提供了同步和异步访问两种方式,这样做的目的主要是用户可根据生产环境来灵活配置。
    
    下面就来看一下RabbitMQ的服务端实现,首先看一下其运行效果,如下图:
    
    
    
    接着看一下启动rabbitmq服务的代码:   
   
public   void  StartService(System.ServiceModel.Channels.Binding binding)
    {
        m_host 
=   new  ServiceHost( typeof (HttpModuleErrLogService),  new  Uri( " soap.amqp:/// " ));
        
// ((RabbitMQBinding)binding).OneWayOnly = true;
        m_host.AddServiceEndpoint( typeof (IHttpModuleErrLogService), binding,  " HttpModuleErrLogService " );
        m_host.Open();
        m_serviceStarted 
=   true ;            
    }    


    
    上面代码会添加IHttpModuleErrLogService接口实现类HttpModuleErrLogService 的Endpoint,并启动它,下面就是该接口声明:
   
     ///   <summary>
    
///  IHttpModuleErrLogService接口类
    
///   </summary>   
    [ServiceContract]
    
public   interface  IHttpModuleErrLogService
    {
        
///   <summary>
        
///  添加httpModuleErrLogData日志信息
        
///   </summary>
        
///   <param name="httpModuleErrLogData"></param>
        [OperationContract]
        
void  AddLog(HttpModuleErrLogData httpModuleErrLogData);
    }


   
    
    代码很简单,就是定义了一个添加日志的方法:void AddLog(HttpModuleErrLogData httpModuleErrLogData)
        
    下面就是接口的具体实现,首先是类声明及初始化代码:   
   
[ServiceBehavior(InstanceContextMode  =  InstanceContextMode.Single)]  // Single - 为所有客户端调用分配一个服务实例。
     public   class  HttpModuleErrLogService : IHttpModuleErrLogService
    {    
        
///   <summary>
        
///  获取HttpModuleErrLogInfo配置文件对象实例
        
///   </summary>
         private   static  HttpModuleErrLogInfo httpModuleErrorLogInfo  =  RabbitMQConfigs.GetConfig().HttpModuleErrLog;
        
///   <summary>
        
///  定时器对象
        
///   </summary>
         private   static  System.Timers.Timer _timer;
        
///   <summary>
        
///  定时器的时间
        
///   </summary>
         private   static   int  _elapsed  =   0 ;

        
public   static   void  Initial(System.Windows.Forms.RichTextBox msgBox,  int  elapsed)
        {
            _msgBox 
=  msgBox;
            _elapsed 
=  elapsed;

            
// 初始定时器
             if  (_elapsed  >   0 )
            {
                _timer 
=   new  System.Timers.Timer() { Interval  =  elapsed  *   1000 ,  Enabled  =   true , AutoReset  =   true  };            
                _timer.Elapsed 
+=   new  System.Timers.ElapsedEventHandler(Timer_Elapsed);
                _timer.Start();
            }
        }

        
///   <summary>
        
///  时间到时执行出队操作
        
///   </summary>
        
///   <param name="sender"></param>
        
///   <param name="e"></param>
         private   static   void  Timer_Elapsed( object  sender, System.Timers.ElapsedEventArgs e)
        {    
            Dequeue();    
        }


        
        可以看出,这里使用了静态定时器对象,来进行定时访问队列信息功能(“非同步出队”操作),这样设计的原因主要是为用户提供适合的配置方式,即如果不使用定时器(为0时),则系统会在日志入队后,就立即启动出队(“同步出队”)操作获取日志信息并插入到MongoDB数据库中。
      下面介绍一下入队操作实现:      
      
         ///   <summary>
        
///  添加httpModuleErrLogData日志信息
        
///   </summary>
        
///   <param name="httpModuleErrLogData"></param>
         public   void  AddLog(HttpModuleErrLogData httpModuleErrLogData)
        {
            Enqueue(httpModuleErrLogData);

            
if  (_elapsed  <= 0 // 如果使用定时器(为0时),则立即执行出队操作
                Dequeue();
        }   

        
///   <summary>
        
///  交换机名称
        
///   </summary>
         private   const   string  EXCHANGE  =   " ex1 " ;
        
///   <summary>
        
///  交换方法,更多内容参见: http://melin.javaeye.com/blog/691265
        
///   </summary>
         private   const   string  EXCHANGE_TYPE  =   " direct " ;
        
///   <summary>
        
///  路由key,更多内容参见: http://sunjun041640.blog.163.com/blog/static/256268322010328102029919/
        
///   </summary>
         private   const   string  ROUTING_KEY  =   " m1 " ;

        
///   <summary>
        
///  日志入队
        
///   </summary>
        
///   <param name="httpModuleErrLogData"></param>
         public   static   void  Enqueue(HttpModuleErrLogData httpModuleErrLogData)
        {
            Uri uri 
=   new  Uri(httpModuleErrorLogInfo.RabbitMQAddress);         
            ConnectionFactory cf 
=   new  ConnectionFactory()
            {
                UserName 
=  httpModuleErrorLogInfo.UserName,
                Password 
=  httpModuleErrorLogInfo.PassWord,
                VirtualHost 
=   " dnt_mq " ,
                RequestedHeartbeat 
=   0 ,
                Endpoint 
=   new  AmqpTcpEndpoint(uri)
            };
            
using  (IConnection conn  =  cf.CreateConnection())
            {
                
using  (IModel ch  =  conn.CreateModel())
                {
                    
if  (EXCHANGE_TYPE  !=   null )
                    {
                        ch.ExchangeDeclare(EXCHANGE, EXCHANGE_TYPE);
// ,true,true,false,false, true,null);
                        ch.QueueDeclare(httpModuleErrorLogInfo.QueueName,  true ); // true, true, true, false, false, null);
                        ch.QueueBind(httpModuleErrorLogInfo.QueueName, EXCHANGE, ROUTING_KEY,  false null );
                    }
                    IMapMessageBuilder b 
=   new  MapMessageBuilder(ch);
                    IDictionary target 
=  b.Headers;
                    target[
" header " =   " HttpErrLog " ;
                    IDictionary targetBody 
=  b.Body;
                    targetBody[
" body " =  SerializationHelper.Serialize(httpModuleErrLogData);
                    ((IBasicProperties)b.GetContentHeader()).DeliveryMode 
=   2 ; // persistMode                   
                    ch.BasicPublish(EXCHANGE, ROUTING_KEY,
                                               (IBasicProperties)b.GetContentHeader(),
                                               b.GetContentBody());
                }
            }
        }
       
        代码很简单,主要构造rabbitmq链接(ConnectionFactory)并初始化相应参数如用户名,密码,ROUTING_KEY等。
        
        然后将传入的日志对象序列化成字符串对象,赋值给targetBody["body"],这样做主要是因为我没找到更好的方法来赋值(之前尝试直接绑定httpModuleErrLogData到targetBody["body"],但在出队操作中找不到合适方法将httpModuleErrLogData对象解析出来)。
        
        下面就是出队操作:      
               
         ///   <summary>
        
///  日志出队
        
///   </summary>
         public   static   void  Dequeue()
        {       
            
string  serverAddress  =  httpModuleErrorLogInfo.RabbitMQAddress.Replace( " amqp:// " "" ).TrimEnd( ' / ' );  // "10.0.4.85:5672";
            ConnectionFactory cf  =   new  ConnectionFactory()
            {
                UserName 
=  httpModuleErrorLogInfo.UserName,
                Password 
=  httpModuleErrorLogInfo.PassWord,
                VirtualHost 
=   " dnt_mq " ,
                RequestedHeartbeat 
=   0 ,
                Address 
=  serverAddress
            };
      
            
using  (IConnection conn  =  cf.CreateConnection())
            {
                
using  (IModel ch  =  conn.CreateModel())
                {
                    
while  ( true )
                    {
                        BasicGetResult res 
=  ch.BasicGet(httpModuleErrorLogInfo.QueueName,  false );
                        
if  (res  !=   null )
                        {
                            
try
                            {
                                
string  objstr  =  System.Text.UTF8Encoding.UTF8.GetString(res.Body).Replace( " \0\0\0body\0\n " "" ); // 去掉头部信息
                                 object  obj  =  SerializationHelper.DeSerialize( typeof (HttpModuleErrLogData), objstr);
                                HttpModuleErrLogData httpModuleErrLogData 
=  obj  as  HttpModuleErrLogData;
                                
if  (httpModuleErrLogData  !=   null )
                                {
                                    MongoDbHelper.Insert(
new  Mongo(httpModuleErrorLogInfo.MongoDB),  " dnt_httpmoduleerrlog " , LoadAttachment(httpModuleErrLogData));
                                    _msgBox.BeginInvoke(
new  ShowMsg(SetMsgRichBox),  " \r发生时间: "   +  httpModuleErrLogData.TimeStamp  +   " \r错误等级: "   +  httpModuleErrLogData.Level  +   " \r详细信息: "   +  httpModuleErrLogData.Message);
                                    ch.BasicAck(res.DeliveryTag, 
false );
                                }
                            }
                            
catch  { }
                        }
                        
else
                            
break ;
                    }
                }
            }           
        } 


        
        出队操作也是先实例化链接到rabbitmq 的实例,并循环使用BasicGet方法来单条获取队列信息,并最终将res.Body的数据序列化成HttpModuleErrLogData对象,并最终插入到mongodb数据库中。同时将获取的队列消息显示出来:
        
    _msgBox.BeginInvoke( new  ShowMsg(SetMsgRichBox),  " \r发生时间: "   +  httpModuleErrLogData.TimeStamp  +   " \r错误等级: "   +  httpModuleErrLogData.Level  +   " \r详细信息: "   +  httpModuleErrLogData.Message);
        
        这里使用异步方式显示出队的日志信息,其声明的delegate 方法“ShowMsg”如下:
      
         ///   <summary>
        
///  声明委托
        
///   </summary>
        
///   <param name="message"></param>
         public   delegate   void  ShowMsg( string  message);
        
///   <summary>
        
///  绑定到上面delegate的方法
        
///   </summary>
        
///   <param name="outPut"></param>
         public   static   void  SetMsgRichBox( string  outPut)
        {
            _msgBox.Text 
+=   " \r==================================\r下列错误信息出队时间=> "   +  DateTime.Now  +  outPut  +   " \r " ;
        }


        
        同时使用LoadAttachment方法来实现HttpModuleErrLogData到mongodb的Document类型的转换:       
      
         ///   <summary>
        
///  将HttpModuleErrLogData转换成Document类型
        
///   </summary>
        
///   <param name="httpModuleErrLogData"></param>
        
///   <returns></returns>
         public   static  Document LoadAttachment(HttpModuleErrLogData httpModuleErrLogData)
        {
           Document doc 
=   new  Document();
            doc[
" _id " =  httpModuleErrLogData.Oid;
            doc[
" level " =  httpModuleErrLogData.Level;
            doc[
" message " =  httpModuleErrLogData.Message;
            doc[
" timestamp " =  httpModuleErrLogData.TimeStamp;
            
return  doc;
        }       
    
    
     到这里,主要的功能介绍就差不多了。当然本文所阐述的只是一个原型,相信会随着对rabbitmq的理解深入而不断完善,感兴趣的朋友欢迎讨论交流,以纠正我认识上的偏差,呵呵。
    
   原文链接: http://www.cnblogs.com/daizhj/archive/2010/10/25/1860442.html   
   Tags:discuz!nt,Rabbitmq,NET,mongodb
   BLOG: http://daizhj.cnblogs.com/
   作者:daizhj,代震军
原文链接:http://www.cnblogs.com/daizhj/archive/2010/10/25/1860442.html
加载中
返回顶部
顶部