avatar

目录
Hadoop-基础篇

参考:

简介

大数据时代已经到来,越来越多的行业面临着大量数据需要存储以及分析的挑战。Hadoop,作为一个开源的分布式并行处理平台,以其高扩展、高效率、高可靠等优点,得到越来越广泛的应用。本课旨在培养学员理解Hadoop的架构设计以及掌握Hadoop的运用能力。

第1章 初识Hadoop

1-1 Hadoop大数据平台架构与实践

学习目标:
掌握大数据存储与处理技术的原理(理论)
掌握Hadoop的使用和开发能力
学习建议:
《Hadoop技术详解》《Hadoop权威指南》
Linux命令、Java编程基础

1.大数据的相关概念
2.Hadoop的架构和运行机制
3.实战:Hadoop的安装和配置
4.实战:Hadoop开发

1-2 Hadoop的前世今生

系统瓶颈:存储容量 读写速率 运行效率
google 提出三大关键技术mapreduce Bigtable GFS
革命性变化:
1、降低成本,普通PC集群;
2、硬件故障是常态,利用软件保证高可靠性;
3、简化并行计算,无须同步和数据交换

hadoop:是模拟谷歌的分布式的开源实现,其作用是降低成本,可容错,高效计算
容错性:硬件故障是常态,通过软件来保证可靠性

1-3 Hadoop的功能与优势

Hadoop=分布式存储+分布式计算
包括两个核心组成:
HDFS:分布式文件系统,存储海量的数据
MapReduce:并行处理框架,实现任务分解和调度。

优势:高扩展、低成本
Hadoop可以用来搭建大型数据仓库,PB级数据的存储、处理、分析、统计等业务

1-4 Hadoop生态系统及版本

Hadoop不仅包括HDFS和MapReduce,还包括一次额开源工具。比如:
HIVE,本意是小蜜蜂,轻盈。
HBASE,存储结构化数据的分布式数据库。区别于HDFS的一点是,HABASE提供数据的随机读写和实时访问,实现对表数据的读写功能。
zookeeper,动物管理员。 监控Hadoop集群里面每个节点的状态,管理整个集群的配置,维护节点之家数据的(一次?依次)性

hive使sql转成一个hadoop任务去执行,降低hadoop的门槛。
hive(sql语句转换工具) habse(结构型数据,随机写入和实时读取) zookeeper(监控个节点使用、配置)。
habase存储结构化数据的分布式数据库,放弃事务特性,追求更高的扩展。habase提供数据的随机读写和实时访问,实现对表数据的读写功能。
zookeeper监控hadoop每个节点的状态,管理集群配置,维护节点间数据的一致性。
zookeeper的作用:
1)监控hadoop每个节点的状态
2)管理整个集群的配置
3)维护节点间数据的一致性

hadoop生态系统:
1.hdfs
2.mapreduce
3.相关开源工具:
(1)hive:将简单的sql语句转化为hadoop任务,降低使用hadoop的门槛
(2)HBASE:区别于传统数据库:存储结构化数据的分布式数据库,放弃事务特性,追求更高的扩展,它提供数据的随机读写和实时访问,实现对表数据的读写功能
(3)zookeeper:监控Hadoop集群里的每个节点的状态,管理整个集群的配置,维护数据节点之间的一致性
Hadoop版本最高2.6,初学者建议1.2(ver1.2-稳定)

第2章 Hadoop安装

2-4 安装小结

修改4个配置文件
(a) 修改hadoop-env.sh,设置JAVA_HOME
(b) 修改core-site.xml,设置hadoop.tmp.dir, dfs.name.dir, fs.default.name
(c) 修改mapred-site.xml,设置mapred.job.tracker
(d) 修改hdfs-site.xml,设置dfs.data.dir

hadoop安装步骤:
1、安装JDK:apt-get install openjdk-7-jdk;
2、设置环境变量:JAVA_HOME、JRE_HOME、CLASSPATH、PATH(在/etc/profile)
3、下载hadoop安装包并解压到指定目录下;
4、设置环境变量:HADOOP_HOME、PATH(在/etc/profile)
5、修改相关配置文件$HADOOP_HOME/conf:
​ 1)修改hadoop-env.sh,设置JAVA_HOME;
​ 2)修改core-site.xml,设置hadoop.tmp.dir、dfs.name.dir、fs.default.name;
​ 3)修改mapred-site.xml,设置mapred.job.tracker;
​ 4)修改hdfs-site.xml,设置dfs.data.dir;
6、格式化:hadoop namenode -format;
7、启动:start-all.sh
8、检查:jps

第3章 Hadoop的核心-HDFS简介

3-1 HDFS基本概念

HDFS的文件被分成快进行存储
HDFS块的默认大小64MB
块是文件村春处理的逻辑单元

nameNode 是管理节点,用来读取元数据的
DataNode是数据节点,用来存储数据的

