avatar

目录
Hbase-案例

参考:https://www.bilibili.com/video/av65548392

[TOC]

需求分析

表结构分析

Code
1
2
3
4
5
6
7
8
微博内容表
人+当前时间戳

关注表
粉丝,普通人按列存就行了。如果是流量号,几十万,可以采用其他存储方式。

初始化页面表
存储A的最近关注B,C,D,E,和对应关注人的最近几条微博

表结构

屏幕快照 2019-10-08 下午3.19.44

业务:增/删微博、关注/取关用户、查看微博详情…

开发

项目架构

HBaseUtil

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package mxx.demoWeibo.utils;

import lombok.extern.slf4j.Slf4j;
import mxx.demoWeibo.constants.Constans;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
@Slf4j
public class HBaseUtil {

// 工具类不要单例对象

/**
* 创建命名空间
*/
public static void createNameSpace(String nameSpace) throws IOException {

// 获取connet对象--常量类
Connection connection = ConnectionFactory.createConnection(Constans.CONFIGURATION);
Admin admin = connection.getAdmin();

NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(nameSpace).build();
admin.createNamespace(namespaceDescriptor);

admin.close();
connection.close();
}

/**
* 判断表是否存在
*/
private static boolean isTableExist(String tableName) throws IOException {
Connection connection = ConnectionFactory.createConnection(Constans.CONFIGURATION);
Admin admin = connection.getAdmin();

boolean exists = admin.tableExists(TableName.valueOf(tableName));

admin.close();
connection.close();

return exists;
}

/**
* 创建表
*/
public static void creteTable(String tableName,int versions, String... cfs) throws IOException {
if(cfs.length<=0){
log.error("请设置列蔟信息!!!");
return;
}
if(isTableExist(tableName)){
log.error("{}表已存在",tableName);
return;
}

Connection connection = ConnectionFactory.createConnection(Constans.CONFIGURATION);
Admin admin = connection.getAdmin();

HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));

for (String cf : cfs) {
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf);
hColumnDescriptor.setMaxVersions(versions);
hTableDescriptor.addFamily(hColumnDescriptor);
}

admin.createTable(hTableDescriptor);

admin.close();
connection.close();
}
}

测试…

定义常量

表、命名空间、列蔟

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package mxx.demoWeibo.constants;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class Constans {

// HBase配置信息
public static Configuration CONFIGURATION = HBaseConfiguration.create();
// 命名空间
public static String NAMESPACE = "weibo";

// 内容表
public static String CONTENT_TABLE = "weibo:content";
public static String CONTENT_TABLE_CF = "info";
public static int CONTENT_TABLE_VERSIONS = 1;

// 关系表
public static String RELATION_TABLE = "weibo:relation";
public static String RELATION_TABLE_CF1 = "attends";
public static String RELATION_TABLE_CF2 = "fans";
public static int RELATION_TABLE_VERSIONS = 1;

// 收件箱表
public static String INBOX_TABLE = "weibo:inbox";
public static String INBOX_TABLE_CF = "info";
public static int INBOX_TABLE_VERSIONS = 2;

}

发布微博

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
* 发布微博
*/
public static void publishWeiBo(String uid, String content) throws IOException {

// ----------修改内容表--------------

// 生产环境可以用连接池
Connection connection = ConnectionFactory.createConnection(Constans.CONFIGURATION);
// 操作内容表
Table contentTable = connection.getTable(TableName.valueOf(Constans.CONTENT_TABLE));

long ts = System.currentTimeMillis();
String rowKey = uid +"_"+ ts;

Put contentPut = new Put(Bytes.toBytes(rowKey));
contentPut.addColumn(Bytes.toBytes(Constans.CONTENT_TABLE_CF),Bytes.toBytes("content"),Bytes.toBytes(content));
contentTable.put(contentPut);

// -----------操作收件箱表-------------

/*
修改初始页面表(遍历每一个关注了A的用户,修改其页面表)
先从关系表得到A的所有粉丝id,再遍历收件箱表里的id, 修改info里的内容,将新发布加入列表(使用list批量提交)
有可能没粉丝,就不用改表,这里要判断
*/

// 获取当前发布人的fans列蔟数据
Table relationTable = connection.getTable(TableName.valueOf(Constans.RELATION_TABLE));
Get relationGet = new Get(Bytes.toBytes(uid));
relationGet.addFamily(Bytes.toBytes(Constans.RELATION_TABLE_CF2));
Result relationResult = relationTable.get(relationGet);

// 创建集合,存放put对象。
List<Put> inboxPuts = new ArrayList<>();
for (Cell relationCell : relationResult.rawCells()) {
// 构建收件箱的put对象
Put inboxPut = new Put(CellUtil.cloneQualifier(relationCell));
// 更新最新发布
inboxPut.addColumn(Bytes.toBytes(Constans.INBOX_TABLE_CF),
Bytes.toBytes(uid),
Bytes.toBytes(rowKey));
inboxPuts.add(inboxPut);
}

// 判断是否有粉丝
if(inboxPuts.size()>0){
// 获取收件箱对象
Table inboxTable = connection.getTable(TableName.valueOf(Constans.INBOX_TABLE));
inboxTable.put(inboxPuts);

inboxTable.close();
}

// close
relationTable.close();
contentTable.close();
connection.close();

}

关注用户

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
public static void addAttends(String uid, String... attends) throws IOException {
/*
批量/单个 关注
1、关系表。a-attends-bcd, 轮询b,c,d,fans add a
2、首页表。a-add-b, add b的最近微博rowkey
3、内容表。b的最近微博从content表拿到的
*/

// 校验
if(attends.length<=0){
log.error("请选择待关注的人!!!");
return;
}

// ------------关系表----------------
Connection connection = ConnectionFactory.createConnection(Constans.CONFIGURATION);

// 创建connect table
Table relationTable = connection.getTable(TableName.valueOf(Constans.RELATION_TABLE));

// 创建list<put>
List<Put> relationPuts = new ArrayList<>();

// 创建操作者的 put
Put uidPut = new Put(Bytes.toBytes(uid));

// 循环创建被关注者的 put
for (String attend : attends) {
// 给操作者的 put赋值
uidPut.addColumn(Bytes.toBytes(Constans.RELATION_TABLE_CF1),
Bytes.toBytes(attend),
Bytes.toBytes(attend));

// 创建被关注者的put
Put attendPut = new Put(Bytes.toBytes(attend));

// 给被关注者的put赋值
attendPut.addColumn(Bytes.toBytes(Constans.RELATION_TABLE_CF2),
Bytes.toBytes(uid),
Bytes.toBytes(uid));

// 将被关注者put放入集合
relationPuts.add(attendPut);
}

// 将操作者put对象添加到集合
relationPuts.add(uidPut);

// 执行用户关系表插入数据
relationTable.put(relationPuts);



// ------------首页表----------------

/*
bcd都没有发布过wb
*/

// 获取wb内容表对象
Table contentTable = connection.getTable(TableName.valueOf(Constans.CONTENT_TABLE));


// 创建inbox的put对象
Put inboxPut = new Put(Bytes.toBytes(uid));

// 循环attends,获取每个被关注者近期发布的微博
for (String attend : attends) {
// 获取当前被关注者近期发布的微博

// 先获取当前被关注者所有发布的微博(是否可优化)
// scan start: uid_ stop: uid|
Scan attendScan = new Scan(Bytes.toBytes(attend + "_"), Bytes.toBytes(attend + "|"));
ResultScanner contentResultScanner = contentTable.getScanner(attendScan);

// 对获取的集合遍历
/*
取某个被专注者最新3条wb的逻辑:
contentResultScanner集合 = 某个被关注者所有的wb
现在只需取最近3条
contentResultScanner是按照rowKey排序的,这里的rowKey设计是 uid_time
所以这里把所有的contentResult都放入inboxPut,
而inboxTable设计的version=3, 所以最后取出来刚好是3个最新微博id

TODO 优化: 某人发布wb过多,就有很多无效插入,如何优化?

uid_time, 让time修改成,发布越晚,时间戳越小。999(13个9)- 时间戳
这样,新发布的就排在扫描Scanner集合的前面了,
这样就只需要for前3条数据

BUG(已解决): inboxPut时间戳是系统服务端传的(inboxPut.addColumn循环完后是在server上统一提交的),
造成的效果就是时间戳都一样,这样取出来数据只有1条,而不是3条
修改:for里面手动传时间戳

*/

// 定义时间戳
long ts = System.currentTimeMillis();


for (Result contentResult : contentResultScanner) {
// 拿contentResult对应的rowKey

// 给inbox的put赋值
inboxPut.addColumn(Bytes.toBytes(Constans.INBOX_TABLE_CF),
Bytes.toBytes(attend),
ts++,
contentResult.getRow());
}
}
//判断当前的put对象是否null
if (!inboxPut.isEmpty()){
// 获取inbox对象,插入数据
Table inboxTable = connection.getTable(TableName.valueOf(Constans.INBOX_TABLE));
inboxTable.put(inboxPut);

// 关闭
inboxTable.close();
}

// close resource
relationTable.close();
contentTable.close();
connection.close();

}

取关用户

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public static void deleteAttends(String uid, String... dels) throws IOException {

// ------------connect表----------------

Connection connection = ConnectionFactory.createConnection(Constans.CONFIGURATION);

// 创建connect table
Table relationTable = connection.getTable(TableName.valueOf(Constans.RELATION_TABLE));

// 创建集合,存放delete对象
List<Delete> relationDelete = new ArrayList<>();

// 创建操作者的delete对象
Delete uidDelete = new Delete(Bytes.toBytes(uid));

// 循环创建被取关者的delete对象
for (String del : dels) {

// 给操作者的delete赋值
/*
addColumn有什么问题:A连续关注2次,就只能删除一个版本。
addColumns能删除所有版本。
*/
uidDelete.addColumns(Bytes.toBytes(Constans.RELATION_TABLE_CF1),
Bytes.toBytes(del));

// 创建被取关者的delete
Delete delDelete = new Delete(Bytes.toBytes(del));

// 给被取关这者delete赋值
delDelete.addColumns(Bytes.toBytes(Constans.RELATION_TABLE_CF2),
Bytes.toBytes(uid));

// 将被取关这者delete添加到集合
relationDelete.add(delDelete);
}

// 将操作者的delete对象加入集合
relationDelete.add(uidDelete);

// 执行connect表的删除
relationTable.delete(relationDelete);

// ------------inbox表----------------

// 获取inbox对象
Table inboxTable = connection.getTable(TableName.valueOf(Constans.INBOX_TABLE));

// 创建操作者delete
Delete inboxDelete = new Delete(Bytes.toBytes(uid));

// 给操作者delete循环赋值
for (String del : dels) {
inboxDelete.addColumns(Bytes.toBytes(Constans.INBOX_TABLE_CF),
Bytes.toBytes(del));
}

// 执行inbox删除
inboxTable.delete(inboxDelete);

// 关闭资源
relationTable.close();
inboxTable.close();
connection.close();

}

获取初始化页面数据

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public static void getInit(String uid) throws IOException {

// 获取connet对象
Connection connection = ConnectionFactory.createConnection(Constans.CONFIGURATION);

// 获取inbox对象
Table inboxTable = connection.getTable(TableName.valueOf(Constans.INBOX_TABLE));

// 获取content对象
Table contentTable = connection.getTable(TableName.valueOf(Constans.CONTENT_TABLE));

// 创建inbox get, 获取数据(设置最大版本)
Get inboxGet = new Get(Bytes.toBytes(uid));
inboxGet.setMaxVersions();
Result inboxResult = inboxTable.get(inboxGet);

// 遍历获取的数据
for (Cell inboxCell : inboxResult.rawCells()) {
// 构建content get
Get contentGet = new Get(CellUtil.cloneValue(inboxCell));

// 获取get 内容
Result contentResult = contentTable.get(contentGet);

// 解析内容,打印
for (Cell contentCell : contentResult.rawCells()) {
System.out.println("RK:"+Bytes.toString(CellUtil.cloneRow(contentCell))+
",CF:"+Bytes.toString(CellUtil.cloneFamily(contentCell))+
",CN:"+Bytes.toString(CellUtil.cloneQualifier(contentCell))+
",Value:"+Bytes.toString(CellUtil.cloneValue(contentCell)));
}
}

// 关闭资源
inboxTable.close();
contentTable.close();
connection.close();
}

获取某用户所有微博详情

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public static void getWeiBo(String uid) throws IOException {
// 获取connet对象
Connection connection = ConnectionFactory.createConnection(Constans.CONFIGURATION);

// 获取content对象
Table contentTable = connection.getTable(TableName.valueOf(Constans.CONTENT_TABLE));

// 构建scan对象
Scan contentScan = new Scan();

// 构建过滤器
/*
也可以用FilterList(交、并)
*/
RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(uid + "_"));
contentScan.setFilter(rowFilter);

