【云计算】Exactly-once语义实现设计文档分享
小标 2019-01-07 来源 : 阅读 932 评论 0

摘要:本文主要向大家介绍了【云计算】Exactly-once语义实现设计文档分享,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。

本文主要向大家介绍了【云计算】Exactly-once语义实现设计文档分享,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。


kafka 版本 0.8.x


spark 版本 1.3.x


文章链接地址:


Spark ISSUE 链接地址:


翻译原因: 0.8 的 kafka 版本中, 所有 topic partition 的 offset 消费记录集中保存在 zookeeper 上,


造成了 spark-streaming driver 数据传输记录和 kafka 数据传输记录分开存储在两个地方,对 exactly-once 的实现带来了很大难度,这篇设计文档就是在这个背景下提出来的解决方法,其中的方法很值得学习与借鉴,另外作者在社区发完文档之后自己就就这当前的环境把代码实现了一遍,所以就这这篇文档 + 源码一起学习能加深对这里问题的理解分析和解决。


------


Exactly-once + WAL-free Kafka Support in Spark Streaming


Kafka 中 Exactly-once + WAL(write ahead log) 在 Spark Streaming 中的实现


Problem

问题描述

Currently the best way to use Kafka with Spark Streaming is to use the exsiting KafkaUtils, and enable Write Ahead Logs in Spark Streaming.


目前解决 Kafka Spark Streaming 二者之间数据传输的最佳方法均被封装在 KafkaUtils 类中对外提供 API,且 Spark Streaming 中支持数据以WAL 的方式写入。


There are two ways by which this Kafka integration of Spark Streaming can be improved -exactly-once semantics and not using the write ahread log in Spark Streaming.


但 Kafka 和 Spark Streaming 二者数据传输而言仍有两个地方待改进:消息只传输一次(exaclty once)语义的实现方式和 在 Spark Streaming 中放弃数据通过WAL 的方式写入。


This design doc tries to solve both issues by implementing a new integration .


本篇设计文档试图通过整合出一种新的用于 Spark Streaming 和 Kafka之间数据传输的模块。


Eaxctly-once Semantics

Exactly-once 语义介绍

Kafka Stream with the WAL enabled can avoid data loss and provide zero data loss ofall data records.


Kafka 在处理数据流时借助WAL 的数据写入方式来防止数据丢失,


However, it cannot still provide exactly-once semantics.


然而即便如此,Kafka 却无法借助 WAL 来实现 exactly-once 的语义


The fundamental reason any "transfer" of records cannot be acknowledged by both Spark Streaming and Kafka in a signle transactional update.


exactly-once 语义很难实现本质上是因为构成数据流的 record 在数据传输过程中的进度记录对于数据收发双方Spark Streaming 和 Kafka 而言无法通过单独事务更新(这就造成了数据收发双方记录数据传输进度的不一致)


When the driver / Kafka receiver fails, it may so happen that some data records have been saved in Spark Streaming(written to WAL, etc.) but the Kafka offsets have not been updated accrodingly due to the failure.


例如(开始介绍上面所说的不一致)当 Spark Streaming driver 或是 Kafka 作为数据接收者挂掉时会有这样一种情况:就是数据流的 record 在 Spark Streaming 被接收的已经写到WAL 中,但是 Kafka 这边的 offset 却因为进程挂掉没有及时的更新(没落盘持久化)


In that case, Kafka will send the data again to the recovered Spark Streaming process.


接下来, Kafka 会将上次已经发送但没记录 offset 的数据再次发送给恢复重启的 Spark Streaming 进程中。


And the key reason why this can happen is that the metadata information about the what has happen successfully received is stored in two places -- WAL in Spark Streaming and Zookeeper offsets in Kafka.


造成上述情况发生的关键因素便是记录发生了什么的 metadata 元数据被接收和记录(持久化)在两个不同的地方 -- Spark Streaming 的 WAL 文件中 和 Kafka 的 Zookeeper 的 offset 中(也会持久化到 zk 的元数据文件中)


These two places cannot be transactionally updated, leanding to inconsistencies and therefore duplicates.


而这两个地方文件中的信息无法通过事务的方式来同步更新,更新的不同步便造成了数据上的不一致,最终导致了相同的数据 record 因记录的消费进度不一致而被重复发送,这就破坏了 exactly-once 语义。


The logical way to resolve this is to save the necessary metadata in only one place. Furthermore, for failure recovery, it needs to be, in a way such that the batches can be exactly once.


从上述 metadata 分隔两地的问题符合逻辑的解决方法便是将 metadata 指定一个地点进行存放。 进一步说,对于失败之后数据和任务的恢复而言,也得这么做(metadata 存放在一个地方)只有这么做才能保证批处理的数据被执行且仅被执行一次。


In order to ensure that the Spark Streaming receives every record exactly-once,


为了实现 exactly once 语义我需要做的是保证 Spark Streaming 在接收数据时就仅接收一次才行。


No use of WAL

废弃使用 WAL 的方式来实现 Exactly-once 语义的思路介绍

Using the WAL for Kafka is not ideal because the data is getting replicated twice -- once for Kafka , and once for again for Spark Streaming WAL .


上述提到的 WAL 是在 Spark Streaming 中实现的,所以在 Kafka 中使用 WAL 来保证 exactly once 语义显然是不行的,因为这样的话还是会导致 metadata 的重复存储,一份在 Kafka 的 WAL 日志中,另一份在 Spark Streaming 的 WAL 日志中。


In fact Kafka iteself is a replicated log, we should ideally use Kafka to recover all the data in case of failures.


事实上 Kafka 从其本身的设计而言它实现了日志的冗余存放,所以当计算出错时Kafka 是理想的用于数据恢复的中间件。


Requirements

需求(在分析了问题及问题出现原因之后,接下来对要做些什么,达到什么效果进行了详细描述)

(需求一)Allow streaming applications to be created when every record from Kafka is effectively received exactly-once .

让每个流计算应用 app 其中和重启时每个Kafka 数据流中的 record 被 app 有效接收一次。

Note the effectively. Each record may be read from Kafka multiple times but from the point of view of the computation, the record is received exactly once.

注于上面提到的一词‘有效地’,指的是 Kafka 中的每条记录/record都是被多次读取的,但站在计算的角度我们只要保证每个 record 下游仅接收到一次(即不会出现数据重复计算,保证数据结果正确)即可。

Example: Suppose the series of records is 1,2,3 ... , 10, and the streaming application has to add them up. The expected answer is 55 despite any failures.The system may read any of the records from Kafka multiple times ( to recover from failures), but the final answer still should be 55 as it they have been effectively read exactly once.

比如说:设想数据流中的 records 从 1 - 10 10个数字,实时计算算子想要把这些数据依次相加得到计算和。理想情况下是无论发生什么错误,计算结果都能得到 55 这个数值。在计算过程中为了恢复计算中途有可能出现的错误允许系统运行从 Kafka 中读取任意多次,任意多个数据记录 record,只要保证计算任务恢复之后得到的计算结果仍旧是 55 这个数值,这种仍叫做数据的有效只传输一次(exactly once)

Also note that we are not ensuring the exactly-once output of data(using output operations like foreachRDD) to external storagesystems. This problem is specific to the application requirements, the type of update to the storage system (idempotent or not) and the abilities of the storage system ( transaction suport or not). Ensuring out-of-the-box end-to-end exactly-once semantics(i.e. including data store output operation) is not in the scope of this design.

仍要注意的是,我们虽然能够保证数据传输和计算过程中数据的 ‘exactly-once’ 的语义,但我们无法保证数据从内存写入到外存的时候数据顺序仍然是有序的,例如像是使用 foreachRDD 这种操作写文件这种。 这种内存到外存的情况需要分析计算算子具体的环境及其业务场景而定,以及存储系统是否支持幂等操作,以及是否具备支持事务操作的能力。 确保落盘的时候端到端,exactly-once 语义(包括数据落盘的方式)不在本设计文档讨论和实现范围内。

(需求二) Recover from driver failures without using Write Ahead Logs to save the data .

当 Spark Streaming 的 driver 计算失败重启的时候不会加载之前写入的 WAL 文件来恢复其数据

To recover from driver failures, the lastest batches must be exactly reconstructed after recovery. Those batches must have the exact same data in the same order as it was before failure.