HDFS——文件系统
MapReduce——并行计算框架

namenode是管理节点,存放文件元数据,元数据包括两部分:

  1. 文件与数据块的映射表
  2. 数据块与数据节点的映射表

5a3b288900016b4b12800720

3-2 数据管理策略

hdf数据管理策略:

  1. hdfs是采用master-slave的模式关管理文件,即一个master(namenade:保存datanode的一些基本信息和元数据)和多个slave(datanode:真正的存贮单元,里面存储了真实数据)
  2. hdfs默认保存三份文件,有两份保存在同一台机器上,另外一份(备份文件)保存到另外一台机器上,确保当一台机器挂了时能保存数据的存在
  3. namenade也有一个备用节点:Secondary NameNode,当namenode挂了时secondaryNameNode就变为nameNode的角色进行管理数据
  4. datandoe会采用心跳的方式时不时的想namenode报告自己的基本信息,比如网络是否正常,运行是否正确常。

3-3 HDFS中文件的读写操作

12617-20170308123104828-132826254

客户端发出读请求,namenode根据元数据返回给客户端,下载需要的block并组装

HDFS读取文件的流程:

(1)客户端向namenode发起读文件请求,把文件名,路径告诉namenode;

(2)namenode查询元数据,并把数据库返回客户端;

(3)此时客户端就明白文件包含哪些块,这些块在哪些datanode中可以找到;

12617-20170308123105109-1542248328

HDFS写数据:首先将文件拆分为默认大小64M的块。通知NameNode,找到并返回可用的datanode信息,客户端写入一个后,其他的进行流水线复制。最后更新元数据。

HDFS写文件流程:

(1)客户端把文件拆分成固定大小64M的块,并通知namenode;

(2)namenode找到可用的datanode返回给客户端;

(3)客户端根据返回的datanode,对块进行写入

(4)通过流水线管道流水线复制

(5)更新元数据。告诉namenode已经完成了创建心的数据块。保证了namenode中的元数据都是最新的状态。

3-4 HDFS特点

1)数据冗余,硬件容错;

2)流式的数据访问,写一次,读多次。数据没办法修改,如果要修改只能把之前的数据删除,然后重新写入一份;

3)适合存储大文件,这是设计之初就这么考虑的,如果是大量的小文件的话,不适合,因为一个小文件也要存储元数据,此时NameNode的压力会非常大。

4)适合数据的批量读写,吞吐量高,不适合做交互式应用,低延迟很难满足;

5)支持顺序读写,不支持此多用户并发写相同文件;

3-5 HDFS使用

hadoop的几个常用命令来演示如何在hadoop中创建一个目录,然后上传一个文件,然后再下载一个文件。

大致都是:

hadoop fs -put filea.dat input/

或者是get命令下载文件

或者是cat命令查看文件内容

或者是hdfsadmin命令来查看整个系统的一些统计信息。

第4章 Hadoop的核心-MapReduce

4-1 MapReduce的原理

所谓Map就是要将任务拆分成很多份,这里给了一个例子,有1000副牌,少了一张,然后要找出到底是少了哪一章。

12617-20170308123105359-963721942

然后又给了一个从日志中统计出访问此处最多的ip的例子,也都是类似的,先把日志分块,然后由不同的任务分别统计,然后再把他们的结果拿来合并,也就是Reduce了

4-2 MapReduce的运行流程

几个基本的概念:

  • Job&Task,比如上面的找出访问次数最多的任务就是一个Job,然后这个Job要完成的话要被分解为多个Task,放到不同的节点上去执行。Task又可以分为MapTask和ReduceTask

  • JobTracker,作业调度,分配任务并监控任务的执行进度,任务分配出去之后,TaskTracker每隔几秒钟要向JobTracker更新状态

  • TaskTracker,作业的执行

这几个概念互相之间的关系如下图:

12617-20170308123105625-357060316

一般来说,TaskTracker就是分配在其数据所在的DataNode上,这样可以保证运行的效率最高,毕竟读取本机的磁盘总是更快的。这也是MapReduce的一个设计思想,用移动计算来避免移动数据。

12617-20170308123105984-801660379

MapReduce的容错机制,

1)重复执行;也就是执行的过程中如果出错了,他会重复执行,但是重复了4次之后如果还出错,他就放弃了。

2)推测执行,用于解决那种计算速度特别慢的情况,此时会新开一个任务,然后再看这两个任务谁先执行完。

第5章 开发Hadoop应用程序

5-1~5-3 WordCount单词计数

需求:计算文件中出现每个单词的频数,输入结果按照字母顺序进行排序

Code
1
2
3
4
byte		3
hello 3
hadoop 4
world 2

思路:

map:切分
对每个词统计记1次

![屏幕快照 2019-08-14 下午2.47.39](20190620122348137/屏幕快照 2019-08-14 下午2.47.39.png)

reduce:合并
相同的key放在同一个节点

