Introduction:
RDD (Resilient Distributed Dataset) is the most basic data abstraction in Spark. In code it is an abstract class; it represents an immutable, partitionable collection whose elements can be computed in parallel.
[toc]
RDD Overview
Java IO vs. RDD
Java IO
Java IO only composes the stream structure; actual reading does not start until readLine is called.
RDD in brief
An RDD is an abstract class, similar to InputStream in Java IO, with different concrete implementations underneath.
RDDs are suited to reading files in a distributed way, whereas Java IO only reads a specific local file.
Reading only truly starts at collect: an RDD only encapsulates the data-processing logic and does not process data by itself, whereas Java IO still holds a Buffer.
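As a rough analogy, here is a minimal Java IO sketch (written in Scala; the file name "in.txt" is made up): decorating the streams only builds the structure, and nothing is read until readLine is called.

```scala
import java.io.{BufferedReader, FileInputStream, InputStreamReader}

// Constructing the decorated streams only wires objects together; no bytes are read yet
val reader = new BufferedReader(new InputStreamReader(new FileInputStream("in.txt")))

// Reading actually happens here, one buffered line at a time
val line = reader.readLine()
println(line)
reader.close()
```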
RDD Introduction
RDD (Resilient Distributed Dataset) is the most basic data abstraction in Spark. In code it is an abstract class; it represents a collection that is immutable (to protect the data), partitionable (for parallel computation), and whose elements can be computed in parallel.
Properties:
A list storing the preferred location of each Partition. (Send the computation task to the node where the data lives: moving data is worse than moving computation.)
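For reference, the corresponding members of the RDD abstract class look roughly like this (abridged from Spark's source, type parameters and implicits simplified):

```scala
import org.apache.spark.{Dependency, Partition, Partitioner, TaskContext}

// Abridged sketch of org.apache.spark.rdd.RDD (simplified)
abstract class RDD[T] {
  // Compute the data of one partition
  def compute(split: Partition, context: TaskContext): Iterator[T]

  // The list of partitions
  protected def getPartitions: Array[Partition]

  // Dependencies on parent RDDs
  protected def getDependencies: Seq[Dependency[_]]

  // Optional partitioner for key-value RDDs
  val partitioner: Option[Partitioner] = None

  // Preferred locations of each partition ("move the computation to the data")
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
}
```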
Characteristics:
RDD Programming
Creating RDDs

```scala
package com.mxx.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object C01_RDD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    // Create an RDD from an in-memory collection
    val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    val listRDD2: RDD[Int] = sc.parallelize(List(1, 2, 3, 4))

    // Create an RDD from external storage (a path named "in")
    sc.textFile("in")

    listRDD.collect().foreach(println)
  }
}
```
RDD Partitioning
Partitioning exists for parallelism. How does Spark partition by default?
demo
```scala
// Default number of partitions
val listRDD1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val listRDD2: RDD[String] = sc.textFile("input")

// Explicitly request 3 partitions
val listRDD3: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 3)
val listRDD4: RDD[String] = sc.textFile("input", 3)

// Write out one file per partition to inspect the result
listRDD4.saveAsTextFile("output")
```
Source code walkthrough
```scala
// makeRDD / parallelize: default parallelism (total cores, at least 2)
override def defaultParallelism(): Int = {
  conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

// textFile: default minimum number of partitions
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
```
RDD Transformations
Overall, RDD transformations are divided into Value types and Key-Value types.
Value Types
map operator

```scala
val listRDD: RDD[Int] = sc.makeRDD(1 to 10)
// Apply the function to every element, one at a time
val mapRDD: RDD[Int] = listRDD.map(_ * 2)
```
mapPartitions operator

```scala
// Process one whole partition (an iterator) per call
val mapPartitionsRDD: RDD[Int] = listRDD.mapPartitions(datas => {
  datas.map(_ * 2)
})
mapPartitionsRDD.collect().foreach(println)
```
Differences between map and mapPartitions
Difference in principle:
Why do we need mapPartitions?
map: 10 records may be spread across 3 Executors, and each record counts as one computation, so that is 10 computations.
mapPartitions: datas.map(_ * 2) counts as a single computation per partition. A "computation" here means the work sent to an executor, so with two partitions only two tasks need to be sent to two Executors, which reduces network interaction and improves efficiency.
However, mapPartitions can only release memory a whole partition at a time, which may cause an out-of-memory (OOM) error.
mapPartitionsWithIndex

```scala
// Tag each element with the index of the partition it belongs to
val mapPartitionsWithIndexRDD: RDD[(Int, Int)] = listRDD.mapPartitionsWithIndex(
  (index, datas) => datas.map((index, _))
)
mapPartitionsWithIndexRDD.collect().foreach(println)
```
Diagram:
Relationship between Driver and Executor
The computation inside every operator is carried out by Executors.
The Driver and the Executors may not be on the same machine, so a variable i declared on the Driver and used inside an operator must be serialized and sent over the network to the Executors. Therefore i must be serializable (it might, for example, be a user object).
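A minimal sketch of that constraint (the User class, field names, and values are made up for illustration):

```scala
// Hypothetical class for illustration; case classes are serializable by default,
// which matters because instances created on the Driver travel inside the closure
case class User(name: String, age: Int)

val minAge = 18                                             // declared on the Driver
val users = sc.makeRDD(List(User("a", 17), User("b", 20)))

// The closure captures minAge, so it is serialized and shipped to every Executor
val adults = users.filter(_.age >= minAge)
adults.collect().foreach(println)
```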
flatMap

```scala
// One input element maps to zero or more output elements
val sourceFlat: RDD[Int] = sc.makeRDD(1 to 5)
val flatMapRDD: RDD[String] = sourceFlat.flatMap(x => List(x + "a", x + "b"))
flatMapRDD.collect().foreach(println)

// Flatten a collection of collections
val sourceFlat2: RDD[List[Int]] = sc.makeRDD(Array(List(1, 2), List(3, 4)))
val flatMapRDD2: RDD[Int] = sourceFlat2.flatMap(datas => datas)
flatMapRDD2.collect().foreach(println)
```
glom

```scala
// glom turns each partition into an Array
val rdd: RDD[Int] = sc.makeRDD(1 to 16, 4)
val arrayRDD: RDD[Array[Int]] = rdd.glom()
arrayRDD.collect().foreach(array => println(array.mkString(",")))
```
groupBy

```scala
// Group elements by the result of the function (here: remainder mod 2)
val rdd2: RDD[Int] = sc.parallelize(1 to 4)
val group: RDD[(Int, Iterable[Int])] = rdd2.groupBy(_ % 2)
```
filter

```scala
// Keep only the elements for which the predicate returns true
val sourceFilter2: RDD[String] = sc.parallelize(Array("xiaoming", "xiaojiang", "xiaohe", "dazhi"))
val filter: RDD[String] = sourceFilter2.filter(_.contains("xiao"))
```
sample
fraction is the sampling factor: with withReplacement = false it is the probability that each element is kept, and with withReplacement = true it is the expected number of times each element is selected.
```scala
val rdd3: RDD[Int] = sc.parallelize(1 to 10)
// sample(withReplacement, fraction, seed)
val sample: RDD[Int] = rdd3.sample(true, 0.4, 2)
val sample2: RDD[Int] = rdd3.sample(false, 0.2, 3)
```
distinct

```scala
val rdd4: RDD[Int] = sc.parallelize(List(1, 2, 1, 5, 2, 9, 6, 1))
// distinct triggers a shuffle
val distinctRDD: RDD[Int] = rdd4.distinct()
distinctRDD.saveAsTextFile("output")

// distinct with an explicit number of result partitions
val distinctRDD2: RDD[Int] = rdd4.distinct(3)
```
How it works:
To deduplicate, every partition must finish before any result can be produced, so downstream tasks have to wait; this waiting is why shuffles are slow.
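In fact, rdd4.distinct() is conceptually just a shuffle-based reduceByKey; a simplified equivalent, abridged from Spark's own implementation, is:

```scala
// Equivalent to rdd4.distinct(), simplified from Spark's implementation
val distinctLike: RDD[Int] = rdd4.map(x => (x, null)).reduceByKey((x, _) => x).map(_._1)
```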
coalesce

```scala
val rdd5: RDD[Int] = sc.parallelize(1 to 16, 4)
// Shrink from 4 to 3 partitions; no shuffle by default
val coalesceRDD: RDD[Int] = rdd5.coalesce(3)
println(coalesceRDD.partitions.size)
```
How are the partitions shrunk?
For example, start with 17 numbers meant to go into 3 partitions; because they cannot be divided evenly, picture them spread over 4 partitions:
(1,2,3,4,5) (6,7,8,9,10) (11,12,13,14,15) (16,17)
Shrinking strategy: coalesce can simply be understood as merging existing partitions, with no shuffle:
(1,2,3,4,5) (6,7,8,9,10) (11,12,13,14,15,16,17)
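A quick sketch to verify this behavior (the exact partition boundaries depend on how Spark splits the range, so they may differ from the hand-drawn example above):

```scala
val before: RDD[Int] = sc.parallelize(1 to 17, 4)
val after: RDD[Int] = before.coalesce(3)

// Print the contents of every partition before and after coalescing
before.glom().collect().foreach(a => println("before: " + a.mkString(",")))
after.glom().collect().foreach(a => println("after:  " + a.mkString(",")))
```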
repartition

```scala
val rdd = sc.parallelize(1 to 16, 4)
// repartition always performs a shuffle
val rerdd = rdd.repartition(2)
println(rerdd.partitions.size)
```
Differences between coalesce and repartition
coalesce repartitions and lets you choose whether to shuffle, controlled by the parameter shuffle: Boolean = false (the default) or true.
repartition actually calls coalesce, and it always shuffles.
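Abridged from Spark's RDD source (implicit ordering omitted), repartition is just coalesce with shuffle forced on:

```scala
def repartition(numPartitions: Int): RDD[T] = {
  coalesce(numPartitions, shuffle = true)
}
```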
sortBy

```scala
val rdd = sc.parallelize(List(2, 1, 3, 4))
// Sort by the element itself, ascending by default
rdd.sortBy(x => x).collect().foreach(x => print(x + " "))
```
pipe
Double-Value Types (operations between two RDDs)
union

```scala
scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[2] at union at <console>:28

scala> rdd3.collect()
res0: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)
```
subtract

```scala
scala> val rdd = sc.parallelize(3 to 8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> rdd.subtract(rdd1).collect()
res1: Array[Int] = Array(6, 8, 7)
```
intersection

```scala
scala> val rdd1 = sc.parallelize(1 to 7)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> val rdd3 = rdd1.intersection(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at intersection at <console>:28

scala> rdd3.collect()
res2: Array[Int] = Array(6, 7, 5)
```
cartesian

```scala
scala> val rdd1 = sc.parallelize(1 to 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(2 to 5)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24

scala> rdd1.cartesian(rdd2).collect()
res3: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (3,2), (3,3), (2,4), (2,5), (3,4), (3,5))
```
zip

```scala
scala> val rdd1 = sc.parallelize(Array(1, 2, 3), 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(Array("a", "b", "c"), 3)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[21] at parallelize at <console>:24

scala> rdd1.zip(rdd2).collect
res4: Array[(Int, String)] = Array((1,a), (2,b), (3,c))

scala> rdd2.zip(rdd1).collect
res5: Array[(String, Int)] = Array((a,1), (b,2), (c,3))
```
Key-Value Types
k-v operators
partitionBy

```scala
scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")), 4)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at partitionBy at <console>:26

scala> rdd2.partitions.size
res6: Int = 2
```
Custom partitioner
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// A custom partitioner that sends every key to partition 1
class MyPartitioner(partitions: Int) extends Partitioner {

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = 1
}

// Enclosing object name chosen for illustration
object C02_Partitioner {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    val rdd1: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "b"), (3, "c")), 4)
    val rdd2: RDD[(Int, String)] = rdd1.partitionBy(new MyPartitioner(2))
    rdd2.saveAsTextFile("output")
  }
}
```
groupByKey

```scala
scala> val words = Array("one", "two", "two", "three", "three", "three")
words: Array[String] = Array(one, two, two, three, three, three)

scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[27] at map at <console>:26

scala> val group = wordPairsRDD.groupByKey()
group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[28] at groupByKey at <console>:28

scala> group.collect()
res7: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))

scala> group.map(t => (t._1, t._2.sum)).collect()
res10: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
```
reduceByKey

```scala
scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[31] at parallelize at <console>:24

scala> val reduce = rdd.reduceByKey((x,y) => x+y)
reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[32] at reduceByKey at <console>:26

scala> reduce.collect()
res11: Array[(String, Int)] = Array((female,6), (male,7))
```
Differences between reduceByKey and groupByKey
reduceByKey: aggregates by key, with a combine (map-side pre-aggregation) step before the shuffle.
groupByKey: only groups by key and shuffles directly, with no pre-aggregation.
reduceByKey therefore performs better than groupByKey.
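A side-by-side sketch of the same aggregation written both ways (sample data made up); the reduceByKey version combines values inside each partition before the shuffle:

```scala
val pairs = sc.parallelize(List(("a", 1), ("b", 1), ("a", 1)))

// groupByKey: every (key, 1) record is shuffled, then summed afterwards
val viaGroup = pairs.groupByKey().map { case (k, vs) => (k, vs.sum) }

// reduceByKey: values are pre-aggregated per partition, so far less data is shuffled
val viaReduce = pairs.reduceByKey(_ + _)

viaGroup.collect().foreach(println)
viaReduce.collect().foreach(println)
```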
aggregateByKey, foldByKey, combineByKey, sortByKey, mapValues, join, cogroup: examples (a brief sketch of a few of these follows below).
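These operators are only named above; as a hedged sketch of a few of them (sample data made up):

```scala
val kv = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)

// aggregateByKey(zero)(seqOp, combOp): max within each partition, then sum across partitions
val agg = kv.aggregateByKey(0)(math.max(_, _), _ + _)

// mapValues: transform only the values; keys are left untouched
val scaled = kv.mapValues(_ * 10)

// join: inner join on the key
val other = sc.makeRDD(List(("a", "x"), ("b", "y")))
val joined = kv.join(other)

agg.collect().foreach(println)
scaled.collect().foreach(println)
joined.collect().foreach(println)
```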