为了从上次 driver 计算的失败中恢复过来,最后的批处理计算的数据必须恢复到上次计算的状态。这些批处理计算中必须保证数据以及数据的顺序和当时出错时是完全一致的。

Proposed Solution

提出的解决方法

The basic idea is to treat Kafka like a durable filesystem source, rather than an ephemeral network source .


最根本的解决方法就是将 Kafka 当作持久的文件系统数据源,而并非是提供短暂服务的网络数据源。(即,将 Kafka = HDFS )


After each batch interval, the input DStream generates a RDD which is defined by the range of Kafka offsets(i.e. offsets for each Kafka topic + partition) that define the set of records in that batch .


在批处理的中的每次间隔,数据流从 DStream 转换成 RDD ,而这个 RDD 则是由 Kafka 中的 offset 所构成的,(offset 就是 Kafka 中的 topic + partition) 而 RDD 中的 offset 则是用来定义了每个批处理中处理的数据记录 record 集合。


Each partition in the RDD is defined by a Kafka topic+partition and a range of offsets.


RDD 中的每一个分区(partition)都是由 Kafka 中的 topic + partition 和 一个范围内的 offset 所构成的。


So there is a one-to-one correspondence between RDD partitions and Kafka topic + partitions.


所以,在这里不就是刚好的 RDD 中的分区和 Kafka 中的 topic + 分区的一一对应的衔接点吗?


When the RDD is executed, for each partition, a Kafkaconsumers is created, that reads the Kafka data corresponding to the offset range.


当RDD 在计算时,会逐一地遍历 partition 进行计算,每遍历到一个 partition 时便会创建一个对应的 kafka consumer,在计算该 partition 的时候所创建的 kafka consumer 被用来从特定的 topic,对应范围的 offset 中读取数据(该数据参与该partition 的计算)。


Failure Recovery

失败恢复

These offset ranges are also saved as part of DStream checkpoints.


这些 offset 范围在计算失败时会被一并记录到 DStream 的检查点记录文件中


This allows the RDDs containing Kafka data to be recreated( from the checkpoint offsets) after the driver recovers form failures, and the regenerated RDDs will have the same data as before.


这就让包含者 Kafka 中数据的 RDD 在 Spark Streaming driver 得以根据检查点记录文件中的 offsets 数值得以重新被构建, 并且重建的 RDD 有着和 driver 失败之前同样的数据(及数据顺序)


And since all the transformed RDD generated from these RDDs are deterministic, the final transformed RDD will be deterministically generated and have the same value as it would have been without any failure.


并且,由于最后一步中的 RDD 所转换而来的 RDD 中的数值都是确定的,最后通过转换得到的 RDD 也必定是确定的,所以这就保证了恢复的 RDD 中的数值和原先的 RDD 是完全一样的。


Performance

性能

Compared to the exsiting receiver based Kafka stream, this method can have higher throughput as records from different kafka topic + partitions are pulled through the cluster.


对比现有的 Spark Streaming 中处理 Kafka 数据流的 receiver 而言,这种解决方法在处理 Kafka 中不同的 topic + partition 的吞吐量会更高,因为数据是由 cluster 通过 pull 的方式获取的。


However , there can also be some loss in throughput as every partition needs to create a consumer and connect to Kafka.


然而仍有拉低吞吐率的地方:Spark Streaming 中的 RDD 是按照每个 partition 都会创建其自己的 consumer 并与 kafka 创建连接,连接过多会一定造成性能的损耗。


This may be an acceptable tradeoff for achieving exactly-once and no WAL usage .


即便如此(性能开销上有些损耗),但可以实现 exactly-once 语义并且不用 Spark-Streaming 的 WAL 这点性能损耗也是可以忍受的。


Other Advantages

其它优点

Few other advantages in the long term/ 从 Kafka 发展长远角度来看体现的优势


No receiver and no data stored in memory, so easy to dynamically scale resources.

这种处理方法会让数据接收者接收到的中间数据和传输数据及时落盘而不会常驻内存,使得动态扩充资源成为了可能。

Natural flow control/back pressure. With receivers, if data comes in faster than the system can process, the data keeps buffering in memory an can finally cause OOMs. With this method, data is pull only when the job using the data is executed. So no chance of OOMing because of temporary glitches in the flow.x .

