avatar

目录
SparkCore

引言:

RDD(Resilient Distributed Dataset)叫做分布式数据集,是 Spark 中最基本的数据抽象。
代码中是一个抽象类, 它代表一个不可变、可分区、里面的元素可并行计算的集合。

参考:

[toc]

RDD概述

javaIO和RDD

java IO

image-20200213105119461

只是结构转换,真正readLine的时候,才开始去读


RDD简介

image-20200213110320014

RDD是抽象类,和IO的InputStream类似,底下有不同的实现类。

RDD适合分布式读文件,javaIO只适合读特定文件。

只有collect时候,才开始真正读取。RDD只封装了数据处理逻辑,并没有在处理数据。而javaIO里面还是有Buffer的。

RDD介绍

RDD(Resilient Distributed Dataset)叫做分布式数据集,是 Spark 中最基本的数据抽象。
代码中是一个抽象类, 它代表一个不可变(为了保护数据)、可分区(为了并行计算)、里面的元素可并行计算的集合。

属性:

一个列表,存储存取每个 Partition 的优先位置(preferred location)。 (将计算任务发给数据所在节点:移动数据不如移动计算)

特点:

  • 分区

  • 只读

    操作(算子,Operate):解决问题就是状态转换。

    所有RDD方法都叫算子,分两类:转换算子,行动算子

  • 依赖

  • 缓存

    血缘关系很长时,为了防止丢失,可以缓存起来。

RDD编程

RDD创建

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.mxx.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object C01_RDD {
def main(args: Array[String]): Unit = {
// 获得sc
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(sparkConf)

// 创建RDD
// 1) 从内存中创建 makeRDD, 底层实现就是parallelize
val listRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 2) 从内存中创建 parallelize
val listRDD2: RDD[Int] = sc.parallelize(List(1,2,3,4))
// 3) 从外部存储中创建
sc.textFile("in")

listRDD.collect().foreach(println)
}
}

RDD分区

分区是为了并行,默认是怎么分区的?

demo

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// parallelize默认分区
val listRDD1: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// listRDD1.saveAsTextFile("output") // 生成了16个分区文件

// textFile默认分区
val listRDD2: RDD[String] = sc.textFile("input")
// listRDD2.saveAsTextFile("output") // 生成了2个分区文件

// parallelize指定分区
val listRDD3: RDD[Int] = sc.makeRDD(List(1,2,3,4),3)
// listRDD3.saveAsTextFile("output") // 生成了16个分区文件

// textFile指定分区
// minPartitions,取的最小分区数,但有可能实际分区比指定的3大,因为依赖hadoop分区规则
// hadoopFile,切片规则依赖hdfs
val listRDD4: RDD[String] = sc.textFile("input",3)
listRDD4.saveAsTextFile("output") // 生成了2个分区文件

源码解析

