博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMQ生产者流程篇
阅读量:6568 次
发布时间:2019-06-24

本文共 21316 字,大约阅读时间需要 71 分钟。

系列文章

前言

生产者向消息队列里面写入消息,不同的业务场景会采用不同的写入策略,比如:同步发送,异步发送,延迟发送,事务消息等;本文首先从分析生产者发送消息的流程开始,然后再来介绍各种发送消息的策略。

生产者流程

1.流程概述

生产者首先需要设置namesrv,或者指定其他方式更新namesrv;然后从namesrv获取topic的路由信息,路由信息包括broker以及Message Queue等信息,同时将路由信息保存在本地内存中,方便下次使用;最后从Message Queue列表中选择合适的Queue发送消息,实现负载均衡;

2.启动过程

DefaultMQProducer实例化提供了两个参数分别是:生产者组名称以及RPCHook,RPCHook是一个接口,具体实现交由业务端实现,两个方法分别是:doBeforeRequest和doAfterResponse,表示在执行请求之前和接收返回之后分别执行相关逻辑;

接下来就是调用DefaultMQProducer的start方法,相关的初始化操作都在里面进行,内部其实调用的是DefaultMQProducerImpl的start方法,具体代码如下:

public void start(final boolean startFactory) throws MQClientException {        switch (this.serviceState) {            case CREATE_JUST:                this.serviceState = ServiceState.START_FAILED;                this.checkConfig();                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {                    this.defaultMQProducer.changeInstanceNameToPID();                }                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);                if (!registerOK) {                    this.serviceState = ServiceState.CREATE_JUST;                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),                        null);                }                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());                if (startFactory) {                    mQClientFactory.start();                }                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),                    this.defaultMQProducer.isSendMessageWithVIPChannel());                this.serviceState = ServiceState.RUNNING;                break;            case RUNNING:            case START_FAILED:            case SHUTDOWN_ALREADY:                throw new MQClientException("The producer service state not OK, maybe started once, "                    + this.serviceState                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),                    null);            default:                break;        }        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();    }

默认serviceState的状态为CREATE_JUST,刚进入设置为START_FAILED,初始化完成之后设置为RUNNING,再次初始化时会直接报错,下面看一下具体初始化了哪些信息;

2.1检查配置

这里的检查其实就是对producerGroup进行合法性校验;

2.2设置instanceName

如果producerGroup不等于默认的"CLIENT_INNER_PRODUCER",则设置DefaultMQProducer的instanceName为进程的pid;

2.3创建MQClientInstance对象

首先检查 ConcurrentMap<String/ clientId /, MQClientInstance> factoryTable中是否已经存在已clientId为key的MQClientInstance,如果存在则取出,不存在则实例化;clientId是已ip地址,instanceName以及unitName组成的,例如:10.13.83.7@12500

2.4注册producer

将DefaultMQProducerImpl注册到MQClientInstance中,已producerGroup作为key,注册到ConcurrentMap<String/ group /, MQProducerInner> producerTable中,如果已经存在此Group,则抛出异常;

2.5初始化TopicPublishInfo

已topic名称为"TBW102"为key,实例化TopicPublishInfo作为value,存放在ConcurrentMap<String/ topic /, TopicPublishInfo> topicPublishInfoTable中,TopicPublishInfo用来存放topic的路由信息;

2.6启动MQClientInstance

MQClientInstance启动会启动很多相关服务,具体可以看如下代码:

