文章目录
- 前言
- mysql
- 表结构设计
- db_telecom.tb_contacts
- db_telecom.tb_call
- db_telecom.tb_dimension_date
- 建表语句
- 导入基础数据
- 姓名手机号映射
- 时间维度表
- Maven依赖
- HBase
- Maven依赖
- mapper
- reducer
- 自定义输出
- OutputFormat
- MyRecordWriter
- 私有属性
- 初始化操作
- 设置job
- 运行
- 博客链接
前言
有了之前存入HBase的数据,现在就可以运用HBase的MR进行数据分析了。
mysql
表结构设计
db_telecom.tb_contacts
用于存放用户手机号码与联系人姓名。
列 | 备注 | 类型 |
---|---|---|
id | 自增主键 | int(11) NOT NULL |
telephone | 手机号码 | varchar(255) NOT NULL |
name | 联系人姓名 | varchar(255) NOT NULL |
db_telecom.tb_call
列 | 备注 | 类型 |
---|---|---|
id_date_contact | 复合主键,由时间维度 id 和联系人 id 用下划线拼接而成(时间维度 id_联系人 id) | varchar(255) NOT NULL |
id_date_dimension | 时间维度 id(对应 tb_dimension_date.id) | int(11) NOT NULL |
id_contact | 联系人 id(对应 tb_contacts.id,即查询人的电话号码所在行) | int(11) NOT NULL |
call_sum | 通话次数总和 | int(11) NOT NULL DEFAULT 0 |
call_duration_sum | 通话时长总和 | int(11) |
db_telecom.tb_dimension_date
列 | 备注 | 类型 |
---|---|---|
id | 自增主键 | int(11) |
year | 年,当前通话信息所在年 | int(11) |
month | 月,当前通话信息所在月,如果按照年来统计信息,则month 为-1。 | int(11) NOT NULL |
day | 日,当前通话信息所在日,如果是按照月或年来统计信息,则day为-1。 | int(11) NOT NULL |
建表语句
我这里简化了约束信息,自己建立的表,需要注意自己的默认字符集,不是utf8请选择下面的完整版,否则可能无法导入中文。
-- Simplified schema (most constraints trimmed).
-- The database is created with utf8 so Chinese contact names import correctly
-- even when the server's default charset is not utf8.
CREATE DATABASE db_telecom DEFAULT CHARACTER SET utf8;
-- Without this the tables below would be created in whatever database is current
-- (or fail with "No database selected").
USE db_telecom;

CREATE TABLE tb_dimension_date(
  id int PRIMARY KEY AUTO_INCREMENT,
  year int,
  month int,
  day int);

CREATE TABLE tb_contacts(
  id int PRIMARY KEY AUTO_INCREMENT,
  telephone char(11),
  name varchar(20));

-- id_date_contact holds "<date dimension id>_<contact id>"; VARCHAR(10) overflows as ids grow,
-- so it is sized the same as in the full version below.
CREATE TABLE tb_call(
  id_date_contact VARCHAR(100) PRIMARY KEY,
  id_date_dimension int,
  id_contact int,
  call_sum int,
  call_duration_sum int,
  FOREIGN KEY(id_date_dimension) REFERENCES tb_dimension_date(id),
  FOREIGN KEY(id_contact) REFERENCES tb_contacts(id));