scala
1
2
3
4
5
6
7
8
9
// parallelize分区策略:
// spark.default.parallelism配置
// 读取cpu核数,与2比较,选大的
override def defaultParallelism(): Int = {
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

// textFile分区策略:最小分区取2
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

RDD转换

RDD 整体上分为 Value 类型和 Key-Value 类型

Value 类型

map算子

scala
1
2
3
4
5
6
// map算子
// 1. 作用: 返回一个新的 RDD,该 RDD 由每一个输入元素经过 func 函数转换后组成
// 2. 需求: 创建一个 1-10 数组的 RDD,将所有元素*2 形成新的 RDD
val listRDD: RDD[Int] = sc.makeRDD(1 to 10)
val mapRDD: RDD[Int] = listRDD.map( _*2 )
// mapRDD.collect().foreach(println)

mapPartitions算子

scala
1
2
3
4
5
6
7
8
9
10
// mapPartitions
// 1. 作用: 类似于 map,但独立地在 RDD 的每一个分片(分区)上运行,而map是在每条数据上运行。
// Iterator[T] => Iterator[U]
// mapPartitions可理解为批处理,有两个分区就走两遍mapPartitions。
// 2. 需求: 创建一个 RDD,使每个元素*2 组成新的 RDD
val mapPartitionsRDD: RDD[Int] = listRDD.mapPartitions(datas => {
// 这个map是scala map,返回的是Iterator
datas.map(_ * 2)
})
mapPartitionsRDD.collect().foreach(println)

map & mapPartitions区别

原理区别:

image-20200213132148062

为什么需要mapPartitions?

  1. map:10条数据有可能发给3个Executor,算10个计算。
  2. mapPartitions:datas.map(_ * 2)算一个计算,spark的计算是指发给executor的计算,所以两个分区只需要发给两个Executor,减少网络交互,计算效率高。
  3. 但是mapPartitions以分区为单位释放内存,可能造成内存溢出(oom)

mapPartitionsWithIndex

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 1. 作用:类似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值
// (Int, Interator[T]) => Iterator[U]
// 2. 需求: 创建一个 RDD,使每个元素跟所在分区形成一个元组组成一个新的 RDD
// mapPartitionsWithIndex
val mapPartitionsWithIndexRDD: RDD[(Int, Int)] = listRDD.mapPartitionsWithIndex( (index,datas)=>datas.map( (index,_) ) )
mapPartitionsWithIndexRDD.collect().foreach(println)
//(0,1)
//(0,2)
//(0,3)
//(1,4)
//(1,5)
//(1,6)
//(2,7)
//(2,8)
//(2,9)
//(2,10)

原理图:

image-20200213133716624

Driver & Executor的关系

image-20200213134903396
  1. 所有算子的计算功能都是Excutor做
  2. Driver和Excutor可能不是一台机器,所以在Driver申明的变量i,在算子操作时需要序列化,然后网络io传到Executor去。因此要保证 i 可以序列化(他可能时user)。

flatMap

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 1. 作用:类似于 map,但是每一个输入元素可以被映射为 0 或多个输出元素。扁平化。
// 2. 需求:创建一个元素为 1-5 的 RDD,运用 flatMap 创建一个新的 RDD,新的 RDD 为原RDD 的每个元素的 2 倍(2, 4, 6, 8, 10)

// flatMap demo1
val sourceFlat: RDD[Int] = sc.makeRDD(1 to 5)
val flatMapRDD: RDD[String] = sourceFlat.flatMap( x => List(x+"a",x+"b") )
flatMapRDD.collect().foreach(println)
//1a
//1b
//2a
//2b
//3a
//3b
//4a
//4b
//5a
//5b

// flatMap demo2 展开
val sourceFlat2: RDD[List[Int]] = sc.makeRDD(Array(List(1,2), List(3,4)))
val flatMapRDD2: RDD[Int] = sourceFlat2.flatMap( datas => datas)
flatMapRDD2.collect().foreach(println)
//1
//2
//3
//4

glom

scala
1
2
3
4
5
6
7
8
9
10
// 1. 作用: 将每一个分区形成一个数组,形成新的 RDD 类型 RDD[Array[T]]。可用于以分区为单位的数据统计。
// 2. 需求:创建一个 4 个分区的 RDD,并将每个分区的数据放到一个数组
// glom
val rdd: RDD[Int] = sc.makeRDD(1 to 16,4)
val arrayRDD: RDD[Array[Int]] = rdd.glom()
arrayRDD.collect().foreach( array => println(array.mkString(",")) )
//1,2,3,4
//5,6,7,8
//9,10,11,12
//13,14,15,16

groupBy

scala
1
2
3
4
5
6
7
8
// groupBy
//1. 作用: 分组,按照传入函数的返回值进行分组。 将相同的 key 对应的值放入一个迭代器。
//2. 需求:创建一个 RDD,按照元素模以 2 的值进行分组。
val rdd2: RDD[Int] = sc.parallelize(1 to 4)
val group: RDD[(Int, Iterable[Int])] = rdd2.groupBy(_%2)
// group.collect().foreach(println)
//(0,CompactBuffer(2, 4))
//(1,CompactBuffer(1, 3))

filter

scala
1
2
3
4
5
6
7
8
9
10
// filter
//1. 作用: 过滤。 返回一个新的 RDD,该 RDD 由经过 func 函数计算后返回值为 true 的输入
//元素组成。
//2. 需求:创建一个 RDD(由字符串组成),过滤出一个新 RDD(包含”xiao”子串)
val sourceFilter2: RDD[String] = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
val filter: RDD[String] = sourceFilter2.filter(_.contains("xiao"))
// filter.collect().foreach(println)
//xiaoming
//xiaojiang
//xiaohe

sample

fraction相当于抽样算法里面的因子?

scala
1
2
3
4
5
6
7
8
9
10
// sample(withReplacement, fraction, seed)
//1. 作用: 以指定的随机种子随机抽样出数量为 fraction 的数据, withReplacement 表示是抽出的数据是否放回, true 为有放回的抽样, false 为无放回的抽样, seed 用于指定随机数生成器种子。
// 应用:从一亿条数据中抽样来进行数据分析。
//2. 需求:创建一个 RDD(1-10),从中选择放回和不放回抽样
val rdd3: RDD[Int] = sc.parallelize(1 to 10)
// 有放回:
val sample: RDD[Int] = rdd3.sample(true,0.4,2)
// sample.collect().foreach(x=>print(x+" ")) // 3 4 4 5 6 8
val sample2: RDD[Int] = rdd.sample(false,0.2,3)
// sample2.collect().foreach(x=>print(x+" ")) // 1 8 14

distinct

scala
1
2
3
4
5
6
7
8
9
10
11
// distinct([numTasks]))
//1. 作用: 对源 RDD 进行去重后返回一个新的 RDD。 默认情况下,只有 8 个并行任务来操作,但是可以传入一个可选的 numTasks 参数改变它。
// numTasks的意义:去重后数据量变小,不用原来那么多分区了,因此可以控制分区变小一点。
//2. 需求:创建一个 RDD, 使用 distinct()对其去重。
val rdd4: RDD[Int] = sc.parallelize(List(1,2,1,5,2,9,6,1))
val distinctRDD: RDD[Int] = rdd4.distinct()
// distinctRDD.collect().foreach(x=>print(x+" ")) // 1 2 5 6 9
distinctRDD.saveAsTextFile("output")
// 指定分区
val distinctRDD2: RDD[Int] = rdd4.distinct(3)
// distinctRDD2.saveAsTextFile("output")

原理:

image-20200213160152966

要去重,必须要所有分区做完,才有结果,所以需要等待。所以shuffle慢。

coalesce

scala
1
2
3
4
5
6
// coalesce(numPartitions) 案例
//1. 作用: 缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
//2. 需求:创建一个 4 个分区的 RDD,对其缩减分区
val rdd5: RDD[Int] = sc.parallelize(1 to 16,4)
val coalesceRDD: RDD[Int] = rdd5.coalesce(3)
println(coalesceRDD.partitions.size) //3

如何缩减的?

比如最开始17个数,要分3个区,实际会被分4个区,因为16/3除不尽,变成4个分区:

(1,2,3,4,5 ) (6,7,8,9,10 ) (11,12,13,14,15) (16,17)

缩减策略:可以简单理解为合并分区,没有shuffle过程:

(1,2,3,4,5 ) (6,7,8,9,10 ) (11,12,13,14,15,16,17)

repartition

scala
1
2
3
4
5
6
7
8
//repartition(numPartitions)
//1. 作用: 根据分区数, 重新通过网络随机洗牌所有数据。
//2. 需求:创建一个 4 个分区的 RDD,对其重新分区
val rdd = sc.parallelize(1 to 16,4)
val rerdd = rdd.repartition(2)
println(rerdd.partitions.size) // 2
// repartition底层:默认shuffle的
// coalesce(numPartitions, shuffle = true)

coalesce & repartition 区别

  1. coalesce 重新分区,可以选择是否进行 shuffle 过程。由参数 shuffle: Boolean = false/true 决
    定。
  2. repartition 实际上是调用的 coalesce,默认是进行 shuffle 的。

sortBy

scala
1
2
3
4
5
//sortBy(func,[ascending], [numTasks])
//1. 作用;使用 func 先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。
//2. 需求:创建一个 RDD,按照不同的规则进行排序
val rdd = sc.parallelize(List(2,1,3,4))
rdd.sortBy(x => x).collect().foreach(x=>print(x+" ")) // 1 2 3 4

pipe

双 Value 类型交互

union

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
scala> // 作用: 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD

scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[2] at union at <console>:28

scala> rdd3.collect()
res0: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)

