storm无法提交topology

穷困潦倒 发布于 2016/01/14 12:04
阅读 1K+
收藏 0

【Gopher China万字分享】华为云的Go语言云原生实战经验!>>>

在控台用storm jar 命令提交topology。执行后没提示错误,但用storm list命令和ui上查看,发现提交成功。 
加载中
0
逝水fox
逝水fox
你看看 是运行到runLocal的本地集群上,没有提交到Storm集群的nimbus上
0
逝水fox
逝水fox
看你的main类怎么写的
0
穷困潦倒
穷困潦倒

引用来自“逝水fox”的评论

看你的main类怎么写的


package com;

import com.bolt.*;
import com.common.*;
import com.spout.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

public class ClickTopology {

	private TopologyBuilder builder = new TopologyBuilder();
	private Config conf = new Config();
	private LocalCluster cluster;

	/*
	 * 入口
	 */
	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
		ClickTopology topology = new ClickTopology();
		if (args != null && args.length > 1) {
			topology.runCluster(args[0], args[1]);
		} else {
			System.out.println("Running in local mode" + "\nRedis ip missing for cluster run");
			topology.runLocal(100000);
		}
	}

	public ClickTopology() {
		builder.setSpout("clickSpout", new ClickSpout(), 10);
		builder.setBolt("repeatBolt", new RepeatVisitBolt(), 10).shuffleGrouping("clickSpout");
		builder.setBolt("geographyBolt", new GeographyBolt(new HttpIpResolver()), 10).shuffleGrouping("clickSpout");
		builder.setBolt("totalStats", new VisitorStatsBolt(), 1).globalGrouping("repeatBolt");

		builder.setBolt("geoStats", new GeoStatsBolt(), 10).fieldsGrouping("geographyBolt",
				new Fields(FieldNames.COUNTRY));
	}

	public TopologyBuilder getBuilder() {
		return builder;
	}

	public LocalCluster getLocalCluster() {
		return cluster;
	}

	private void runLocal(int runTime) {
		conf.setDebug(true);
		conf.put(ConfKeys.REDIS_HOST, "localhost");
		conf.put(ConfKeys.REDIS_PORT, "6379");
		cluster = new LocalCluster();
		cluster.submitTopology("test", conf, builder.createTopology());

		if (runTime > 0) {
			Utils.sleep(runTime);
			shutdownLocal();
		}
	}

	private void shutdownLocal() {
		if (cluster != null) {
			cluster.killTopology("test");
			cluster.shutdown();
		}
	}

	//
	private void runCluster(String name, String redisHost) throws AlreadyAliveException, InvalidTopologyException {
		conf.setNumWorkers(20);
		conf.put(ConfKeys.REDIS_HOST, redisHost);
		conf.put(ConfKeys.REDIS_PORT, "6379");
		StormSubmitter.submitTopology(name, conf, builder.createTopology());
	}

}



返回顶部
顶部