这种处理方法提供了一种自然优雅的控流缓解压力的方法。 对于现有的 receiver 而言是被动的接收数据,如果数据到来的速率高于系统能处理的速率,系统便会将无法顾及的数据存放到缓冲区内存中,数据不断增多最终会因耗光内存缓冲区而引发 OOM 。 如果使用了这种方法通过主动的方式来从数据源拉取数据,不会对系统及内存缓冲区造成过大的负担,也不会引起 OOM 的发生

Other Concerns

其它的一些考虑

Not using the Receiver API - This prevents this design from taking advantage of existing infrastructure of rate limiting, record counting, etc. This isn't such a problem in practivce, the existing PR uses the value of spark.streaming.receiver.maxRate to implement a per-partition maximum batch size for rate limiting.

并没有复用现有的 Receiver API, 而是提出了解决方法的新思路,这么做无法复用现有设计架构中限流,record 计数这些方法。 但是在实际使用中并不构成问题,我们可以使用配置信息中的 spark.streaming.receiver.maxRage 的这个参数来作为限制每个 RDD 中的 parition 每次最多处理的 batch 的大小以达到局部限流的作用。

Not updating the Kafka's zookeeper - Since the zookeeper is not used to store the offset, any monitoring tools that monitor Kafka by checking the offsets in zookeeper will not work . However, this can be fixed by the application developer committing offsets maintained in Spark Streaming to Zookeeper.

这种解决方法下,kafka 的消费进度 offset 数值将不会被写入到 zookeeper 中进行存放,任何通过监控 zookeeper 中记录的 offset 数值来监控 kafka 上数据消费进度的监控工具都不能用了。然而,这个问题可以通过在 spark streaming 计算算子中由开发者自行实现将当前条的 offset 数据写入到 Zookeeper 中来实现。

Multiple transformation on the input stream will unnecessarily pull the data multiple times from Kafka. This is not good. Word around is to call persist() on the input stream . Maybe that can be enabled autocatically for this new DStream.

这种设计方案下会导致在计算过程中,RDD 多次转换过程中,每一次转换都会不必要的从 Kafka 上游拉去一次数据(而不是从上次的 RDD 缓冲中获取)。这样做很不好,不过看了一圈唯一能起到作用就是在从上游 kafka 读取数据的之后,通过调用 persist() 把数据缓冲到磁盘上,这样每次就是从磁盘上读取而不是通过连接借助 kafka consumer 来读取数据。 或许在新的 DStream 中可以自动支持这种调用 persist() 的方法而不是人为显示的调用来实现会好些。

Programming Interfaces

编程 API 描述

Here are the public API that we expose to the application developer.


接下来我们介绍基于本篇设计文档中的功能将会给开发者开放出去的 APi。


The existing kafka functionality is expressed through the object org.apache.spark.streaming.kafka.KafkaUtils.


Spark Streaming 中现有的处理 kafka 的方法都封装在 KafkaUtils 中对外提供 API 访问接口。


We can add a new set of methods in the same class. Here are the proposed methods.


我们也可以在相同类中加入一系列新方法,新加入的方法描述如下


1. createExactlyOnceStream(context, kafkaParams, topicSet):
DStream[String, String]
@param context: StreamingContext
@param kafkaParams: Map[Stirng,Stirng] of Kafka key value properties
@param topicSet: a set of Kafka topics to consume (all partitions will be consumed)
@return DStream[Stirng,Stirng]


2. createExactlyOnceStream[K,V, KD<:Decoder[_], VD<: Decoder[_], R](context, kafkaParams, topicPartitionOffsetMap, messagehandler):DStream[R]
@param context: StreamingContext
@param kafkaParams: Map[Stirng,String] of Kafka key-value properties
@param topicPartitionOffsetMap- Map[TopicAndPartition, Long] of per-topic per-partition offset to start from
@param messagehandler : MessageAndMetadata[K,V]=> R intercepting function
@return DStream[R]


Other points to consier


需要考虑的其它一些因素


1. Starting point - Whether to strart from the earliest or latest Kafka offset should be configurable in the same as it was before -- through kafka params. In our case, we have to write explicit code to read it and accordingly determine our own starting offsets.


