avatar

目录
Spark-基础

引言:

Spark是一种基于内存的快速、通用、可扩展的大数据分析引擎。

参考:

[toc]

概述

历史渊源

1.X的mapreduce缺陷

  • 主要用于一次性计算,不适合机器学习数据挖掘的迭代计算。
  • 基于文件存储介质,非常慢。
  • MR和Hadoop紧密耦合,无法动态替换

2.x

  • 提出了Yarn资源调度
  • 将资源管理RM和任务调度Driver解耦,可插拔。

Spark

  • 计算:迭代计算之间的数据是基于内存的,基于Scale语言,非常适合迭代式计算。
  • 存储:还是基于Hadoop。
  • 利用2.X的Yarn解耦,变成HDFS+Yarn+Spark

模块

image-20200211173725703

Spark运行模式

重要角色

Driver: 管理

Executor: 计算

Local模式

sh
1
2
3
4
5
6
7
8
9
10
11
12
tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
mv spark-2.1.1-bin-hadoop2.7 spark
# 官方案例:迭代计算pi
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
# 启动spark-shell
bin/spark-shell
# ui http://spark101:4040/

shell-WordCount

读取> 扁平化> 分组> 聚合

sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 启动spark-shell
# 在spark根目录下创建input目录,写两个文件进去
# 路径:相对路径,spark的上下文对象sc能判断

# 读取:返回一行一行的数据
sc.textFile("input")
res0: org.apache.spark.rdd.RDD[String] = input MapPartitionsRDD[1] at textFile at <console>:25

# 扁平化:(hello world, hello scala) -> hello,world,hello,scala
sc.textFile("input").flatMap(_.split(" "))
res2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at flatMap at <console>:25

# 结构转换:map(x=>{(x,1)}) 简化 map((_,1))
sc.textFile("input").flatMap(_.split(" ")).map((_,1))
org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at map at <console>:25

# 分组聚合: key就是单词,聚合就是把value两两相加,两个_代表参数1和参数2 reduceByKey(x1,x2 => x1+x2)
sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
res4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[13] at reduceByKey at <console>:25

# 搜集,展示
sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res5: Array[(String, Int)] = Array((scala,1), (world,1), (hello,3), (spark,1))

程序解析

image-20200212101811444

idea-WordCount

pom

xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.mxx</groupId>
<artifactId>demo-spark</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>

<build>
<finalName>WordCount</finalName>
<plugins>
<!-- 自动增加scala支持 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打包插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<archive>
<manifest>
<mainClass>WordCount</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

WordCount

scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.mxx.spark

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
def main(args: Array[String]): Unit = {

// 获得sc
// local模式
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(sparkConf)
// println(sc)

// 读取:
// 没有配置yarn:默认从当前部署环境找相对目录
// 配置了yarn:会读取hdfs文件
// 如果本地查找:file:///opt/module/spark/input
val lines = sc.textFile("file:///opt/module/spark/input")

// 扁平化
val words = lines.flatMap(_.split(" "))

// 结构转换
val wordToOne = words.map( (_,1) )

// 分组聚合
val wordToSum = wordToOne.reduceByKey(_+_)

// 搜集到Driver打印
val result = wordToSum.collect()

// 控制台显示
result.foreach(println)
}
}

Yarn模式

概述

有 yarn-client 和 yarn-cluster
两种模式, 主要区别在于: Driver 程序的运行节点。


安装使用

  1. 安装hadoop2.7

  2. 配置伪分布式hadoop

    hdfs+yarn+JobHistory+日志聚集

  3. 配置spark

    yarn-site.xml

    xml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,
    则直接将其杀掉,默认是 true -->
    <property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
    </property>
    <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,
    则直接将其杀掉,默认是 true -->
    <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
    </property>

    spark-env.sh

    sh
    1
    YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop
  4. 配置日志

    参考:spark history server配置使用

    spark-defaults.conf

    Code
    1
    2
    3
    4
    spark.eventLog.enabled           true
    spark.eventLog.dir hdfs://mxxcentos6:9000/spark_event_data
    spark.eventLog.compress true
    spark.yarn.historyServer.address http://mxxcentos6:18080

    spark-env.sh

    sh
    1
    export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://mxxcentos6:9000/spark_event_data"

    访问:

    Code
    1
    http://spark101:18080/

yarn部署spark流程

yarn调度流程

image-20200212144204404

spark程序是在container里跑的

spark yarn部署流程

image-20200212153859818

wordcount程序打包到环境中运行

Code
1
2
3
bin/spark-submit \
--class com.mxx.spark.WordCount \
/opt/module/WordCount-jar-with-dependencies.jar

Standalone模式

不需要hadoop, 构建一个由 Master+Slave 构成的 Spark 集群, Spark 运行在集群中。

image-20200212161031490

安装使用

Mesos 模式

模式对比

模式 Spark 安装机器数 需启动的进程 所属者
Local 1 Spark
Standalone 3 Master 及 Worker Spark
Yarn 1 Yarn 及 HDFS Hadoop
文章作者: Machine
文章链接: https://machine4869.gitee.io/2020/02/11/20200211151842560/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 哑舍
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论