Note: the Kafka version discussed here is 0.10.1.0. This article walks through the Producer's send path at a high level; corrections are welcome. The basic way to use the producer client is as follows:
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
producer.send(record, cb);

A fuller usage sketch, including the property setup, follows the class descriptions below. The main classes involved are:
KafkaProducer: the Kafka producer client, used to send messages
RecordAccumulator: buffers the messages to be sent and groups them into batches
NetworkClient: the underlying network communication client
Sender: the thread that sends the messages buffered in the RecordAccumulator
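For context, here is a minimal usage sketch assuming the standard 0.10.x client API; the bootstrap address, topic name, and payload values are illustrative placeholders, not taken from the original article:

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class ProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // broker list and serializers are required; these values are placeholders
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>("demo-topic", "key".getBytes(), "value".getBytes());

        // send() is asynchronous; the callback fires once the broker acknowledges (or on error)
        Callback cb = new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println("sent to " + metadata.topic() + "-" + metadata.partition()
                            + " @ offset " + metadata.offset());
                }
            }
        };
        producer.send(record, cb);
        producer.close();
    }
}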
Let's start from the construction of the KafkaProducer class; its constructor flow is outlined below.
The code is as follows:
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    try {
        log.trace("Starting the Kafka producer");
        Map<String, Object> userProvidedConfigs = config.originals();
        this.producerConfig = config;
        this.time = new SystemTime();
        // ...
        // some basic initialization omitted
        // ...
        // the record accumulator (buffer)
        this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                this.totalMemorySize,
                this.compressionType,
                config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                retryBackoffMs,
                metrics,
                time);
        // update the metadata
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
        // create the network client
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
        NetworkClient client = new NetworkClient(
                new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                this.metadata,
                clientId,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                this.requestTimeoutMs, time);
        // create the sender thread (as a daemon)
        // MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION == 1 means guaranteeMessageOrder
        this.sender = new Sender(client,
                this.metadata,
                this.accumulator,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                config.getInt(ProducerConfig.RETRIES_CONFIG),
                this.metrics,
                new SystemTime(),
                clientId,
                this.requestTimeoutMs);
        String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
        this.errors = this.metrics.sensor("errors");
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed
        // this is to prevent resource leak. see KAFKA-2121
        close(0, TimeUnit.MILLISECONDS, true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}

Let's look at Sender first. Sender implements the Runnable interface so that it can run as a thread; below is Sender's run() method.
The running field serves as a flag. While it is true, the thread keeps doing its work; once it is set to false, the loop ends and shutdown work begins. If the close is not forced, the unsent messages still in the accumulator are sent out before closing; if the close is forced, the unsent data is simply abandoned. The code is as follows:
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    // close() has been called, perform the shutdown work
    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // okay we stopped accepting requests but there may still be
    // requests in the accumulator or waiting for acknowledgment,
    // wait until these are completed.
    // If this is not a forced close, and the accumulator still has unsent messages
    // or the client still has in-flight requests, keep going
    while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
        try {
            // keep calling the parameterized run() to finish the remaining work
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    // If this is a forced close, call the accumulator's abortIncompleteBatches()
    // to abandon the unfinished batches
    if (forceClose) {
        // We need to fail all the incomplete batches and wake up the threads waiting on
        // the futures.
        this.accumulator.abortIncompleteBatches();
    }
    // close the network client
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
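As an aside, which of the two shutdown paths runs depends on how the application closes the producer. A small hedged sketch of my reading of the public API (continuing the usage example above; close(long, java.util.concurrent.TimeUnit) is the standard KafkaProducer method):

// Graceful shutdown: initiate close and let the Sender drain the accumulator
// and wait for in-flight requests to be acknowledged.
producer.close();

// Bounded shutdown: wait up to 5 seconds for a graceful close; if the I/O thread is
// still alive after that (or if the timeout is 0), the producer force-closes and the
// Sender aborts the incomplete batches.
producer.close(5, TimeUnit.SECONDS);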
Now let's look more closely at the parameterized run(long now) method. Its steps are:

1. Fetch the cluster information from the metadata.
2. Get the list of nodes that have batches ready to send (see the RecordAccumulator ready function below).
3. If any topic's leader is unknown, force a Metadata update.
4. Iterate over the node set obtained in step 2 and remove nodes whose connections are not ready (this check goes through NetworkClient.ready, as shown in the code below).
5. Iterate over the batches already in the RecordAccumulator. For each node left after step 4, go through every topic partition whose leader is on that node, peek at the first batch in its queue, and take it out if it meets the requirements. The final result is, for each node, the collection of RecordBatches to send to it (see the RecordAccumulator drain function below).
6. If the configuration requires message ordering (i.e. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 1), add every topic partition drained in the previous step to the RecordAccumulator's muted set. The next call to drain will then skip these topic partitions, so that each topic partition has at most one unacknowledged request in flight, which keeps the messages arriving at each topic partition in order (see the RecordAccumulator drain function below; a hedged configuration sketch also follows the run(long now) code).
7. Go through the batches that have exceeded the request timeout and abort them.
8. Update the metrics.
9. Using the data from step 5, build a ClientRequest for each node.
10. Send the requests through NetworkClient.

The code is as follows:
void run(long now) {
    // fetch the cluster information
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    // i.e. the nodes that have batches ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // force a metadata update if needed
    // if there are any partitions whose leaders are not known yet, force metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // remove nodes whose connections are not ready
    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // create produce requests
    // walk the first batch of each partition on the ready nodes
    // Integer - broker id
    // batches - the batches to send
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);
    // when MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 1,
    // add the drained partitions to the RecordAccumulator's muted set
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    // abort the batches that have exceeded the request timeout, and update metrics
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);

    // build the request to send to each node
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    for (ClientRequest request : requests)
        client.send(request, now);

    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    this.client.poll(pollTimeout, now);
}
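As promised in step 6, here is a hedged sketch of the producer configuration keys that drive this loop. The key names are the standard producer configs; the concrete values are illustrative assumptions only:

Properties props = new Properties();
// batch.size: how many bytes a per-partition batch may hold before it counts as full
props.put("batch.size", 16384);
// linger.ms: how long a non-full batch may sit in the accumulator before it becomes sendable
props.put("linger.ms", 5);
// max.in.flight.requests.per.connection = 1 enables guaranteeMessageOrder: drained partitions
// are muted until their request is acknowledged, preserving per-partition ordering
props.put("max.in.flight.requests.per.connection", 1);
// retries: failed batches are re-enqueued into the accumulator and honor retry.backoff.ms
props.put("retries", 3);
props.put("acks", "all");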
The ready function: its job is to return the list of nodes that are in a ready state, together with the set of topics whose leader is unknown.
A broker counts as ready when:
1. At least one topic partition it leads has a batch that is not in the retry back-off period;
2. and that topic partition has not been placed into muted, i.e. no request for that topic partition is currently in flight (this prevents reordering);
3. and at least one of the following holds: the batch is full, or the batch has been waiting in the accumulator for at least lingerMs, or the buffer pool is exhausted and writers are blocked waiting for memory, or the accumulator has been closed, or a flush is in progress.
The code is as follows:
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();

    // whether the buffer pool is exhausted (threads are queued waiting for memory)
    boolean exhausted = this.free.queued() > 0;
    // iterate over the batch deque of every TopicPartition
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();

        // find the leader broker of this TopicPartition
        Node leader = cluster.leaderFor(part);
        synchronized (deque) {
            // if the leader is unknown, report it in the result so the sender
            // can force a Metadata update later
            if (leader == null && !deque.isEmpty()) {
                // This is a partition for which leader is not known, but messages are available to send.
                // Note that entries are currently not removed from batches when deque is empty.
                unknownLeaderTopics.add(part.topic());
            // Note: TopicPartitions already in muted are excluded here, which preserves message ordering
            // (i.e. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 1): if a TopicPartition has a request
            // currently in flight (unacknowledged), it is not considered ready.
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                // peek at the first batch, without removing it
                RecordBatch batch = deque.peekFirst();
                if (batch != null) {
                    // whether the batch is in the retry back-off period, i.e. still waiting for the next retry
                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                    // time already waited since the last attempt
                    long waitedTimeMs = nowMs - batch.lastAttemptMs;
                    // time we need to wait:
                    //   if backing off, the configured retry.backoff.ms (default 100 ms)
                    //   otherwise, the configured linger.ms (default 0 ms)
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    // remaining time to wait
                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                    // whether the batch is full
                    boolean full = deque.size() > 1 || batch.records.isFull();
                    // whether the batch has expired, i.e. it has already waited long enough
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    // so the batch is sendable when:
                    //   the batch is full
                    //   or it has expired
                    //   or the buffer pool is exhausted
                    //   or the accumulator is closed
                    //   or a flush has been requested
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    // sendable and not backing off -> its leader is a ready node
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }

    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
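One of the sendable conditions above is flushInProgress(). From the application side, that flag is raised by KafkaProducer.flush(); a minimal hedged sketch, continuing the usage example above:

// send() alone may leave the record sitting in the accumulator for up to linger.ms
producer.send(record, cb);
// flush() marks a flush as in progress, which makes every non-empty batch immediately
// sendable in ready(), and blocks until all previously sent records are completed
producer.flush();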
The drain function: for each of the given nodes, it extracts from the accumulator the batches of the topic partitions whose leader is on that node:

1. Iterate over the node's leader topic partitions (excluding those in muted).
2. Get that topic partition's batch queue from the accumulator.
3. Peek at the first batch: if it is non-null, not in the retry back-off period, and fits within the maximum request size, pull it out and prepare it for sending. (Special case: if a single batch by itself exceeds the maximum request size and nothing has been collected for the node yet, it is still drained.)

The code is as follows:
public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();

    Map<Integer, List<RecordBatch>> batches = new HashMap<>();
    // iterate over each node
    for (Node node : nodes) {
        int size = 0;
        // all the partitions whose leader is this node
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        // the batches ready to be sent to this node
        List<RecordBatch> ready = new ArrayList<>();
        /* to make starvation less likely this loop doesn't start at 0 */
        int start = drainIndex = drainIndex % parts.size();
        // iterate over all partitions
        do {
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            // Only proceed if the partition has no in-flight batches.
            // only handle topic partitions that are not currently being sent to
            if (!muted.contains(tp)) {
                // find this topic partition's queue in the accumulator
                Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                if (deque != null) {
                    synchronized (deque) {
                        // peek at the first batch in the queue
                        RecordBatch first = deque.peekFirst();
                        if (first != null) {
                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                            // Only drain the batch if it is not during backoff period.
                            if (!backoff) {
                                // Note: if adding this batch would exceed the maximum request size but nothing
                                // has been collected for this node yet, the batch is still drained;
                                // i.e. a single batch larger than the maximum size goes out on its own
                                if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                    // there is a rare case that a single batch size is larger than the request size due
                                    // to compression; in this case we will still eventually send this batch in a single
                                    // request
                                    break;
                                } else {
                                    RecordBatch batch = deque.pollFirst();
                                    batch.records.close();
                                    size += batch.records.sizeInBytes();
                                    ready.add(batch);
                                    batch.drainedMs = now;
                                }
                            }
                        }
                    }
                }
            }
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        batches.put(node.id(), ready);
    }
    return batches;
}