1. 对于开始消费数据的起点,无论是从 kafka offset 最开始或是最新的数据开始消费这些都必须写道配置文件--并通过创建kafkaconsumer 实例时所使用的参数来控制。 在我们的这种应用场景下,我们定制化的实现了每次读取数据是从 kafka offset 最开始进行读取。


2. Along with methods for creating streams, we also want to expose utility methods to create RDDs for reading messages from Kafka


2. 和创建流数据 API 一并,我们也希望能暴露用于从 kafka 读取数据构建 RDD 的 API 出来。




createRDD[K,V, KD<: Decoder[_], VD<:Decoder[_], R] ( context, kafkaParams, batch, messageHandler)
@param context: SparkContext
@param kafkaParams: Map[String, String] of Kafka key-value properties
@param batch: Array[OffsetRange] see below
@param messagehandler: MessageAndMetadata[K,V] => R intercepting function


To expose kafka offsets without exposing too much implementation detail, one possibility is a public interface, OffsetRange.


为了将 kafka offset 暴露出来的同时又不暴露过多的实现细节,可通过创建公共接口 OffsetRange 的方法来解决。


This has basically the same accessors as KafkaRDDPartition, for defining the range of offsets for a given topic/partition, but does not actually extend rdd Partition.


这个 OffsetRange 有着与 KafkaRDDPartition 相同的父类,目的是为了给指定了 topic 和 partition 的kafka 数据源设定 offset 的访问范围,但是 OffsetRange 却并未继承 rdd 的 Partition 这个父类。


There's public static constructor of a trivial implementation of that interface so that people can use the createRDD method .


又一个提供了静态构造方法的接口,通过实现该接口开发者便可调用其中的 createRDD 这个方法了。


Another interface, HasOffsetRanges, defines objects that have an accessor for a collection of offset ranges.


另一个接口, HasOffsetRanges, 这个接口中提供了一个用来通过 collect 方式获取 offsetranges 的父类方法


KafkaRDD implements that interfaces. 而 KafkaRDD 则实现了该接口。


That way, people that care about offsets should be able to cast an RDD that happens to be KafkaRDD to something they can get offset from .


通过这种方法,关心 offset 数值的使用者可以将刚好是 KafkaRDD 实例的 RDD 转换成某种他们能获取 offset 数值的 RDD 类型。


Implementations Details

实现细节

There are 4 classes invoked: KafkaRDDPartition, KafkaRDD, DeterministicKafkaInputDStream(this needs a better name, perhaps ExplicitkafkaInputDStream), and KafkaCluster. Note that all these classes are private to Spark .


针对本篇设计文档,我们主要提供了 4 个类他们分别是 KafkaRDDPartition, KafkaRDD,DeterministircKafkaInputDStream(显然这个类需要一个更贴切的类名,例如 ExlicitkafkaInputDStream)以及 KafkaCluster。 在这里需要注意的是这些 class 对于 Spark 而言都是私有类型


org.apache.spark.streaming.kafka.KafkaRDD

Constructor takes kafka configuration parameters , an array of KafkaRDDPartition, and a function to convert Kafka MessageAndMetadata objects to the desired item type.


KafkaRDD 中的构造器用来接收配置参数,该配置参数又 KafkaRDDPartition 类型的数组构成,和传入对应的将 KafkaMessageAndMetadata 转换成 item 类型的方法参数构成


The compute implementation for each partition uses a Kafka simple consumer to request only the message in the specified range of offsets.


对于每个 partition 中所执行的计算便是通过 kafka consumer 读取offset 在特定范围内的消息。


Worker or Kafka leader failure is handled by normal spark task retires. On the first attempt, the specified preferred host will be used on future attempts, the leader is looked up .


计算中的 Worker 和 Kafka leader 中的失败会被通过普通的 spark task失败重试来处理。 在第一次失败重新计算尝试中, 会倾向选取特定的 host 用作重试计算的执行机器,并且在后续的多次尝试中都是通过这种方式来获取 host 的,而 Kafka 的 leader 则是通过查找来获取的。


org.apache.spark.streaming.kafka.KafkaRDDPartition

org.apache.spark.streaming.kafka.DeterministicKafkaInputDStream

org.apache.spark.streaming.kafka.KafkaCluster




Alternate Design (not proposed)


          

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程