subtract

scala
1
2
3
4
5
6
7
8
9
10
scala> // 作用: 计算差的一种函数,去除两个 RDD 中相同的元素,不同的 RDD 将保留下来

scala> val rdd = sc.parallelize(3 to 8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> rdd.subtract(rdd1).collect()
res1: Array[Int] = Array(6, 8, 7)

intersection

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
scala> // 作用: 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD

scala> val rdd1 = sc.parallelize(1 to 7)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> val rdd3 = rdd1.intersection(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at intersection at <console>:28

scala> rdd3.collect()
res2: Array[Int] = Array(6, 7, 5)

cartesian

scala
1
2
3
4
5
6
7
8
9
10
scala> // 作用: 笛卡尔积(尽量避免使用)

scala> val rdd1 = sc.parallelize(1 to 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(2 to 5)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24

scala> rdd1.cartesian(rdd2).collect()
res3: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (3,2), (3,3), (2,4), (2,5), (3,4), (3,5))

zip

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
scala> // 作用: 将两个 RDD 组合成 Key/Value 形式的 RDD,这里默认两个 RDD 的 partition 数量以及元素数量都相同,否则会抛出异常。 

scala> val rdd1 = sc.parallelize(Array(1,2,3),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(Array("a","b","c"),3)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[21] at parallelize at <console>:24

scala> rdd1.zip(rdd2).collect
res4: Array[(Int, String)] = Array((1,a), (2,b), (3,c))

scala> rdd2.zip(rdd1).collect
res5: Array[(String, Int)] = Array((a,1), (b,2), (c,3))

Key-Value 类型

k-v算子

partitionBy

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
scala> // 作用:对 pairRDD 进行分区操作,如果原有的 partionRDD 和现有的 partionRDD 是一leRDD,即会产生 shuffle 过程。. Shuff 

scala> // 相当于自定义分区策略来进行分区。

scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> // HashPartitioner, 一个自带的分区器

scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at partitionBy at <console>:26

scala> rdd2.partitions.size
res6: Int = 2

自定义分区器

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class MyPartitioner(partitions: Int) extends Partitioner {
override def numPartitions: Int = {
partitions
}

override def getPartition(key: Any): Int = {
1
}
}

def main(args: Array[String]): Unit = {
// 获得sc
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(sparkConf)


val rdd1: RDD[(Int, String)] = sc.makeRDD(List((1,"a"),(2,"b"),(3,"c")),4)
val rdd2: RDD[(Int, String)] = rdd1.partitionBy(new MyPartitioner(2))
rdd2.saveAsTextFile("output") // 数据都在1号分区

}

groupByKey

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scala> // 需求:创建一个 pairRDD,将相同 key 对应值聚合到一个 sequence 中,并计算相同 key对应值的相加结果。

scala> val words = Array("one", "two", "two", "three", "three", "three")
words: Array[String] = Array(one, two, two, three, three, three)

scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[27] at map at <console>:26

scala> val group = wordPairsRDD.groupByKey()
group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[28] at groupByKey at <console>:28

scala> group.collect()
res7: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))

scala> group.map(t => (t._1, t._2.sum)).collect()
res10: Array[(String, Int)] = Array((two,2), (one,1), (three,3))

reduceByKey

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
scala> // 使用指定的 reduce 函数,将相同key 的值聚合到一起(value1,value2), reduce 任务的个数可以通过第二个可选的参数来设置。

scala> // 需求:创建一个 pairRDD,计算相同 key 对应值的相加结果

scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[31] at parallelize at <console>:24

scala> // 使相同key的value(x和y)两两相加

scala> val reduce = rdd.reduceByKey((x,y) => x+y)
reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[32] at reduceByKey at <console>:26

scala> reduce.collect()
res11: Array[(String, Int)] = Array((female,6), (male,7))

reduceByKey & groupByKey 区别

image-20200220220201840
  1. reduceByKey:按照 key 进行聚合,在 shuffle 之前有 combine(预聚合)操作
  2. groupByKey:按照 key 进行分组,直接进行 shuffle。
  3. reduceByKey比groupByKey性能好。

aggregateByKey

foldByKey

combineByKey

sortByKey

mapValues

join

cogroup

案例

文章作者: Machine
文章链接: https://machine4869.gitee.io/2020/02/13/20200213103244757/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 哑舍
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论