![屏幕快照 2019-08-14 下午2.48.16](20190620122348137/屏幕快照 2019-08-14 下午2.48.16.png)

代码实现

xml
1
2
3
4
5
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class WordCount {
// 1 两个静态内部类
// 2 继承Mapper,定义了输入输出格式<key,value, key,value>
public static class WordCountMap extends
Mapper<LongWritable, Text, Text, IntWritable> {

// 3 表示单词出现次数,初始为1次
private final IntWritable one = new IntWritable(1);
private Text word = new Text();

// 4 map切分操作
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer token = new StringTokenizer(line);

// 5 分词,发现一个词就写入
while (token.hasMoreTokens()) {
word.set(token.nextToken());
context.write(word, one);
}
}
}

// 继承Reducer,定义了输入输出格式<key,value, key,value>
public static class WordCountReduce extends
Reducer<Text, IntWritable, Text, IntWritable> {

// reduce合并
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
// <hello, [1,1,1]> --> <hello, 3>
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

// 配置作业
Job job = Job.getInstance(conf);
job.setJarByClass(WordCount.class);
job.setJobName("wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(WordCountMap.class);
job.setReducerClass(WordCountReduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

// 文件输入和输出路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);
}
}

输入文件如file1:

Code
1
2
3
4
5
hello world
hello hadoop
hadoop file system
hadoop java api
hello java

部署到hadoop上执行:

Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
- 打包成 WordCount.jar
- 将file1 file2提交到fs文件系统里去
$ hadoop fs -mkdir input_WordCount
$ hadoop fs -put input/* input_WordCount/
$ hadoop fs -ls
$ hadoop fs -ls input_WordCount
$ hadoop fs -cat input_WordCount/file1

- 提交任务
$ hadoop jar WordCount.jar input_WordCount output_WordCount
19/08/14 08:51:17 INFO mapred.JobClient: map 0% reduce 0%
19/08/14 08:51:29 INFO mapred.JobClient: map 100% reduce 0%
19/08/14 08:51:36 INFO mapred.JobClient: map 100% reduce 33%
19/08/14 08:51:38 INFO mapred.JobClient: map 100% reduce 100%

- 查看结果
$ hadoop fs -ls output_WordCount
$ hadoop fs -cat output_WordCount/part-r-00000
api 1
file 3
free 2
hadoop 7
hello 3
home 1
java 2
new 2
school 1
system 1
world 2

5-4~5-5 利用MapReduce进行排序

数据排序

思路:将数据按照大小区间分片,放到reduce进行排序

![屏幕快照 2019-08-14 下午5.02.13](20190620122348137/屏幕快照 2019-08-14 下午5.02.13.png)

输入样例:

Code
1
2
3
4
5
6
7
2
32
654
32
15
756
65223

输出样例

Code
1
2
3
4
1 2
2 6
3 15
4 ...

代码:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
public class Sort {

// 输出是<IntWritable, IntWritable>格式
public static class Map extends
Mapper<Object, Text, IntWritable, IntWritable> {

private static IntWritable data = new IntWritable();

// 读出文本的每一行,转化为整数作为key值放入,value随便放一个1
@Override
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();

data.set(Integer.parseInt(line));

context.write(data, new IntWritable(1));

}

}


public static class Reduce extends
Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

private static IntWritable linenum = new IntWritable(1);

@Override
public void reduce(IntWritable key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {

for (IntWritable val : values) {

context.write(linenum, key);

// 行号+1,代表排序序号
linenum = new IntWritable(linenum.get() + 1);
}

}
}

// 相当于自己定义reduce过程的分区策略(划分区间),而不是使用默认的
// 在每个区间排好序,合并之后就得到了最终结果
public static class Partition extends Partitioner<IntWritable, IntWritable> {

// numPartitions是分区数
// 返回值是当前key属于哪个区间
@Override
public int getPartition(IntWritable key, IntWritable value,
int numPartitions) {
int MaxNumber = 65223;
int bound = MaxNumber / numPartitions + 1; // 每个区间有多少个数
int keynumber = key.get();

// 扫描每个区间
for (int i = 0; i < numPartitions; i++) {
// 如果当前key属于该区间,则返回分区号
if (keynumber < bound * i && keynumber >= bound * (i - 1))
return i - 1;
}
return 0;
}
}

/**
* @param args
*/
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();

String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage WordCount <int> <out>");
System.exit(2);
}

// 配置作业
Job job = Job.getInstance(conf, "Sort");
job.setJarByClass(Sort.class);
job.setMapperClass(Map.class);
job.setPartitionerClass(Partition.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);

// System.out.println("Sort...");
}

}

最后输出结果:

Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
1	2
2 6
3 15
4 22
5 26
6 32
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223

疑问:每个区间内部的排序是在哪里完成的?

5-6 课程总结

文章作者: Machine
文章链接: https://machine4869.gitee.io/2019/06/20/20190620122348137/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 哑舍
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论