public void start() throws MQClientException {        synchronized (this) {            switch (this.serviceState) {                case CREATE_JUST:                    this.serviceState = ServiceState.START_FAILED;                    // If not specified,looking address from name server                    if (null == this.clientConfig.getNamesrvAddr()) {                        this.mQClientAPIImpl.fetchNameServerAddr();                    }                    // Start request-response channel                    this.mQClientAPIImpl.start();                    // Start various schedule tasks                    this.startScheduledTask();                    // Start pull service                    this.pullMessageService.start();                    // Start rebalance service                    this.rebalanceService.start();                    // Start push service                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);                    log.info("the client factory [{}] start OK", this.clientId);                    this.serviceState = ServiceState.RUNNING;                    break;                case RUNNING:                    break;                case SHUTDOWN_ALREADY:                    break;                case START_FAILED:                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);                default:                    break;            }        }    }

默认serviceState的状态为CREATE_JUST,刚进入设置为START_FAILED,初始化完成之后设置为RUNNING,防止重复初始化;

2.6.1初始化NameServerAddr

首先判断DefaultMQProducer是否配置了NameServerAddr,如果没有配置会到一个地址下获取,地址默认为::8080/rocketmq/nsaddr,相关的逻辑在MixAll类中,代码如下:

public static String getWSAddr() {        String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);        String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");        String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;        if (wsDomainName.indexOf(":") > 0) {            wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;        }        return wsAddr;    }

正常情况下我们需要设置自己的地址,可以通过如下方式设置:

System.setProperty("rocketmq.namesrv.domain", "localhost");

这种情况下就可以不用手动设置NameServerAddr;

2.6.2初始化RemotingClient

RemotingClient是一个接口类,底层使用的通讯框架是Netty,提供了实现类NettyRemotingClient,RemotingClient在初始化的时候实例化Bootstrap,方便后续用来创建Channel;

2.6.3启动定时器任务

总共启动了5个定时器任务,分别是:定时更新NameServerAddr信息,定时更新topic的路由信息,定时清理下线的broker,定时持久化Consumer的Offset信息,定时调整线程池;

2.6.3启动服务

pullMessageService和rebalanceService被用在消费端的两个服务类,分别是:从broker拉取消息的服务和均衡消息队列服务,负责分配消费者可消费的消息队列;

3.发送消息

相关发送消息的代码在DefaultMQProducerImpl的sendDefaultImpl方法中,部分代码如下所示:

private SendResult sendDefaultImpl(        Message msg,        final CommunicationMode communicationMode,        final SendCallback sendCallback,        final long timeout    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        this.makeSureStateOK();        Validators.checkMessage(msg, this.defaultMQProducer);        final long invokeID = random.nextLong();        long beginTimestampFirst = System.currentTimeMillis();        long beginTimestampPrev = beginTimestampFirst;        long endTimestamp = beginTimestampFirst;        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());        if (topicPublishInfo != null && topicPublishInfo.ok()) {            boolean callTimeout = false;            MessageQueue mq = null;            Exception exception = null;            SendResult sendResult = null;            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;            int times = 0;            String[] brokersSent = new String[timesTotal];            for (; times < timesTotal; times++) {                String lastBrokerName = null == mq ? null : mq.getBrokerName();                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);                if (mqSelected != null) {                    mq = mqSelected;                    brokersSent[times] = mq.getBrokerName();                    try {                        beginTimestampPrev = System.currentTimeMillis();                        long costTime = beginTimestampPrev - beginTimestampFirst;                        if (timeout < costTime) {                            callTimeout = true;                            break;                        }                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);                        switch (communicationMode) {                            case ASYNC:                                return null;                            case ONEWAY:                                return null;                            case SYNC:                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {                                        continue;                                    }                                }                                return sendResult;                            default:                                break;                        }                        ...以下代码省略...

此方法的四个参数分别是:msg为要发送的消息,communicationMode要使用的发送方式包括同步、异步、单向,sendCallback在异步情况下的回调函数,timeout发送消息的超时时间;

3.1获取topic的路由信息

通过msg中设置的topic获取路由信息,具体代码如下:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);        if (null == topicPublishInfo || !topicPublishInfo.ok()) {            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);            topicPublishInfo = this.topicPublishInfoTable.get(topic);        }        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {            return topicPublishInfo;        } else {            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);            topicPublishInfo = this.topicPublishInfoTable.get(topic);            return topicPublishInfo;        }    }