// 获取数据
ResultScanner contentResultScanner = contentTable.getScanner(contentScan);

// 解析打印
for (Result contentResult : contentResultScanner) {

for (Cell contentCell : contentResult.rawCells()) {
System.out.println("RK:"+Bytes.toString(CellUtil.cloneRow(contentCell))+
",CF:"+Bytes.toString(CellUtil.cloneFamily(contentCell))+
",CN:"+Bytes.toString(CellUtil.cloneQualifier(contentCell))+
",Value:"+Bytes.toString(CellUtil.cloneValue(contentCell)));
}
}

// 关闭资源
contentTable.close();
connection.close();
}

测试

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import mxx.demoWeibo.constants.Constans;
import mxx.demoWeibo.dao.HBaseDao;
import mxx.demoWeibo.utils.HBaseUtil;

import java.io.IOException;

public class TestWeibo {

public static void init(){

try {
// 创建命名空间
HBaseUtil.createNameSpace(Constans.NAMESPACE);

// 创建content表
HBaseUtil.creteTable(Constans.CONTENT_TABLE,
Constans.CONTENT_TABLE_VERSIONS,
Constans.CONTENT_TABLE_CF);

// 创建relation表
HBaseUtil.creteTable(Constans.RELATION_TABLE,
Constans.RELATION_TABLE_VERSIONS,
Constans.RELATION_TABLE_CF1,
Constans.RELATION_TABLE_CF2);

// 创建inbox表
HBaseUtil.creteTable(Constans.INBOX_TABLE,
Constans.INBOX_TABLE_VERSIONS,
Constans.INBOX_TABLE_CF);


} catch (IOException e) {
e.printStackTrace();
}


}
public static void main(String[] args) throws IOException, InterruptedException {

// 初始化
init();

// 1001发微博
HBaseDao.publishWeiBo("1001","你好,0");

// 1002关注1001和1003
HBaseDao.addAttends("1002", "1001", "1003");

// 获取1002初始化页面
HBaseDao.getInit("1002");

System.out.println("*******111*******");

// 1003发布3条微博,1001发布2条
HBaseDao.publishWeiBo("1003","你好,1");
Thread.sleep(10);
HBaseDao.publishWeiBo("1001","你好,2");
Thread.sleep(10);
HBaseDao.publishWeiBo("1003","你好,3");
Thread.sleep(10);
HBaseDao.publishWeiBo("1001","你好,4");
Thread.sleep(10);
HBaseDao.publishWeiBo("1003","你好,5");

// 获取1002初始化页面
HBaseDao.getInit("1002");
System.out.println("*******222*******");

// 1002取关1003
HBaseDao.deleteAttends("1002","1003");

// 获取1002初始化页面
HBaseDao.getInit("1002");
System.out.println("*******333*******");

// 1002再次关注1003
HBaseDao.addAttends("1002", "1003");

// 获取1002初始化页面
HBaseDao.getInit("1002");
System.out.println("*******444*******");

// 获取1001微博详情
HBaseDao.getWeiBo("1001");

}
}

查看relation表..

打印结果:

Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
RK:1001_1570766541084,CF:info,CN:content,Value:你好,0
*******111*******
RK:1001_1570766541864,CF:info,CN:content,Value:你好,4
RK:1001_1570766541705,CF:info,CN:content,Value:你好,2
RK:1003_1570766541932,CF:info,CN:content,Value:你好,5
RK:1003_1570766541774,CF:info,CN:content,Value:你好,3
*******222*******
RK:1001_1570766541864,CF:info,CN:content,Value:你好,4
RK:1001_1570766541705,CF:info,CN:content,Value:你好,2
*******333*******
RK:1001_1570766541864,CF:info,CN:content,Value:你好,4
RK:1001_1570766541705,CF:info,CN:content,Value:你好,2
RK:1003_1570766541932,CF:info,CN:content,Value:你好,5
RK:1003_1570766541774,CF:info,CN:content,Value:你好,3
*******444*******
RK:1001_1570766541084,CF:info,CN:content,Value:你好,0
RK:1001_1570766541705,CF:info,CN:content,Value:你好,2
RK:1001_1570766541864,CF:info,CN:content,Value:你好,4
文章作者: Machine
文章链接: https://machine4869.gitee.io/2019/10/08/20191008134811972/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 哑舍
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论