【云计算】scala-zipWithIndex、zipWithUniqueId函数用法解析
小标 2018-12-24 来源 : 阅读 2393 评论 0

摘要:本文主要向大家介绍了【云计算】scala-zipWithIndex、zipWithUniqueId函数用法解析,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。

本文主要向大家介绍了【云计算】scala-zipWithIndex、zipWithUniqueId函数用法解析,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。


1. 是什么


顾名思义,zipWithIndex:通过主键打包,ZipWithUniqueId:通过唯一主键打包。二者的主要作用


1. def zipWithIndex(): RDD[(T, Long)]


该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。


2. def zipWithUniqueId(): RDD[(T, Long)]


该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下:

每个分区中第一个元素的唯一ID值为:该分区索引号,

每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数)

该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。


2. 怎么用


// 1. zipWithIndex
scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at :21
scala> rdd2.zipWithIndex().collect
res27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))

// 2. zipWithUniqueId
scala> var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at :21
//rdd1有两个分区,
scala> rdd1.zipWithUniqueId().collect
res32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
//总分区数为2
//第一个分区第一个元素ID为0,第二个分区第一个元素ID为1
//第一个分区第二个元素ID为0+2=2,第一个分区第三个元素ID为2+2=4
//第二个分区第二个元素ID为1+2=3,第二个分区第三个元素ID为3+2=5


<h3 id="3-深入源码">3. 深入源码

1.zipWithIndex


  /**
   * Zips this RDD with its element indices. The ordering is first based on the partition index
   * and then the ordering of items within each partition. So the first item in the first
   * partition gets index 0, and the last item in the last partition receives the largest index.
   *
   * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
   * This method needs to trigger a spark job when this RDD contains more than one partitions.
   *
   * @note Some RDDs, such as those returned by groupBy(), do not guarantee order of
   * elements in a partition. The index assigned to each element is therefore not guaranteed,
   * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
   * the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
   */
  def zipWithIndex(): RDD[(T, Long)] = withScope {
    new ZippedWithIndexRDD(this)
  }

正如文档所注释,The ordering is first based on the partition index,the last item in the last partition receives the largest index,ID号跟着分区走。方法new了ZippedWithIndexRDD对象,继续点击


/**
 * Represents an RDD zipped with its element indices. The ordering is first based on the partition
 * index and then the ordering of items within each partition. So the first item in the first
 * partition gets index 0, and the last item in the last partition receives the largest index.
 *
 * @param prev parent RDD
 * @tparam T parent RDD item type
 */
private[spark]
class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {

  /** The start index of each partition. */
  @transient private val startIndices: Array[Long] = {
    val n = prev.partitions.length
    if (n == 0) {
      Array.empty
    } else if (n == 1) {
      Array(0L)
    } else {
      prev.context.runJob(
        prev,
        Utils.getIteratorSize _,
        0 until n - 1 // do not need to count the last partition
      ).scanLeft(0L)(_ + _)
    }
  }

  override def getPartitions: Array[Partition] = {
    firstParent[T].partitions.map(x => new ZippedWithIndexRDDPartition(x, startIndices(x.index)))
  }

  override def getPreferredLocations(split: Partition): Seq[String] =
    firstParent[T].preferredLocations(split.asInstanceOf[ZippedWithIndexRDDPartition].prev)

  override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
    val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
    val parentIter = firstParent[T].iterator(split.prev, context)
    Utils.getIteratorZipWithIndex(parentIter, split.startIndex)
  }
}

哦,果然是根据分区去Index


2.zipWithUniqueId


  /**
   * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
   * 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
   * won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
   *
   * @note Some RDDs, such as those returned by groupBy(), do not guarantee order of
   * elements in a partition. The unique ID assigned to each element is therefore not guaranteed,
   * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
   * the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
   */
  def zipWithUniqueId(): RDD[(T, Long)] = withScope {
    val n = this.partitions.length.toLong
    this.mapPartitionsWithIndex { case (k, iter) =>
      Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) =>
        (item, i * n + k)
      }
    }
  }

源码doc注释中已经定义好 index的规则:will get ids k, n+k, 2*n+k, …, where n is the number of partitions.

更加值得注意一句话是:won’t trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]],不会触发spark job任务。再回去看下ZipWithIndex的源码:

prev.context.runJob, 哦,原来这个方法还启动了 spark job任务,我只想拍个序给个ID,非要起个任务?


4. 总结


2个方法都有对RDD中的元素进行ID标号的功能,但是有以下区别:


前者依赖分区,可能会造成ID相同的情况。而后者根据算法“k, n+k, 2*n+k”生成Long类型的ID,所以一定不会重复,这也是他被命名为UniqueId的原因吧 后者效率更高,因为前者会启动runJob的任务 2者的共性,在Doc上也有注释:Some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition


          

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved