norbert Hello World 教程

韭菜根 发布于 2012/07/26 16:10
阅读 1K+
收藏 4

为了避免依赖问题,我采用 Maven 来创建项目。

下面是需要添加到 pom.xml 中的依赖包:

<!-- Norbert itself. The "_2.8.1" suffix of the artifactId presumably names the Scala
     version the artifact was built against (it matches scala-library below) - confirm. -->
<dependency>
  		<groupId>com.linkedin</groupId>
  		<artifactId>norbert_2.8.1</artifactId>
  		<version>0.6.12</version>
  	</dependency>
  	<!-- Scala runtime library, same version as the suffix of the Norbert artifact. -->
  	<dependency>
  		<groupId>org.scala-lang</groupId>
  		<artifactId>scala-library</artifactId>
  		<version>2.8.1</version>
  	</dependency>
  	<!-- Protocol Buffers Java runtime; the serializer example in this article uses
  	     protoc-generated message classes and InvalidProtocolBufferException. -->
  	<dependency>
  		<groupId>com.google.protobuf</groupId>
  		<artifactId>protobuf-java</artifactId>
  		<version>2.4.0a</version>
  	</dependency>
  	<!-- ZooKeeper client; the Start example connects to ZooKeeper at localhost:2181. -->
  	<dependency>
  		<groupId>org.apache.zookeeper</groupId>
  		<artifactId>zookeeper</artifactId>
  		<version>3.3.3</version>
  	</dependency>

  先上一个 Norbert 官方的例子。官方示例缺少用 protoc 生成消息格式类的介绍,具体生成办法请参考我的另一篇文章《protobuf介绍》。

  

/**
 * Serializer for the ping example.
 *
 * <p>Requests and responses are both {@code Ping} objects; each is written to the wire as a
 * {@code NorbertExampleProtos.Ping} protobuf message that carries only the timestamp.
 */
class PingSerializer implements Serializer<Ping, Ping> {

    /** @return the request name, {@code "ping"} */
    public String requestName() {
        return "ping";
    }

    /** @return the response name, {@code "pong"} */
    public String responseName() {
        return "pong";
    }

    /**
     * Encodes a request as a protobuf message.
     *
     * @param message the ping whose {@code timestamp} is written
     * @return the serialized protobuf bytes
     */
    public byte[] requestToBytes(Ping message) {
        final NorbertExampleProtos.Ping wire =
                NorbertExampleProtos.Ping.newBuilder().setTimestamp(message.timestamp).build();
        return wire.toByteArray();
    }

    /**
     * Decodes a request from protobuf bytes.
     *
     * @param bytes the serialized protobuf message
     * @return a {@code Ping} built from the decoded timestamp
     * @throws IllegalArgumentException if the bytes are not a valid protobuf message; the
     *         parse failure is also reported on standard output and kept as the cause
     */
    public Ping requestFromBytes(byte[] bytes) {
        try {
            final NorbertExampleProtos.Ping wire =
                    NorbertExampleProtos.Ping.newBuilder().mergeFrom(bytes).build();
            return new Ping(wire.getTimestamp());
        } catch (InvalidProtocolBufferException e) {
            System.out.println("Invalid protocol buffer exception " + e.getMessage());
            throw new IllegalArgumentException(e);
        }
    }

    /** Responses share the request wire format, so encoding is delegated. */
    public byte[] responseToBytes(Ping message) {
        return requestToBytes(message);
    }

    /** Responses share the request wire format, so decoding is delegated. */
    public Ping responseFromBytes(byte[] bytes) {
        return requestFromBytes(bytes);
    }
}

必须实现 Serializer 接口:通过 Netty 传输消息时,Norbert 用它在消息对象和字节数组之间进行转换。

NorbertExampleProtos 这个类是由 protoc 根据 .proto 文件生成的,具体生成办法请参考我的另一篇文章《protobuf介绍》。

下面是客户端和服务端的写法,直接运行就可以看到结果(运行前需要先在本机的 2181 端口启动 ZooKeeper)。据我查到的资料,Norbert 支持 1000 以上的并发没有问题;不过我对企业级的大并发场景还不是很了解,这里不作评论。

