没有合适的资源?快使用搜索试试~ 我知道了~
该文档为storm模拟项目系列文档之一,是MetaQ与storm接口的说明文档,主要介绍了如何集成MetaQ到项目代码中。软件(阿里),其对应的许多技术文档还是比较容易看的,并且Github提供了许多的应用实例,所以使用MetaQ作为Storm的消息源之一是必须掌握的。该模拟项目中,使用MetaQ作为storm的消息源,MetaqSpout从指定的zkconnect及topic中读取数据,发布到节点中。此外,还写了MetaQ与storm的生产者接口,即MetaqBolt指定Topic将数据写入Metaq中供其他业务系统继续使用。好了文档说明就这些了,代码马上会随文档更新(通常情况下,代码调试成
资源推荐
资源详情
资源评论



























Storm项目:流数据监控(下)项目:流数据监控(下)
1 文档说明
该文档为storm模拟项目系列文档之一,是MetaQ与storm接口的说明文档,主要介绍了如何集成MetaQ到项目代码中。
软件(阿里),其对应的许多技术文档还是比较容易看的,并且Github提供了许多的应用实例,所以使用MetaQ作为Storm的
消息源之一是必须掌握的。
该模拟项目中,使用MetaQ作为storm的消息源,MetaqSpout从指定的zkconnect及topic中读取数据,发布到节点中。此外,
还写了MetaQ与storm的生产者接口,即MetaqBolt指定Topic将数据写入Metaq中供其他业务系统继续使用。
好了文档说明就这些了,代码马上会随文档更新(通常情况下,代码调试成功了我才会写文档的)。
2 MetaQ与Storm接口
2.1 MetaqSpout
2.1.1 接口说明
该接口参考自Github,作了部分修改。项目设计中,使用storm.xml.MetaqSpoutXml读取MetaqSpout对应的配置文件
MetaqSpout.xml
配置文件中,指明zkconnect的地址及端口号、metaq的root目录、对应的消费topic及其消费组(这个很重要)。
读取配置之后,将配置传递到spout的open部分进行初始化工作,主要是进行消费者参数设定(包括zkconnect、root目录、
Topic及Group设置)等。
在nextTuple方法中,进行消息(message)拉取(poll),每次拉取一条记录,发布到下一个拓扑节点中。
2.1.2 上代码
贴部分主要代码(详细参考代码包):
//构造函数,传递xml地址
public MetaqSpout(String MetaqSpoutXml) {
super();
this.metaqspoutxml = MetaqSpoutXml;
}
//实例化参数配置类
private ZKConfig zkConfig = new ZKConfig();
private MetaClientConfig metaClientConfig = new MetaClientConfig();
private final Scheme scheme = new StringScheme();
//初始化调用
public void open(final Map conf, final TopologyContext context,
final SpoutOutputCollector collector) {
//从xml中获取参数
new MetaqSpoutXml(this.metaqspoutxml).read();
this.zkConfig.zkConnect = MetaqSpoutXml.zkConnect;//"192.168.2.240:2181";
this.zkConfig.zkRoot = MetaqSpoutXml.zkRoot;//"/meta";
String topic = MetaqSpoutXml.topic;
String group = MetaqSpoutXml.group;
this.metaClientConfig.setZkConfig(this.zkConfig);
this.consumerConfig = new ConsumerConfig(group);
//final String topic = (String) conf.get(TOPIC);
if (topic == null) {
throw new IllegalArgumentException(TOPIC + " is null");
}
Integer maxSize = (Integer) conf.get(FETCH_MAX_SIZE);

if (maxSize == null) {
log.warn("Using default FETCH_MAX_SIZE");
maxSize = DEFAULT_MAX_SIZE;
}
this.id2wrapperMap = new ConcurrentHashMap<Long, MetaMessageWrapper>();
this.messageQueue = new LinkedTransferQueue<MetaMessageWrapper>();
try {
this.collector = collector;
this.setUpMeta(topic, maxSize);
}
catch (final MetaClientException e) {
log.error("Setup meta consumer failed", e);
}
}
private void setUpMeta(final String topic, final Integer maxSize) throws MetaClientException {
this.sessionFactory = new MetaMessageSessionFactory(this.metaClientConfig);
this.messageConsumer = this.sessionFactory.createConsumer(this.consumerConfig);
this.messageConsumer.subscribe(topic, maxSize, new MessageListener() {
public void recieveMessages(final Message message) {
final MetaMessageWrapper wrapper = new MetaMessageWrapper(message);
MetaqSpout.this.id2wrapperMap.put(message.getId(), wrapper);
MetaqSpout.this.messageQueue.offer(wrapper);
try {
wrapper.latch.await();
}
catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
// 消费失败,抛出运行时异常
if (!wrapper.success) {
throw new RuntimeException("Consume message failed");
}
}
public Executor getExecutor() {
return null;
}
}).completeSubscribe();
}
//关闭时调用,进行consumer的shutdown操作
public void close() {
try {
this.messageConsumer.shutdown();
}
catch (final MetaClientException e) {
log.error("Shutdown consumer failed", e);
}
try {
this.sessionFactory.shutdown();
}
catch (final MetaClientException e) {
log.error("Shutdown session factory failed", e);
}
}
//消息发布
public void nextTuple() {
if (this.messageConsumer != null) {
try {
//进行消息拉取
final MetaMessageWrapper wrapper = this.messageQueue.poll
(WAIT_FOR_NEXT_MESSAGE, TimeUnit.MILLISECONDS);
if (wrapper == null) {
return;
}
final Message message = wrapper.message;
this.collector.emit(this.scheme.deserialize(message.getData()), message.getId());
}
catch (final InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//消息操作成功确认机制
public void ack(final Object msgId) {
if (msgId instanceof Long) {
final long id = (Long) msgId;

final MetaMessageWrapper wrapper = this.id2wrapperMap.remove(id);
if (wrapper == null) {
log.warn(String.format("don't know how to ack(%s: %s)", msgId.getClass().getName(), msgId));
return;
}
wrapper.success = true;
wrapper.latch.countDown();
}
else {
log.warn(String.format("don't know how to ack(%s: %s)", msgId.getClass().getName(), msgId));
}
}
//消费失败时返回
public void fail(final Object msgId) {
if (msgId instanceof Long) {
final long id = (Long) msgId;
final MetaMessageWrapper wrapper = this.id2wrapperMap.remove(id);
if (wrapper == null) {
log.warn(String.format("don't know how to reject(%s: %s)", msgId.getClass().getName(), msgId));
return;
}
wrapper.success = false;
wrapper.latch.countDown();
}
else {
log.warn(String.format("don't know how to reject(%s: %s)", msgId.getClass().getName(), msgId));
}
}
2.2 MetaqBolt
2.2.1 接口说明
该部分代码修改自Github上的Metaq异步生产者实例。设计这个Bolt的原因是,部分业务有这种需求,当经过storm实时处理
后,数据发送到下一个业务系统,当下一个业务系统也是从metaq拉取数据时,就需要我们把处理过的数据写入到metaq中
去,所以有了这个接口。
其读取配置文件的过程与MetaqSpout相似,但是没有组(Group)的概念,只需指定地址、目录及Topic(前提是Metaq上有
该Topic),则可以把数据写入metaq中。
2.2.1 上代码
该部分代码较简单,可以参考AsyncConsumer代码。
剩余14页未读,继续阅读
资源评论


weixin_38535812
- 粉丝: 5
上传资源 快速赚钱
我的内容管理 展开
我的资源 快来上传第一个资源
我的收益
登录查看自己的收益我的积分 登录查看自己的积分
我的C币 登录后查看C币余额
我的收藏
我的下载
下载帮助


最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈



安全验证
文档复制为VIP权益,开通VIP直接复制
