小标
2019-02-11
来源 :
阅读 1525
评论 0
摘要:本文主要向大家介绍了【云计算】Spark的shuffle,RDD持久化,共享变量详细介绍,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。
本文主要向大家介绍了【云计算】Spark的shuffle,RDD持久化,共享变量详细介绍,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。
1.Spark的shuffle
Spark中的某些操作会触发一个称为洗牌的事件。shuffle是Spark重新分发数据的机制,以便在分区之间进行不同的分组。这通常涉及跨执行者和机器复制数据,使得洗牌成为一项复杂而昂贵的操作。
为了理解洗牌过程中会发生什么,我们可以考虑reduceby by key操作的例子。reducebykey操作生成一个新的RDD,其中单个键的所有值被组合成元组——该键和针对与该键相关联的所有值执行reduce函数的结果。问题是,并非单个键的所有值都必须位于同一分区,甚至同一台机器上,但它们必须位于同一位置才能计算结果。
在Spark中,数据通常不会分布在分区中,以处于特定操作所需的位置。在计算过程中,单个任务将在单个分区上运行——因此,为了组织要执行的单个reduceby by key reduce任务的所有数据,Spark需要执行一个全对全的操作。它必须从所有分区中读取,以找到所有键的所有值,然后将分区中的值汇集在一起,以计算每个键的最终结果——这称为洗牌。
虽然新混洗数据的每个分区中的元素集是确定性的,分区本身的排序也是确定性的,但是这些元素的排序不是确定性的。如果希望洗牌后的有序数据可以预测,那么可以使用:
mapPartitionsto sort each partition using, for example,.sorted repartitionAndSortWithinPartitionsto efficiently sort partitions while simultaneously repartitioning sortByto make a globally ordered RDD
可能导致洗牌的操作包括重新分区操作,如重新分区和合并,按键操作(计数除外),如按键分组和按键减少,以及合并操作,如按组和合并。
性能影响
洗牌是一项昂贵的操作,因为它涉及磁盘I / O、数据序列化和网络I / O。为了组织洗牌的数据,Spark会生成一组任务-映射任务来组织数据,以及一组精简任务来聚合数据。这个命名来自MapReduce,并不直接与Spark的地图和reduce操作相关。
在内部,单个地图任务的结果会一直保存在内存中,直到它们不适合。然后,根据目标分区对这些文件进行排序,并写入单个文件。在reduce方面,任务读取相关的排序块。
某些随机操作会消耗大量堆内存,因为它们在传输记录之前或之后使用内存中数据结构来组织记录。具体来说,reduce by key和aggregate by key在地图端创建这些结构,而“by key”操作在reduce端生成这些结构。当数据不适合内存时,Spark会将这些表溢出到磁盘,从而导致磁盘I / O额外开销和垃圾收集增加。
随机播放还会在磁盘上生成大量中间文件。从Spark 1.3开始,这些文件将被保存,直到相应的RDDs不再使用并且被垃圾收集。这样,如果重新计算谱系,就不需要重新创建随机文件。如果应用程序保留对这些RDDs的引用,或者如果GC不经常介入,垃圾收集可能会在很长一段时间后发生。这意味着长时间运行的Spark作业可能会消耗大量磁盘空间。配置Spark上下文时,临时存储目录由Spark . local . dir配置参数指定。
洗牌行为可以通过调整各种配置参数来调整。请参见火花配置指南中的“洗牌行为”部分。
2.RDD持久化
Spark最重要的功能之一是跨操作在内存中持久保存(或缓存)数据集。当您持久化RDD时,每个节点都会将它计算的任何分区存储在内存中,并在该数据集(或从该数据集派生的数据集)上的其他操作中重复使用这些分区。这使得未来的行动更快(通常超过10倍)。缓存是迭代算法和快速交互使用的关键工具。
您可以在RDD上使用persist ( )或cache ( )方法标记要持久化的RDD。第一次在动作中计算它时,它将保存在节点上的内存中。spark的缓存是容错的——如果RDD的任何分区丢失,它将使用最初创建它的转换自动重新计算。
此外,每个持久化的RDD可以使用不同的存储级别来存储,例如,允许您将数据集持久化到磁盘上,持久化到内存中,但作为序列化的Java对象(为了节省空间),在节点之间复制它。这些级别是通过传递StorageLevel对象( Scala、Java、Python )来持久化( )来设置的。cache ( )方法是使用默认存储级别(即storage level )的简写。仅限内存(将反序列化对象存储在内存中)。全套存储级别是:
MEMORY_ONLY //只在内存
MEMORY_AND_DISK
MEMORY_ONLY_SER //内存存储(串行化)
MEMORY_AND_DISK_SER
DISK_ONLY //硬盘
MEMORY_ONLY_2 //带有副本
MEMORY_AND_DISK_2 //快速容错。
OFF_HEAP
使用方式如下:
rdd2.persist(StorageLevel.DISK_ONLY)
println(rdd2.reduce(_ + _))
rdd2.unpersist();
println(rdd2.reduce(_ + _))
3.共享变量
通常,当传递给Spark操作(如map或reduce )的函数在远程集群节点上执行时,它会在函数中使用的所有变量的单独副本上工作。这些变量被复制到每台机器上,对远程机器上变量的更新不会传播回驱动程序。支持跨任务的一般读写共享变量将是低效的。然而,Spark确实为两种常见的使用模式提供了两种有限类型的共享变量:广播变量和累加器。
[广播变量]
广播变量允许程序员将只读变量缓存在每台机器上,而不是随任务一起发送一份副本。例如,它们可以用来以有效的方式给每个节点一个大输入数据集的副本。spark还试图使用有效的广播算法来分配广播变量,以降低通信成本。
Spark动作通过一组阶段来执行,这些阶段由分布式“洗牌-shuffle”操作分开。spark自动广播每个阶段任务所需的公共数据。以这种方式广播的数据以序列化形式缓存并在运行每个任务之前反序列化。这意味着显式创建广播变量仅在跨多个阶段的任务需要相同的数据或者以反序列化形式缓存数据很重要时有用。
广播变量是通过调用sparkcontext . broadcast ( v )从变量v创建的。广播变量是v的包装器,它的值可以通过调用value方法来访问。下面的代码显示了这一点:
//创建广播变量
val bc1 = sc.broadcast(Array(1,2,3))
bc1.value
[累加器]
累加器是仅通过关联和交换操作“添加”到的变量,因此可以有效地并行支持。它们可用于实现计数器(如MapReduce )或和。spark天生支持数字类型的累加器,程序员可以添加对新类型的支持。
作为用户,您可以创建命名或未命名的累加器。如下图所示,web UI中将显示一个命名累加器(在本例中为计数器),用于修改该累加器的阶段。spark在“任务”表中显示由任务修改的每个累加器的值。
使用方式:
val ac1 = sc.longaccumulator("ac1")
ac1.value
sc.parell..(1 to 10).map(_ * 2).map(e=>{ac1.add(1) ; e}).reduce(_+_)
ac1.value //10
本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!
喜欢 | 0
不喜欢 | 0
您输入的评论内容中包含违禁敏感词
我知道了

请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号