import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import proto.Request;
import proto.RequestResponseSerializer;
import proto.Response;
import com.linkedin.norbert.cluster.InvalidClusterException;
import com.linkedin.norbert.javacompat.cluster.ClusterClient;
import com.linkedin.norbert.javacompat.cluster.ClusterListener;
import com.linkedin.norbert.javacompat.cluster.Node;
import com.linkedin.norbert.javacompat.cluster.ZooKeeperClusterClient;
import com.linkedin.norbert.javacompat.network.Endpoint;
import com.linkedin.norbert.javacompat.network.LoadBalancer;
import com.linkedin.norbert.javacompat.network.LoadBalancerFactory;
import com.linkedin.norbert.javacompat.network.NettyNetworkClient;
import com.linkedin.norbert.javacompat.network.NettyNetworkServer;
import com.linkedin.norbert.javacompat.network.NetworkClient;
import com.linkedin.norbert.javacompat.network.NetworkClientConfig;
import com.linkedin.norbert.javacompat.network.NetworkServer;
import com.linkedin.norbert.javacompat.network.NetworkServerConfig;
import com.linkedin.norbert.javacompat.network.RequestHandler;
import com.linkedin.norbert.javacompat.network.RoundRobinLoadBalancerFactory;
public class Start {
    public static void main(String[] args)
    {
        final String serviceName = "norbert";
        final String zkConnectStr = "localhost:2181";
        configCluster(serviceName, zkConnectStr);
        startServer(serviceName, 1, zkConnectStr);
        startServer(serviceName, 2, zkConnectStr);
        NetworkClientConfig config = new NetworkClientConfig();
        config.setServiceName(serviceName);
        config.setZooKeeperConnectString(zkConnectStr);
        config.setZooKeeperSessionTimeoutMillis(30000);
        config.setConnectTimeoutMillis(1000);
        config.setWriteTimeoutMillis(150);
        config.setMaxConnectionsPerNode(5);
        config.setStaleRequestTimeoutMins(10);
        config.setStaleRequestCleanupFrequencyMins(10);
        final LoadBalancerFactory myLB = new LoadBalancerFactory()
        {
            @Override
            public LoadBalancer newLoadBalancer(final Set<Endpoint> endpoints) throws InvalidClusterException
            {
                return new LoadBalancer()
                {
                    @Override
                    public Node nextNode()
                    {
                        return endpoints.iterator().next().getNode();
                    }
                };
            }
        };
        final NetworkClient nc = new NettyNetworkClient(config, new RoundRobinLoadBalancerFactory());
        //PartitionedNetworkClient<Integer> nc = new NettyPartitionedNetworkClient<Integer>(config, new IntegerConsistentHashPartitionedLoadBalancerFactory());
        //nc.registerRequest(NetqProtocol.AppendReq.getDefaultInstance(), NetqProtocol.AppendResp.getDefaultInstance());
        for(int index = 0;index<10;index++){
		    new Thread(new Runnable() {
				@Override
				public void run() {
					for(int i=0;i<10;i++){
				        final Request request = new Request(5+i);
				        System.out.println(Thread.currentThread().getName()+"-client request at "+(5+i));
				        Future<Response> pingFuture = nc.sendRequest(request, new RequestResponseSerializer());
				
				        try
				        {	
				            final Response appendResp = pingFuture.get();
				            System.out.println(Thread.currentThread().getName()+"-client got ping resp: " + appendResp.total);
				        }
				        catch( InterruptedException e )
				        {
				            e.printStackTrace();
				        }
				        catch( ExecutionException e )
				        {
				            e.printStackTrace();
				        }
			        }
					
				}
			}).start();    
       }
    }
 private static void startServer(String serviceName,final int nodeId, String zkConnectStr)
    {
        NetworkServerConfig config = new NetworkServerConfig();
        config.setServiceName(serviceName);
        config.setZooKeeperConnectString(zkConnectStr);
        config.setZooKeeperSessionTimeoutMillis(30000);
        config.setRequestThreadCorePoolSize(5);
        config.setRequestThreadMaxPoolSize(10);
        config.setRequestThreadKeepAliveTimeSecs(300);
        NetworkServer ns = new NettyNetworkServer(config);
        ns.registerHandler(new RequestHandler<Request, Response>()
        {
            @Override
            public Response handleRequest(Request request) throws Exception
            {
            	System.out.println("server_"+nodeId+":run at "+request.num);
                return new Response(request.num+10);
            }
        }, new RequestResponseSerializer());
        ns.bind(nodeId);
    }
    private static void configCluster(String serviceName, String zkConnectStr)
    {
        //ClusterClient cc = new InMemoryClusterClient("norbert");//, "localhost:2181", 30000);
        final ClusterClient cc = new ZooKeeperClusterClient(serviceName, zkConnectStr, 30000);
        cc.awaitConnectionUninterruptibly();
        cc.addListener(new ClusterListener()
        {
            @Override
            public void handleClusterConnected(Set<Node> nodes)
            {
                System.out.println("connected to cluster: " + nodes);
            }
            @Override
            public void handleClusterNodesChanged(Set<Node> nodes)
            {
                System.out.println("nodes changed: ");
                for( Node node : nodes )
                {
                    System.out.println("node: " + node);
                }
            }
            @Override
            public void handleClusterDisconnected()
            {
                final Set<Node> nodes = cc.getNodes();
                System.out.println("dis-connected from cluster: " + nodes);
            }
            @Override
            public void handleClusterShutdown()
            {
                final Set<Node> nodes = cc.getNodes();
                System.out.println("cluster shutdown: " + nodes);
            }
        });
        cc.removeNode(1);
        cc.removeNode(2);
        cc.addNode(1, "localhost:5002");
        cc.addNode(2, "localhost:5003");
//        cc.markNodeAvailable(1);
//        cc.shutdown();
    }


加载中
0
jeffsui
jeffsui
norbert 现在还更新吗
韭菜根
韭菜根
github中一直在有人提交代码
0
fir01
fir01
又过两年了,还想用呢
返回顶部
顶部