Introduction:
Spark is a fast, general-purpose, and scalable big-data analytics engine based on in-memory computation.
References:
[toc]
Overview
Background
Drawbacks of MapReduce in Hadoop 1.x
- Designed mainly for one-pass batch computation; poorly suited to the iterative computation used in machine learning and data mining.
- Intermediate results go through file-based storage (disk), which is very slow.
- MapReduce is tightly coupled with Hadoop and cannot be swapped out dynamically.
Hadoop 2.x
- Introduced YARN for resource scheduling.
- Decoupled resource management (ResourceManager) from job scheduling (the per-application Driver / ApplicationMaster), making the compute framework pluggable.
Spark
- Computation: data between iterations stays in memory, and Spark is written in Scala, so it is well suited to iterative computation.
- Storage: still relies on Hadoop (HDFS).
- Using the decoupling introduced by YARN in Hadoop 2.x, the stack becomes HDFS + YARN + Spark.
Modules
Spark Run Modes
Key Roles
Driver: manages the job (builds the RDD lineage and schedules tasks).
Executor: performs the actual computation on the worker nodes.
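A minimal sketch of the split, assuming an existing SparkContext named sc (as in the spark-shell examples later in this post): the driver-side code builds RDDs and schedules tasks, while the functions passed to transformations are executed on the Executors.

```scala
// Runs on the Driver: defines the RDD lineage and schedules tasks
val data = sc.parallelize(1 to 100)

// The function passed to map() is serialized and executed on the Executors
val squared = data.map(x => x * x)

// collect() pulls the results from the Executors back to the Driver
println(squared.collect().mkString(", "))
```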
Local Mode
```bash
# unpack the Spark distribution and rename it (the mv assumes the current directory is /opt/module)
tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
mv spark-2.1.1-bin-hadoop2.7 spark

# run the bundled SparkPi example in local mode
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100

# start an interactive Spark shell
bin/spark-shell
```
shell-WordCount
Read > flatten > group > aggregate
```scala
scala> sc.textFile("input")
res0: org.apache.spark.rdd.RDD[String] = input MapPartitionsRDD[1] at textFile at <console>:25

scala> sc.textFile("input").flatMap(_.split(" "))
res2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at flatMap at <console>:25

scala> sc.textFile("input").flatMap(_.split(" ")).map((_,1))
org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at map at <console>:25

scala> sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
res4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[13] at reduceByKey at <console>:25

scala> sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res5: Array[(String, Int)] = Array((scala,1), (world,1), (hello,3), (spark,1))
```
Program Walkthrough
textFile reads the input files into an RDD of lines; flatMap splits each line into words; map turns each word into a (word, 1) pair; reduceByKey sums the counts for each word (this is the shuffle step); collect brings the final result back to the driver.
idea-WordCount
pom
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.mxx</groupId>
    <artifactId>demo-spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>WordCount</finalName>
        <plugins>
            <!-- compile the Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- build a fat jar that bundles the dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>WordCount</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
WordCount
```scala
package com.mxx.spark

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // local[*]: run locally using all available cores
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    // read the input files, one RDD element per line
    val lines = sc.textFile("file:///opt/module/spark/input")

    // split each line into words
    val words = lines.flatMap(_.split(" "))

    // pair each word with a count of 1
    val wordToOne = words.map((_, 1))

    // sum the counts per word
    val wordToSum = wordToOne.reduceByKey(_ + _)

    // bring the results back to the driver and print them
    val result = wordToSum.collect()
    result.foreach(println)

    sc.stop()
  }
}
```
YARN Mode
Overview
There are two submission modes, yarn-client and yarn-cluster; the main difference is where the Driver program runs.
In yarn-client mode the Driver runs on the machine that submits the job, while in yarn-cluster mode it runs inside the ApplicationMaster on the cluster.
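As a sketch (reusing the bundled SparkPi example jar from the Local-mode section), the mode is selected with the standard --deploy-mode option of spark-submit:

```bash
# yarn-client: the Driver runs on the submitting machine
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100

# yarn-cluster: the Driver runs inside the ApplicationMaster on the cluster
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
```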
Installation and Usage
Install Hadoop 2.7
Configure pseudo-distributed Hadoop
HDFS + YARN + JobHistory server + log aggregation
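The detailed Hadoop configuration is out of scope here; as a rough sketch, once it is in place the daemons are started with the standard Hadoop 2.7 scripts (paths assume the current directory is the Hadoop installation):

```bash
# start HDFS (NameNode, DataNode, SecondaryNameNode)
sbin/start-dfs.sh
# start YARN (ResourceManager, NodeManager)
sbin/start-yarn.sh
# start the MapReduce JobHistory server
sbin/mr-jobhistory-daemon.sh start historyserver
```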
Configure Spark
yarn-site.xml
```xml
<!-- disable the physical-memory check so containers are not killed for exceeding physical memory -->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<!-- disable the virtual-memory check so containers are not killed for exceeding virtual memory -->
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>
```
spark-env.sh
```bash
YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
```
Configure Logging (History Server)
Reference: Spark History Server configuration and usage
spark-defaults.conf
```properties
spark.eventLog.enabled            true
spark.eventLog.dir                hdfs://mxxcentos6:9000/spark_event_data
spark.eventLog.compress           true
spark.yarn.historyServer.address  mxxcentos6:18080
```
spark-env.sh
```bash
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://mxxcentos6:9000/spark_event_data"
```
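With the settings above, the event-log directory has to exist on HDFS before jobs can log to it, and the history server is started with the script shipped in Spark's sbin directory; a minimal sketch (the directory name is taken from spark.eventLog.dir above):

```bash
# create the event-log directory on HDFS (must match spark.eventLog.dir)
hadoop fs -mkdir -p /spark_event_data
# start the Spark history server
sbin/start-history-server.sh
```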
Access: http://mxxcentos6:18080
Spark-on-YARN Deployment Flow
YARN scheduling flow
The Spark program runs inside YARN containers.
Spark-on-YARN deployment flow
Package the WordCount program and run it on the cluster
```bash
bin/spark-submit \
--class com.mxx.spark.WordCount \
/opt/module/WordCount-jar-with-dependencies.jar
```
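Note that, as written, the job still uses the local[*] master hard-coded in the example code (SparkConf settings take precedence over spark-submit flags). To actually run on YARN the master has to be supplied at submit time with setMaster removed from the code, and the file:// input path has to be reachable from the cluster nodes. A hedged sketch:

```bash
bin/spark-submit \
--class com.mxx.spark.WordCount \
--master yarn \
--deploy-mode cluster \
/opt/module/WordCount-jar-with-dependencies.jar
```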
Standalone Mode
Does not require Hadoop: build a Spark cluster made up of a Master and Slaves (Workers), and run Spark jobs on that cluster.
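A short illustration of submitting to a standalone cluster (the master URL is an assumption: mxxcentos6 reuses the hostname from the YARN section and 7077 is Spark's default master port):

```bash
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://mxxcentos6:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
```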
Installation and Usage
…
Mesos Mode
…
模式对比
| 模式 |
Spark 安装机器数 |
需启动的进程 |
所属者 |
| Local |
1 |
无 |
Spark |
| Standalone |
3 |
Master 及 Worker |
Spark |
| Yarn |
1 |
Yarn 及 HDFS |
Hadoop |