下面是我老师提供的完整的建表语句
-- Disable FK checks while dropping/recreating tables; re-enabled at the end so the
-- session is not left with foreign-key checking switched off.
SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for tb_call
-- ----------------------------
DROP TABLE IF EXISTS `tb_call`;
CREATE TABLE `tb_call` (
  `id_date_contact` varchar(100) NOT NULL,
  `id_date_dimension` int(11) NOT NULL,
  `id_contact` int(11) NOT NULL,
  `call_sum` int(11) NOT NULL,
  `call_duration_sum` int(11) NOT NULL,
  PRIMARY KEY (`id_date_contact`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for tb_contacts
-- ----------------------------
DROP TABLE IF EXISTS `tb_contacts`;
CREATE TABLE `tb_contacts` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `telephone` varchar(11) NOT NULL,
  `name` varchar(50) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for tb_dimension_date
-- ----------------------------
DROP TABLE IF EXISTS `tb_dimension_date`;
CREATE TABLE `tb_dimension_date` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `year` int(11) NOT NULL,
  `month` int(11) NOT NULL,
  `day` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

SET FOREIGN_KEY_CHECKS=1;
导入基础数据
姓名手机号映射
我先修改了一下之前生成的文件:推荐用 Notepad++ 或 Sublime Text,它们都有列编辑功能,可以直接在每行开头添加 ",再进行局部替换。
可以直接在Navicat中导入csv文件
一直下一步到完成就行了,其中需要注意字段是否对应上
也可以用mysql命令
# The double quote inside the -e string must be escaped (\") — unescaped it ends the shell argument early.
# tb_contacts starts with an auto-increment id column, so the target columns are listed explicitly;
# the order assumes each line of nam_num.csv is "name,telephone" — adjust if your file differs.
mysql --local-infile -uroot -proot db_telecom -e "LOAD DATA LOCAL INFILE 'nam_num.csv' INTO TABLE tb_contacts CHARACTER SET utf8 FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' LINES TERMINATED BY '\n' (name, telephone)"
时间维度表
和上一步操作一样;这里的数据不含中文,不用担心字符集问题,所以更简单一些。
Maven依赖
如果没有添加mysql驱动支持,需要添加上,之后才能用JDBC
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<!-- The 5.1.x connector ships the com.mysql.jdbc.Driver class that MyRecordWriter loads. -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.20</version>
</dependency>
HBase
Maven依赖
检查是否有依赖,没有的话添加进去。
<!-- HBase server (TableMapper / TableMapReduceUtil) and client APIs; keep both on the same version. -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.4.5</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.4.5</version>
</dependency>
mapper
这时候也体现出了rowKey设计的好处,直接切分rowKey就能获得需要的数据
public static class HBase2MysqlMapper extends TableMapper<Text, IntWritable>{Text outKey = new Text();IntWritable outValue = new IntWritable();@Override//来一个,算三个维度protected void map(ImmutableBytesWritable key, Result value,Context context)throws IOException, InterruptedException {String rowKey = Bytes.toString(key.get());String[] strs = rowKey.split("_");//用于计算通话时长,只算一次即可if(strs[5].equals("01"))outValue.set(Integer.valueOf(strs[4]));elseoutValue.set(0);//日维度outKey.set(strs[1]+"_"+strs[2]);context.write(outKey, outValue);//月维度outKey.set(strs[1]+"_"+strs[2].substring(0, 7));context.write(outKey, outValue);//年维度outKey.set(strs[1]+"_"+strs[2].substring(0, 4));context.write(outKey, outValue);}}
reducer
和wordcount差不多,都是求和,key设计好就行了
public static class HBase2MysqlReducer extends Reducer<Text, IntWritable, Text, IntWritable>{Text outKey = new Text();IntWritable outValue = new IntWritable();int count,sum;@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {count = 0;sum = 0;for(IntWritable value:values){ //累计通话次数count += 1;//累计通话时长sum += value.get();}outKey.set(key.toString()+"_"+String.valueOf(count));outValue.set(sum);context.write(outKey, outValue);}}
自定义输出
要把结果输出到 MySQL,需要自定义 OutputFormat,稍微麻烦一些。
OutputFormat
不是输出到文件,不必继承FileOutputFormat,而是直接继承OutputFormat;核心是实现RecordWriter,另外两个方法可以参照FileOutputFormat,也可以直接return。
public class MySQLOutputFormat extends OutputFormat<Text, IntWritable>{@Overridepublic RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {RecordWriter<Text, IntWritable> writer = new MyRecordWriter<Text, IntWritable>(job);return writer;}@Overridepublic void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {return; }@Overridepublic OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {return new FileOutputCommitter(null, context);}}
MyRecordWriter
私有属性
/** JDBC connection opened by the constructor; null until then. */
private Connection conn;
/** Plain statement created from {@code conn} by the constructor; null until then. */
private Statement stmt;
初始化操作
记得修改自己的url,还有密码
/**
 * Opens the MySQL connection used by {@code write}.
 *
 * <p>Connection settings are read from the job configuration. Each falls back to the value that
 * used to be hard-coded, so existing runs behave the same:
 * <ul>
 *   <li>{@code mysql.jdbc.url} (default {@code jdbc:mysql://node103:3306/db_telecom})</li>
 *   <li>{@code mysql.jdbc.user} (default {@code root})</li>
 *   <li>{@code mysql.jdbc.password} (default {@code root})</li>
 * </ul>
 *
 * @param context task context whose configuration supplies the connection settings
 * @throws IllegalStateException if the JDBC driver is missing or the connection cannot be opened.
 *     Previously the error was only printed, which left {@code conn}/{@code stmt} null so the
 *     first write failed with a NullPointerException.
 */
public MyRecordWriter(TaskAttemptContext context) {
    String url = context.getConfiguration()
            .get("mysql.jdbc.url", "jdbc:mysql://node103:3306/db_telecom");
    String username = context.getConfiguration().get("mysql.jdbc.user", "root");
    String password = context.getConfiguration().get("mysql.jdbc.password", "root");
    try {
        // Explicit driver registration; harmless with drivers that self-register.
        Class.forName("com.mysql.jdbc.Driver");
        conn = DriverManager.getConnection(url, username, password);
        stmt = conn.createStatement();
    } catch (ClassNotFoundException e) {
        throw new IllegalStateException("MySQL JDBC driver not on the classpath", e);
    } catch (SQLException e) {
        throw new IllegalStateException("Cannot open MySQL connection to " + url, e);
    }
}
这是最关键的部分,就是查询出两个外键,然后构造成主键,插入mysql
@Overridepublic void write(Text key, IntWritable value) throws IOException, InterruptedException {String[] strs = key.toString().split("_");Integer count = Integer.valueOf(strs[2]);String[] dimensions = strs[1].split("-");String year="-1",month="-1",day="-1";if(dimensions.length>2)day = dimensions[2];if(dimensions.length>1)month = dimensions[1];if(dimensions.length>0)year = dimensions[0];try {//获得手机号码对应的外键String querySql = "SELECT id FROM tb_contacts WHERE telephone = "+strs[0]+";";ResultSet rs = stmt.executeQuery(querySql);rs.next();String id_contact = rs.getString("id");//获得日期对应的外键querySql = "SELECT id FROM tb_dimension_date WHERE "+ "year = "+year+" AND month = "+month+" AND day = "+day+";";rs = stmt.executeQuery(querySql);rs.next();String id_date_dimension = rs.getString("id");//拼凑为主键String id_date_contact = id_date_dimension+"_"+id_contact;//插入String insertSql = "INSERT INTO tb_call "+"(id_date_contact,id_date_dimension,id_contact,"+ "call_sum,call_duration_sum) "+ "VALUES('"+id_date_contact+"',"+id_date_dimension+","+id_contact+","+count+","+value.get()+");";stmt.execute(insertSql);} catch (SQLException e) {e.printStackTrace();}}
设置job
我实现了Tool接口,主要在run中设置
/**
 * Configures and runs the HBase-to-MySQL aggregation job.
 *
 * @param args {@code args[0]} is the name of the HBase table to scan
 * @return 0 if the job succeeded, 1 if it failed, 2 if the table name argument is missing
 * @throws Exception anything raised while setting up or running the job
 */
@Override
public int run(String[] args) throws Exception {
    if (args == null || args.length < 1) {
        System.err.println("Usage: HBase2MysqlDriver <hbase-table>");
        return 2;
    }

    Job job = Job.getInstance(this.getConf());
    job.setJarByClass(HBase2MysqlDriver.class);

    // A full-table MapReduce scan should fetch rows in batches rather than one per RPC, and should
    // not fill the region servers' block cache with data read only once.
    Scan scan = new Scan();
    scan.setCaching(500);
    scan.setCacheBlocks(false);
    TableMapReduceUtil.initTableMapperJob(
            args[0], scan, HBase2MysqlMapper.class, Text.class, IntWritable.class, job);

    job.setReducerClass(HBase2MysqlReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setOutputFormatClass(MySQLOutputFormat.class);

    return job.waitForCompletion(true) ? 0 : 1;
}
其他的代码就是走个形式了
/** Configuration supplied through {@code setConf} (ToolRunner.run calls it before run()). */
private Configuration conf;

/** Returns the configuration most recently passed to {@code setConf}, or null if none. */
@Override
public Configuration getConf() {
    return this.conf;
}

/** Stores the configuration that {@code run} later builds the job from. */
@Override
public void setConf(Configuration conf) {
    this.conf = conf;
}
/**
 * Command-line entry point.
 *
 * <p>Runs the job through {@code ToolRunner} with an HBase-aware configuration. The process exits
 * with the tool's status code, or 1 if an exception escapes. Previously the status was discarded,
 * so a failed job still exited with 0.
 *
 * @param args forwarded to {@code run}; {@code args[0]} is the HBase table name
 */
public static void main(String[] args) {
    int exitCode;
    try {
        Configuration conf = HBaseConfiguration.create();
        exitCode = ToolRunner.run(conf, new HBase2MysqlDriver(), args);
    } catch (Exception e) {
        e.printStackTrace();
        exitCode = 1;
    }
    System.exit(exitCode);
}
运行
直接在Eclipse中运行即可,数据库中的结果如下
博客链接
大数据实战-callLog项目(通话记录数据分析)之项目介绍
大数据实战-callLog项目(通话记录数据分析)之数据生产
大数据实战-callLog项目(通话记录数据分析)之数据采集
大数据实战-callLog项目(通话记录数据分析)之数据分析