Procuder相对consumer来说比较简单,根据topic从zk拿broker列表,注意这里只拿master类型的broker,slave型的broker和master拥有同样的broker id,主要为了HA用。roubd-robin取一个partition,发送消息。
1.MetaMessageSessionFactory初始化zookeeper连接,创建MessageProducer,默认SimpleMessageProducer
2.订阅topic,是一个异步操作
public void publishTopic(final String topic, final Object ref) { if (this.topicConnectionListeners.get(topic) != null) { this.addRef(topic, ref); return; } final FutureTask<BrokerConnectionListener> task = new FutureTask<BrokerConnectionListener>(new Callable<BrokerConnectionListener>() { @Override public BrokerConnectionListener call() throws Exception { final BrokerConnectionListener listener = new BrokerConnectionListener(topic); if (ProducerZooKeeper.this.zkClient != null) { ProducerZooKeeper.this.publishTopicInternal(topic, listener); } listener.references.add(ref); return listener; } }); final FutureTask<BrokerConnectionListener> existsTask = this.topicConnectionListeners.putIfAbsent(topic, task); if (existsTask == null) { task.run(); } else { this.addRef(topic, ref); } }
订阅过程
private void publishTopicInternal(final String topic, final BrokerConnectionListener listener) throws Exception, NotifyRemotingException, InterruptedException { ///meta/brokers/topics-pub/的topic节点, final String partitionPath = this.metaZookeeper.brokerTopicsPubPath + "/" + topic; ZkUtils.makeSurePersistentPathExists(ProducerZooKeeper.this.zkClient, partitionPath); //BrokerConnectionListener处理broker变化的情况 ProducerZooKeeper.this.zkClient.subscribeChildChanges(partitionPath, listener); // 第一次要同步等待就绪 listener.syncedUpdateBrokersInfo(); }
同步broker信息过程
void syncedUpdateBrokersInfo() throws NotifyRemotingException, InterruptedException { this.lock.lock(); try { //取该topic下所有master节点,节点名称有后缀'-m',key是brokerId,value是broker地址,从节点目录/meta/brokers/ids/0/master下读取 final Map<Integer, String> newBrokerStringMap = ProducerZooKeeper.this.metaZookeeper.getMasterBrokersByTopic(this.topic); final List<String> topics = new ArrayList<String>(1); topics.add(this.topic); //返回master的topic到partition映射的map,key是topic,value是对应的master中配置的partition集合 final Map<String, List<Partition>> newTopicPartitionMap = ProducerZooKeeper.this.metaZookeeper.getPartitionsForTopicsFromMaster(topics); log.warn("Begin receiving broker changes for topic " + this.topic + ",broker ids:" + newTopicPartitionMap); // Connect to new brokers //新broker创建连接 for (final Map.Entry<Integer, String> newEntry : newBrokerStringMap.entrySet()) { final Integer newBrokerId = newEntry.getKey(); final String newBrokerString = newEntry.getValue(); // 新的有,旧的没有,创建 if (!this.brokersInfo.oldBrokerStringMap.containsKey(newBrokerId)) { ProducerZooKeeper.this.remotingClient.connect(newBrokerString, this); ProducerZooKeeper.this.remotingClient.awaitReadyInterrupt(newBrokerString); log.warn("Connect to " + newBrokerString); } } // Close removed brokers. //没用的broker删除连接 for (final Map.Entry<Integer, String> oldEntry : this.brokersInfo.oldBrokerStringMap.entrySet()) { final Integer oldBrokerId = oldEntry.getKey(); final String oldBrokerString = oldEntry.getValue(); final String newBrokerString = newBrokerStringMap.get(oldBrokerId); // 新旧都有 if (newBrokerStringMap.containsKey(oldBrokerId)) { // 判断内容是否变化 if (!newBrokerString.equals(oldBrokerString)) { log.warn("Close " + oldBrokerString + ",connect to " + newBrokerString); ProducerZooKeeper.this.remotingClient.connect(newBrokerString, this); ProducerZooKeeper.this.remotingClient.awaitReadyInterrupt(newBrokerString); ProducerZooKeeper.this.remotingClient.close(oldBrokerString, this, false); } else { // ignore } } else { // 新的没有,旧的有,关闭 ProducerZooKeeper.this.remotingClient.close(oldBrokerString, this, false); log.warn("Close " + oldBrokerString); } } // Set the new brokers info. this.brokersInfo = new BrokersInfo(newBrokerStringMap, newTopicPartitionMap); log.warn("End receiving broker changes for topic " + this.topic); } finally { this.lock.unlock(); } } }
3. 发送消息
private SendResult send0(final Message message, final byte[] encodedData, final long timeout, final TimeUnit unit) throws InterruptedException, MetaClientException { try { final String topic = message.getTopic(); Partition partition = null; String serverUrl = null; // 如果在事务内,则使用上一次发送消息时选择的broker if (this.isInTransaction()) { final LastSentInfo info = this.lastSentInfo.get(); if (info != null) { serverUrl = info.serverUrl; // 选择该broker内的某个分区 partition = this.producerZooKeeper.selectPartition(topic, message, this.partitionSelector, serverUrl); if (partition == null) { // 没有可用分区,抛出异常 throw new MetaClientException("There is no partitions in `" + serverUrl + "` to send message with topic `" + topic + "` in a transaction"); } } } //默认采用round-robin策略发送,partition里取一个 if (partition == null) { partition = this.selectPartition(message); } if (partition == null) { throw new MetaClientException("There is no aviable partition for topic " + topic + ",maybe you don't publish it at first?"); } if (serverUrl == null) { serverUrl = this.producerZooKeeper.selectBroker(topic, partition); } if (serverUrl == null) { throw new MetaClientException("There is no aviable server right now for topic " + topic + " and partition " + partition + ",maybe you don't publish it at first?"); } if (this.isInTransaction() && this.lastSentInfo.get() == null) { // 第一次发送,需要启动事务 this.beforeSendMessageFirstTime(serverUrl); } final int flag = MessageFlagUtils.getFlag(message); //Command拼装 final PutCommand putCommand = new PutCommand(topic, partition.getPartition(), encodedData, flag, CheckSum.crc32(encodedData), this.getTransactionId(), OpaqueGenerator.getNextOpaque()); //发送 final BooleanCommand resp = this.invokeToGroup(serverUrl, partition, putCommand, message, timeout, unit); //返回结果 return this.genSendResult(message, partition, serverUrl, resp); } catch (final TimeoutException e) { throw new MetaOpeartionTimeoutException("Send message timeout in " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " mills"); } catch (final InterruptedException e) { throw e; } catch (final MetaClientException e) { throw e; } catch (final Exception e) { throw new MetaClientException("send message failed", e); } }
相关推荐
整理后的Metaq原理应用文档,欢迎大家看看。
metamorphosis(metaq) 服务端1.4.3版本 包括客户端 发送一个序列化对象
metaQ向spark传数据
Metaq在JDk 7下的异常及解决方案,希望可以帮助学习者!
metaq-server-1.4.6.2服务端+客户端+javadoc文档,打包于一个压缩包
metaq-server-1.4.6.2.tar.gz
Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。服务端、客户端、javadoc都包含在内。
metaQ的安装包
metaq--1.4.6.2.zip 和原版一样就是换了个名字,方便大家一起学习.
MetaQ 分布式消息服务中间件.pdf
Memorphosis是一个消息中间件,它是linkedin开源MQ——kafka的Java版本,针对淘宝内部应用做了定制和优化。Metamorphosis的设计原则 • 消息都是持久的,保存在磁盘 • 吞吐量第一 • 消费状态保存在客户端 ...
Metamorphosis(MetaQ)是一个高性能、高可用、可扩展的分布式消息中间件,类似于LinkedIn的Kafka,具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,在...
rocketMQ
数据生产者,producer 的用法:《producer 的用法》、《producer 使用注意》 数据消费者,consumer 的用法:《consumer 的用法》 迓有些零碎的,关亍通信段的源码览读:《net 包源码览读》、《broker 配置》 扩展的...
分享一下 RocketMq的文档RocketMQ运维指令 rocketmq在阿里内部叫metaq
Metamorphosis, 一种高可用高性能的分布式 #新闻MetaQ 1.4.6.2 发布。更新日志MetaQ 1.4.6.1 发布。更新日志MetaQ 1.4.5.1 发布。更新日志MetaQ 1.4.5发布。更新日志meta: 一个用于的ruby 客户端。 源代码
该文档为storm模拟项目系列文档之一,是MetaQ与storm接口的说明文档,主要介绍了如何集成MetaQ到项目代码中。软件(阿里),其对应的许多技术文档还是比较容易看的,并且Github提供了许多的应用实例,所以使用MetaQ...
RocketMQ是一个纯java、分布式、队列模型的开源消息中间件,前身是Metaq,当 Metaq 3.0发布时,产品名称改为 RocketMQ。