参考:https://www.bilibili.com/video/av65548392
[TOC]
Hbase数据模型 Hbase逻辑结构 逻辑上,Hbase看起来像关系型数据库,存在一个表中。
但Hbase底层是k-v形式的,非关系型的。(列蔟)
Hbase物理存储结构
一个Store是一个列蔟,不同的Store放在不同的文件夹里(类似Hive分区)。
memstore刷写一次,就形成一个文件StoreFile;
HBase数据结构 RowKey 与 nosql 数据库们一样,RowKey 是用来检索记录的主键。访问 HBASE table 中的行,只 有三种方式:
1.通过单个 RowKey 访问
2.通过 RowKey 的 range(正则)
3.全表扫描
在 HBASE 内部,RowKey 保存为字节数组。存储时,数据按照 RowKey 的字典序(byte order)排序存储。设计RowKey 时,要充分排序存储这个特性,将经常一起读取的行存储放到一起。(位置相关性)
Column Family 列族是表的 schema 的一部 分(而列不是),必须在使用表之前定义。
列名都以列族作为前缀。例如 courses:history,courses:math都属于 courses 这个列族。
Cell 由{rowkey, column Family:columu, version} 唯一确定的单元。cell 中的数据是没有类型的,全部是字节码形式存储。
Time Stamp 版本通过时间戳来索引。
赋值:时间戳可以由 HBASE(在数据写入时自动 )赋值,此时时间戳是精确到毫秒 的当前系统时间。时间戳 也可以由客户显式赋值。
唯一性:如果应用程序要避免数据版 本冲突,就必须自己生成具有唯一性的时间戳。每个 cell 中,不同版本的数据按照时间倒序排序,即最新的数据排在最前面。
版本回收:为了避免数据存在过多版本造成的的管理 (包括存贮和索引)负担,HBASE 提供 了两 种数据版本回收方式。一是保存数据的最后 n 个版本,二是保存最近一段 时间内的版本(比 如最近七天)。用户可以针对每个列族进行设置。
Name Space 命名空间的结构:
1) Table:表,所有的表都是命名空间的成员,即表必属于某个命名空间,如果没有指定, 则在 default 默认的命名空间中。
2) RegionServer group:一个命名空间包含了默认的 RegionServer Group。
3) Permission:权限,命名空间能够让我们来定义访问控制列表 ACL(Access Control List)。 例如,创建表,读取表,删除,更新等等操作。
4) Quota:限额,可以强制一个命名空间可包含的 region 的数量。
HBase原理 写流程 Hbase读比写慢。
table表位置信息存在meta表里,所以先找meta表,通过meta表找table表的位置信息。
然后开始写入:
1)Client 向 HregionServer 发送写请求;
HBase 写数据流程
2)HregionServer 将数据写到 HLog(write ahead log)。为了数据的持久化和恢复; 3)HregionServer 将数据写到内存(MemStore); 4)反馈 Client 写成功。
看一下如何从zk找到meta位置信息的
1 2 3 4 5 6 7 zkCli.sh ls /hbase get /hbase/meta-region-server # 看出meta表在hadoop104里。接下来就是请求104去读meta表。 > scan "hbase:meta" # 发现stu表由102维护的 # 然后客户端去连接102,真正开始写数据(wal-mem)
写数据流程:源码流程
…
MemStore Flush 文件刷写,从内存到hdfs
刷写时机,可配置,hbase-default.xml
1)当 MemStore 数据达到阈值(默认是 128M,老版本是 64M),将数据刷到硬盘,将内存 中的数据删除,同时删除 HLog 中的历史数据; 2)并将数据存储到 HDFS 中; 3)在 HLog 中做标记点。
读流程
一个理解误区:先读内存,内存有就直接返回,不读磁盘了。内存没有才读磁盘,然后写入BlockCache,再返回。
–这个理解是错的。
BlockCache:LRU,存储最近经常读的数据。
案例:在A时间点put/1001/info:name/张三,一个小时后,该数据就已经刷写到磁盘了。这时再put/1001/info:name/李四,但时间戳B是我自己传入的,且B<A。
那我查数据的时候,合理情况下应该查到张三(时间最近的一条数据),但基于这个错误读的理解,返回的肯定是B(还在内存里)。
那怎么才能正确返回A呢?
需要把内存和磁盘都读出来做比较。
写代码做实验:略
合理解释:不是先读内存,再读磁盘。而是BlockCache与MemStore一起读的。如果BlockCache有数据,就两个一起做wage合并,比较时间戳,返回时间戳大的那一个。如果BlockCache没数据,就去磁盘读,磁盘读到的数据,放入BlockCache里,再wage,返回时间戳大的。
base读比写慢。因为读也是要等磁盘时间的,而且步骤很多。
StoreFile Compaction 问题引入:如果到了一定时间段,memStore存储还没有满上限,这个时候,也会刷数据到磁盘,每刷一次就是个新文件。所以在写频率小的情况下,会出现很多小文件。因此要做定期合并节约空间。
小合并会把多种时间戳版本都保留,大合并开始删除(覆盖)时间戳小的。
hbase-default.xml。默认7天自动触发大合并,官方建议在生产环境关掉(设置为0),应该在空间时间触发,因为很耗时。
读写扩展 读写数据可以不要HMaster,但是创建表(元数据操作)必须要HMaster参与。但是客户端就算找master也是通过z k去找的,所以实际上客户端直接打交道的是zk不是Master,因为Master高可用可能会变地址。
所以写客户端代码时,完全不需要HMaster的地址,只需要一个zk地址。
但是没有HMaster工作也不能正常执行:假如region到达一定大小被切分了,切分出来也不会调度到其他节点了。因为切分就是新生成一个Region,需要Master去改元数据,还要master把元数据调度到其他节点。
一个机子,放一个region?
数据真正删除的时间 大合并的时候会删数据;
flush会删数据;
flush测试:
put 2个版本的数据库后,scan默认只能看到一条,要加VERSIONS查所有数据。
flush 后,再查所有数据,就只剩下一条了。
这时,flush只是把最大的版本刷到磁盘了,其他的版本都删除了。
有的时候,flush也删不了:就是flush刷下去一条数据,然后flush又刷下去另一条数据,这时,两条数据不在同一个文件里,所以flush管不了。
总结:flush只管在MemCache里面的合并工作,磁盘的事它管不了。磁盘的合并只有交给major Compaction做。
大合并:
delete后,会什么时候删除标记呢?
标记还在。
标记什么时候被删?
说明标记是在major compact时候删的,flush时候不删。为什么?因为flush能管到删除内存的数据,但此时磁盘文件里可能还有历史数据,所以delete要保留标记,再大合并的时候再删。
Region Split 数据合并越来越大,就得做切分。切分也有时机。
一个region最大file大小:10G,如果有列蔟超过这个大小,就切分。
当合并的数据超过时进行拆分,将拆分后的 Region 分配给不同的 HregionServer 管理; 3)当 HregionServer 宕机后,将 HregionServer 上的 hlog 拆分,然后分配给不同的 HregionServer 加载,修改.META.;
4)注意:HLog 会同步到 HDFS。
在生产环境做预分区:建表的时候就规划好分区 (row key: 1-10000,10001-20000)。当数据到达指定大小就按用户自定义去切。
/stu/region1
/stu/region2
HBase API 操作 1 2 3 4 5 6 7 8 9 10 11 12 <dependency > <groupId > org.apache.hbase</groupId > <artifactId > hbase-client</artifactId > <version > 1.3.1</version > </dependency > <dependency > <groupId > org.apache.hbase</groupId > <artifactId > hbase-server</artifactId > <version > 1.3.1</version > </dependency >
DDL/DML 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 package mxx.hbaseDemo;import lombok.extern.slf4j.Slf4j;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.*;import org.apache.hadoop.hbase.client.*;import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;@Slf 4jpublic class TestAPI { private static Admin admin = null ; private static Connection connection = null ; static { try { Configuration configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum" ,"mxxcentos7" ); connection = ConnectionFactory.createConnection(configuration); admin = connection.getAdmin(); } catch (IOException e) { e.printStackTrace(); } } public static void close () { if (admin!=null ){ try { admin.close(); } catch (IOException e) { e.printStackTrace(); } } if (connection!=null ){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } public static boolean isTableExist (String tableName) throws IOException { boolean exists = admin.tableExists(TableName.valueOf(tableName)); return exists; } public static void createTable (String tableName, String... columnFamilys) throws IOException { if (isTableExist(tableName)){ log.error(tableName+"表已经存在!" ); return ; } if (columnFamilys.length <= 0 ){ log.error("请设置列蔟信息!" ); return ; } HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName)); for (String columnFamily : columnFamilys) { HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(columnFamily); hTableDescriptor.addFamily(hColumnDescriptor); } admin.createTable(hTableDescriptor); } public static void dropTable (String tableName) throws IOException { if (!isTableExist(tableName)){ log.error("{}表不存在!" , tableName); return ; } admin.disableTable(TableName.valueOf(tableName)); admin.deleteTable(TableName.valueOf(tableName)); } public static void createNameSpace (String nsName) { NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(nsName).build(); try { admin.createNamespace(namespaceDescriptor); } catch (NamespaceExistException e){ log.warn("{}命名空间已存在" ,nsName); }catch (IOException e) { e.printStackTrace(); } } public static void putData (String tableName, String rowKey, String cf, String cn, String value) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn), Bytes.toBytes(value)); table.put(put); table.close(); } public static void getData (String tableName, String rowKey, String cf, String cn) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); get.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn)); get.setMaxVersions(5 ); Result result = table.get(get); Cell[] cells = result.rawCells(); for (Cell cell : cells) { String CF = Bytes.toString(CellUtil.cloneFamily(cell)); String CN = Bytes.toString(CellUtil.cloneQualifier(cell)); String Value = Bytes.toString(CellUtil.cloneValue(cell)); System.out.println("CF: " + CF + "," + "CN: " + CN + "," + "Value: " + Value); } table.close(); } public static void scanTable (String tableName) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); ResultScanner resultScanner = table.getScanner(scan); for (Result result : resultScanner) { Cell[] cells = result.rawCells(); for (Cell cell : cells) { String CF = Bytes.toString(CellUtil.cloneFamily(cell)); String CN = Bytes.toString(CellUtil.cloneQualifier(cell)); String Value = Bytes.toString(CellUtil.cloneValue(cell)); System.out.println("CF: " + CF + "," + "CN: " + CN + "," + "Value: " + Value); } } table.close(); } public static void deleteData (String tableName,String rowKey, String cf, String cn) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); Delete delete = new Delete(Bytes.toBytes(rowKey)); delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cn)); table.delete(delete); table.close(); } public static void main (String[] args) throws IOException { deleteData("stu2" , "1001" , "info1" , "name" ); close(); } }
与MapReduce交互 可以实现伴随 HBase 操作的 MapReduce 过程,比如使用MapReduce 将数据从本地文件系统导入到 HBase 的表中,比如从 HBase 中读取一些原始数据后使用 MapReduce 做数据分析。
环境配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ~/.bash_profile export HBASE_HOME export HADOOP_HOME hadoop-env.sh 中配置:(注意:在 for 循环之后配) export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/home/machine/apps/hbase-1.2.0-cdh5.16.1/lib/* # 重启 stop-hbase.sh cd $HADOOP_HOME/sbin ./stop-all.sh ./start-dfs.sh ./start-yarn.sh hadoop dfsadmin -safemode leave start-hbase.sh
官方案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 cd $HBASE_HOME # 1、统计stu表的行数 yarn jar lib/hbase-server-1.2.0-cdh5.16.1.jar rowcounter stu # 打印结果 org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper$Counters ROWS=2 # 2、使用 MapReduce 将本地数据导入到 HBase # 创建fruit.tsv 上传到hdfs # 建表 create "fruit","info" # 导入 yarn jar lib/hbase-server-1.3.1.jar importtsv \ -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \ hdfs://mxxcentos7:9000/user/mxx/fruit/fruit.csv # 查看 scan "fruit"
自定义-MR1 读取hdfs数据到hive表
demo:
FruitMapper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 package mxx.hbaseDemo.mr1;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FruitMapper extends Mapper <LongWritable , Text ,LongWritable , Text > { @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(key, value); } }
FruitReducer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package mxx.hbaseDemo.mr1;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.mapreduce.TableReducer;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import java.io.IOException;public class FruitReducer extends TableReducer <LongWritable , Text , NullWritable > { @Override protected void reduce (LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { String[] fields = value.toString().split("\t" ); Put put = new Put(Bytes.toBytes(fields[0 ])); put.addColumn(Bytes.toBytes("info" ), Bytes.toBytes("name" ), Bytes.toBytes(fields[1 ])); put.addColumn(Bytes.toBytes("info" ), Bytes.toBytes("color" ), Bytes.toBytes(fields[2 ])); context.write(NullWritable.get(), put); } } }
FruitDriver
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package mxx.hbaseDemo.mr1;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class FruitDriver implements Tool { private Configuration configuration = null ; @Override public int run (String[] args) throws Exception { Job job = Job.getInstance(configuration); job.setJarByClass(FruitDriver.class ) ; job.setMapperClass(FruitMapper.class ) ; job.setMapOutputKeyClass(LongWritable.class ) ; job.setMapOutputValueClass(Text.class ) ; TableMapReduceUtil.initTableReducerJob(args[1 ], FruitReducer.class , job ) ; FileInputFormat.setInputPaths(job, new Path(args[0 ])); boolean result = job.waitForCompletion(true ); return result ? 0 : 1 ; } @Override public void setConf (Configuration conf) { configuration = conf; } @Override public Configuration getConf () { return configuration; } public static void main (String[] args) { try { Configuration configuration = new Configuration(); int run = ToolRunner.run(configuration, new FruitDriver(), args); System.exit(run); } catch (Exception e) { e.printStackTrace(); } } }
打包扔服务器
1 2 3 yarn jar /media/psf/centos_share/note-hadoop-1.0-SNAPSHOT.jar mxx.hbaseDemo.mr1.FruitDriver /user/mxx/fruit/fruit.tsv fruit1 scan "fruit1"
自定义-MR2 Hbase互相写数据。需求:将fruit1的name全部读到fruit2
Fruit2Mapper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package mxx.hbaseDemo.mr2;import org.apache.hadoop.hbase.Cell;import org.apache.hadoop.hbase.CellUtil;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;public class Fruit2Mapper extends TableMapper <ImmutableBytesWritable , Put > { @Override protected void map (ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { Put put = new Put(key.get()); for (Cell cell : value.rawCells()) { if ("name" .equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ put.add(cell); } } context.write(key, put); } }
Fruit2Reducer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package mxx.hbaseDemo.mr2;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableReducer;import org.apache.hadoop.io.NullWritable;import java.io.IOException;public class Fruit2Reducer extends TableReducer <ImmutableBytesWritable , Put , NullWritable > { @Override protected void reduce (ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException { for (Put put : values) { context.write(NullWritable.get(), put); } } }
Fruit2Driver
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 package mxx.hbaseDemo.mr2;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class Fruit2Driver implements Tool { private Configuration configuration = null ; @Override public int run (String[] args) throws Exception { Job job = Job.getInstance(configuration); job.setJarByClass(Fruit2Driver.class ) ; TableMapReduceUtil.initTableMapperJob("fruit1" , new Scan(), Fruit2Mapper.class , ImmutableBytesWritable .class , Put .class , job ) ; TableMapReduceUtil.initTableReducerJob("fruit2" , Fruit2Reducer.class , job ) ; boolean result = job.waitForCompletion(true ); return result ? 0 : 1 ; } @Override public void setConf (Configuration conf) { configuration = conf; } @Override public Configuration getConf () { return configuration; } public static void main (String[] args) { try { Configuration configuration = HBaseConfiguration.create(); configuration.addResource("hbase-site.xml" ); int run = ToolRunner.run(configuration, new Fruit2Driver(), args); System.exit(run); } catch (Exception e) { e.printStackTrace(); } } }
与Hive集成 HBase 与 Hive 的对比 Hive
(1) 数据仓库 Hive 的本质其实就相当于将 HDFS 中已经存储的文件在 Mysql 中做了一个双射关系,以方便使用 HQL 去管理查询。
(2) 用于数据分析、清洗 Hive 适用于离线的数据分析和清洗,延迟较高。
(3) 基于 HDFS、MapReduce Hive 存储的数据依旧在 DataNode 上,编写的 HQL 语句终将是转换为 MapReduce 代码执行。
HBase
(1) 数据库 是一种面向列存储的非关系型数据库。
(2) 用于存储结构化和非结构化的数据 适用于单表非关系型数据的存储,不适合做关联查询,类似 JOIN 等操作。
(3) 基于 HDFS 数据持久化存储的体现形式是 Hfile,存放于 DataNode 中,被 ResionServer 以 region 的形 式进行管理。
(4) 延迟较低,接入在线业务使用 面对大量的企业数据,HBase 可以直线单表大量数据的存储,同时提供了高效的数据访问 速度。
环境准备 Hive操作Hbase,类似于MapReduce操作Hbase。Hive需要持有Hbase的相关jar包。
1 2 3 4 5 6 7 8 9 10 11 12 export HBASE_HOME=/opt/module/hbase export HIVE_HOME=/opt/module/hive # 软连接 ln -s $HBASE_HOME/lib/hbase-common-1.2.0-cdh5.16.1.jar $HIVE_HOME/lib/hbase-common-1.2.0-cdh5.16.1.jar ln -s $HBASE_HOME/lib/hbase-server-1.2.0-cdh5.16.1.jar $HIVE_HOME/lib/hbase-server-1.2.0-cdh5.16.1.jar ln -s $HBASE_HOME/lib/hbase-client-1.2.0-cdh5.16.1.jar $HIVE_HOME/lib/hbase-client-1.2.0-cdh5.16.1.jar ln -s $HBASE_HOME/lib/hbase-protocol-1.2.0-cdh5.16.1.jar $HIVE_HOME/lib/hbase-protocol-1.2.0-cdh5.16.1.jar ln -s $HBASE_HOME/lib/hbase-it-1.2.0-cdh5.16.1.jar $HIVE_HOME/lib/hbase-it-1.2.0-cdh5.16.1.jar ln -s $HBASE_HOME/lib/htrace-core-3.2.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.2.0-incubating.jar ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.2.0-cdh5.16.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.2.0-cdh5.16.1.jar ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.2.0-cdh5.16.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.2.0-cdh5.16.1.jar
在 hive-site.xml 中修改 zookeeper 的属性,如下:
1 2 3 4 5 6 7 8 <property > <name > hive.zookeeper.quorum</name > <value > mxxcentos7</value > </property > <property > <name > hive.zookeeper.client.port</name > <value > 2181</value > </property >
案例一 建立 Hive 表,关联 HBase 表,插入数据到 Hive 表的同时能够影响 HBase 表。
在 Hive 中创建表同时关联 HBase
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 CREATE TABLE hive_hbase_emp_table( empno int , ename string , job string , mgr int , hiredate string , sal double , comm double , deptno int ) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:co mm,info:deptno" )TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table" );
完成之后,可以分别进入 Hive 和 HBase 查看,都生成了对应的表
接下来是导数据(数据是放在Hbase里的,不能直接用load语句,load是放在hdfs txt文件里的)
1、在 Hive 中创建临时中间表,用于 load 文件中的数据
2、Hive 中间表中 load 数据
3、通过 insert 命令将中间表中的数据导入到 Hive 关联 HBase 的那张表中
4、查看 Hive 以及关联的 HBase 表中是否已经成功的同步插入了数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 # 创建临时中间表 CREATE TABLE emp( empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int) row format delimited fields terminated by '\t'; # load数据到中间表 hive> load data local inpath '/media/psf/centos_share/emp.txt' into table emp; # insert将中间表数据导入到 Hive 关联 HBase 的那张表中 hive> insert into table hive_hbase_emp_table select * from emp; # 查看 Hive 以及关联的 HBase 表中是否已经成功的同步插入了数据 hive> select * from hive_hbase_emp_table; hbase> scan ‘hbase_emp_table’ 如果mr任务卡了,尝试: mapreduce测试: 1、yarn jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.16.1.jar wordcount /test/test.txt /test/out6 2、给虚拟机分配更大的内存
案例二 在 HBase 中已经存储了某一张表 hbase_emp_table,然后在 Hive 中创建一个外部表来关联 HBase 中的 hbase_emp_table 这张表,使之可以借助 Hive 来分析 HBase 这张表中的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 > hive # 创建外部表关联已经存在的hbase表 CREATE EXTERNAL TABLE relevance_hbase_emp( empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:co mm,info:deptno") TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table"); # 关联后就可以使用 Hive 函数进行一些分析操作了 hive > select * from relevance_hbase_emp; # 从hbase put 数据,hive能同步到吗? 能 hive > put 'hbase_emp_table','7935','info:ename','zhangsan' > hbase 7935 zhangsan NULL NULL NULL NULL NULL NULL
HBase优化 高可用 在 HBase 中 Hmaster 负责监控 RegionServer 的生命周期,均衡 RegionServer 的负载,如果 Hmaster 挂掉了,那么整个 HBase 集群将陷入不健康的状态,并且此时的工作状态并不会维持太久。所以 HBase 支持对 Hmaster 的高可用配置。
高可用配置节点配置(备用Master)
1 2 3 4 5 6 7 8 9 10 11 # 启动2个backup Master, 主 master挂了之后,就会自动上backup Master # 在 conf 目录下创建 backup-masters 文件 $ touch conf/backup-masters # 在 backup-masters 文件中配置高可用 HMaster 节点 $ echo hadoop103 > conf/backup-masters # 将整个 conf 目录 scp 到其他节点 $ scp -r conf/ hadoop103:/opt/module/hbase/ ... # 打开页面测试查看 http://hadooo102:16010
预分区 每一个 region 维护着 startRow 与 endRowKey,如果加入的数据符合某个 region 维护的rowKey 范围,则该数据交给这个 region 维护。那么依照这个原则,我们可以将数据所要投放的分区提前大致的规划好,以提高 HBase 性能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 # 1、手动设定预分区 2.生成 16 进制序列预分区 hbase> create 'staff1' ,'info' ,'partition1' ,SPLITS => ['1000' ,'2000' ,'3000' ,'4000' ] # 注意:400会落在第4个分区,而4000会落在第5分区 # 2、生成 16 进制序列预分区 hbase > create 'staff2','info','partition2',{NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'} # 3.按照文件中设置的规则预分区 创建 splits.txt 文件内容如下: splits.txt: aaaa bbbb cccc dddd # 内部按字母排序? hbase > create 'staff3','partition3',SPLITS_FILE => 'splits.txt' # 4.使用 JavaAPI 创建预分区 //自定义算法,产生一系列 Hash 散列值存储在二维数组中 byte[][] splitKeys = 某个散列值函数 //创建 HBaseAdmin 实例 HBaseAdmin hAdmin = new HBaseAdmin(HBaseConfiguration.create()); //创建 HTableDescriptor 实例 HTableDescriptor tableDesc = new HTableDescriptor(tableName); //通过 HTableDescriptor 实例和散列值二维数组创建带有预分区的 HBase 表 hAdmin.createTable(tableDesc, splitKeys);
分区规划和什么有关?
1 2 未来数据量、机器规模 每台机器放2-3个region
RowKey设计 一条数据的唯一标识就是 rowkey,那么这条数据存储于哪个分区,取决于 rowkey 处于哪个一个预分区的区间内,设计rowkey的主要目的 ,就是让数据均匀的分布 于所有的region中,在一定程度上防止数据倾斜。接下来我们就谈一谈 rowkey 常用的设计方案。
1.生成随机数、hash、散列值
1 2 3 4 5 比如: 原 本 rowKey 为 1001 的 , SHA1 dd01903921ea24941c26a48f2cec24e0bb0e8cc7 原 本 rowKey 为 3001 的 , SHA1 49042c54de64a1e9bf0b33e00245660ef92dc7bd 原 本 rowKey 为 5001 的 , SHA1 7b61dec07e02c188790670af43e717f0f46e8913 在做此操作之前,一般我们会选择从数据集中抽取样本,来决定什么样的 rowKey 来 Hash 后作为每个分区的临界值。
2.字符串反转
1 2 3 20170524000001 转成 10000042507102 20170524000002 转成 20000042507102 这样也可以在一定程度上散列逐步 put 进来的数据
3.字符串拼接
1 2 20170524000001_a12e 20170524000001_93i7
rowkey情景设计
考虑散列性和集中性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 分300个区 000 001 002 ... 298 rowkey设计: 000_ 001_ 002_ ... 298_ 比如要拿一个人的通话详情,最好把手机号相同的放在一个分区,便于查找。 如何将如手机号转换为rowkey? 同一个手机号要得到同一个前缀 策略: 手机号%299 = 分区号 分区号_xxx,后面跟手机号,便于连续读 001_13199999999_xxx 如何把一个人的通话记录按时间放在不同分区里? 按年_月分区 [(13199999999+201909).hash] % 299 = 分区号 001_13199999999_20191212 12:12:12 这样,同一个人,同一个月肯定在同一个分区,而且数据集中在一块 如何读取某个人2019-12的通话记录? start = 001_13199999999_2019-12 end = 001_13199999999_2019-13
内存&其他 HBase 操作过程中需要大量的内存开销,毕竟 Table 是可以缓存在内存中的,一般会分配整个可用内存的 70%给 HBase 的 Java 堆。但是不建议分配非常大的堆内存,因为 GC 过程持续太久会导致 RegionServer 处于长期不可用状态,一般 16~48G 内存就可以了,如果因为框架占用内存过高导致系统内存不足,框架一样会被系统服务拖死。
基础优化
略