首先从变量ConcurrentMap<String/ topic /, TopicPublishInfo> topicPublishInfoTable中获取是否存在指定topic的路由信息,如果获取不到则使用topic去nameServer获取路由信息,如果还是获取不到则使用默认的topic名称为"TBW102"去获取路由信息,需要使用默认名称去获取的情况是没有创建topic,需要broker自动创建topic的情况;获取路由信息使用的是MQClientInstance中的updateTopicRouteInfoFromNameServer方法,此方法根据topic获取路由信息,具体连接哪台nameServer,会从列表中顺序的选择nameServer,实现负载均衡;

注:名称为"TBW102"的topic是系统自动创建的;

3.2选择MessageQueue

成功获取到路由信息之后,需要从MessageQueue列表中选择一个,在这之前获取了默认发送消息失败的重试次数,默认为3次(只有发送模式是同步的情况下),代码如下:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {        if (this.sendLatencyFaultEnable) {            try {                int index = tpInfo.getSendWhichQueue().getAndIncrement();                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();                    if (pos < 0)                        pos = 0;                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))                            return mq;                    }                }                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);                if (writeQueueNums > 0) {                    final MessageQueue mq = tpInfo.selectOneMessageQueue();                    if (notBestBroker != null) {                        mq.setBrokerName(notBestBroker);                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);                    }                    return mq;                } else {                    latencyFaultTolerance.remove(notBestBroker);                }            } catch (Exception e) {                log.error("Error occurred when selecting message queue", e);            }            return tpInfo.selectOneMessageQueue();        }        return tpInfo.selectOneMessageQueue(lastBrokerName);    }

以上代码在MQFaultStrategy,此类提供了选择MessageQueue的策略,相关策略可以看类变量:

private final LatencyFaultTolerance
latencyFaultTolerance = new LatencyFaultToleranceImpl(); private boolean sendLatencyFaultEnable = false; private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

latencyFaultTolerance:延迟容错对象,维护brokers的延迟信息;

sendLatencyFaultEnable:延迟容错开关,默认不开启;
latencyMax:延迟级别数组;
notAvailableDuration :根据延迟级别,对应broker不可用的时长;

这样上面的这段代码就好理解了,首先判定是否开启开关,如果开启首先获取index,index初始为一个随机值,后面每次+1,这样在后面与MessageQueue长度取模的时候每个MessageQueue可以按顺序获取;获取

MessageQueue之后需要判定其对应的Broker是否可用,同时也需要和当前指定的brokerName进行匹配;如果没有获取到就选择一个延迟相对小的,pickOneAtLeast会做排序处理;如果都不行就直接获取一个MessageQueue,不管其他条件了;

3.3发送消息

在发送之前会做超时检测,如果前面两步已经超时了,则直接报超时,默认超时时间是3秒;部分代码如下:

private SendResult sendKernelImpl(final Message msg,                                      final MessageQueue mq,                                      final CommunicationMode communicationMode,                                      final SendCallback sendCallback,                                      final TopicPublishInfo topicPublishInfo,                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        long beginStartTime = System.currentTimeMillis();        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());        if (null == brokerAddr) {            tryToFindTopicPublishInfo(mq.getTopic());            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());        }        SendMessageContext context = null;        if (brokerAddr != null) {            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);            byte[] prevBody = msg.getBody();            try {                //for MessageBatch,ID has been set in the generating process                if (!(msg instanceof MessageBatch)) {                    MessageClientIDSetter.setUniqID(msg);                }                int sysFlag = 0;                boolean msgBodyCompressed = false;                if (this.tryToCompressMessage(msg)) {                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;                    msgBodyCompressed = true;                }                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;                }                if (hasCheckForbiddenHook()) {                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());                    checkForbiddenContext.setCommunicationMode(communicationMode);                    checkForbiddenContext.setBrokerAddr(brokerAddr);                    checkForbiddenContext.setMessage(msg);                    checkForbiddenContext.setMq(mq);                    checkForbiddenContext.setUnitMode(this.isUnitMode());                    this.executeCheckForbiddenHook(checkForbiddenContext);                }                if (this.hasSendMessageHook()) {                    context = new SendMessageContext();                    context.setProducer(this);                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());                    context.setCommunicationMode(communicationMode);                    context.setBornHost(this.defaultMQProducer.getClientIP());                    context.setBrokerAddr(brokerAddr);                    context.setMessage(msg);                    context.setMq(mq);                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);                    if (isTrans != null && isTrans.equals("true")) {                        context.setMsgType(MessageType.Trans_Msg_Half);                    }                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {                        context.setMsgType(MessageType.Delay_Msg);                    }                    this.executeSendMessageHookBefore(context);                }                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());                requestHeader.setTopic(msg.getTopic());                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());                requestHeader.setQueueId(mq.getQueueId());                requestHeader.setSysFlag(sysFlag);                requestHeader.setBornTimestamp(System.currentTimeMillis());                requestHeader.setFlag(msg.getFlag());                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));                requestHeader.setReconsumeTimes(0);                requestHeader.setUnitMode(this.isUnitMode());                requestHeader.setBatch(msg instanceof MessageBatch);                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);                    if (reconsumeTimes != null) {                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);                    }                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);                    if (maxReconsumeTimes != null) {                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);                    }                }                SendResult sendResult = null;                switch (communicationMode) {                    case ASYNC:                        Message tmpMessage = msg;                        if (msgBodyCompressed) {                            //If msg body was compressed, msgbody should be reset using prevBody.                            //Clone new message using commpressed message body and recover origin massage.                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66                            tmpMessage = MessageAccessor.cloneMessage(msg);                            msg.setBody(prevBody);                        }                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;                        if (timeout < costTimeAsync) {                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");                        }                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(                            brokerAddr,                            mq.getBrokerName(),                            tmpMessage,                            requestHeader,                            timeout - costTimeAsync,                            communicationMode,                            sendCallback,                            topicPublishInfo,                            this.mQClientFactory,                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),                            context,                            this);                        break;                    case ONEWAY:                    case SYNC:                        long costTimeSync = System.currentTimeMillis() - beginStartTime;                        if (timeout < costTimeSync) {                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");                        }                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(                            brokerAddr,                            mq.getBrokerName(),                            msg,                            requestHeader,                            timeout - costTimeSync,                            communicationMode,                            context,                            this);                        break;                    default:                        assert false;                        break;                }                if (this.hasSendMessageHook()) {                    context.setSendResult(sendResult);                    this.executeSendMessageHookAfter(context);                }                return sendResult;

此处的6个参数分别是:msg消息本身,mq要发送到的队列,communicationMode发送策略,sendCallback异步回调函数,topicPublishInfo路由信息,timeout发送超时时间(3秒-前2步消耗的时间);

首先需要获取指定broker的地址,这要才能创建channel与broker连接;然后就是一些hock处理;接下来就是准备发送的消息头SendMessageRequestHeader,最后根据不同的发送策略执行发送消息,此处就不在进入更加深入的分析;

总结

本文重点介绍了发送者的启动,以及发送消息的大概流程;关于消息的发送策略,以及消息的类型包括:顺序消息,事务消息等,将在后面的文章介绍。

转载地址:http://cyvjo.baihongyu.com/

你可能感兴趣的文章
《利用python进行数据分析》读书笔记--第八章 绘图和可视化
查看>>
栈的操作
查看>>
Flask 备注一(单元测试,Debugger, Logger)
查看>>
ElasticSearch(八):springboot集成ElasticSearch集群并使用
查看>>
Java基础学习_01 概述及环境配置
查看>>
20165239其米仁增3
查看>>
[Usaco2005 Open]Disease Manangement 疾病管理 BZOJ1688
查看>>
P2657 [SCOI2009]windy数 数位dp入门
查看>>
Elasticsearch 运维实战之1 -- 集群规划
查看>>
jetty安装、配置、优化
查看>>
Android-环境问题
查看>>
Android- assent和raw的区别
查看>>
Vue-系统修饰键
查看>>
1264: [AHOI2006]基因匹配Match
查看>>
Java 重写(Override)与重载(Overload)
查看>>
Javascript调试技巧整理
查看>>
Python学习笔记 - PostgreSQL的使用
查看>>
Linux常用命令
查看>>
turtle练习
查看>>
Oracle Golden Gate 系列 小结
查看>>