笔记目录
- 电信客服案例笔记
- 1.0 Hadoop准备
- 1.1克隆虚拟机102,103,104
- 1.2伪分布式的测试
- 1.3完全分布式搭建
- 1.4对配置文件进行配置
- 2.0 zookeeper准备
- 2.1解压安装
- 2.2文件配置
- 3.0HBase准备
- 3.1解压安装、环境变量
- 3.2文件配置
- 4.0 Flume准备
- 4.1解压安装
- 4.2 配置
- 5.0 Kafka准备
- 5.1解压安装配置环境变量
- 5.2集群启动
- 6.0Redis环境配置
- 7.0业务流程
- 7.1 模块①—数据的生产-采集-消费(P1-P13)
- 7.1.1创建生产者
- P6尚硅谷_数据生产 - 创建生产者对象04:25
- P7尚硅谷_数据生产 - 获取通讯录数据33:32
- P8尚硅谷_数据生产 - 随机生成主被叫电话号码12:59
- P9尚硅谷_数据生产 - 构建通话记录25:40
- 7.1.3数据采集和消费
- 7.2 模块②—HBase数据消费(P14-P22)
- 7.2.1建表
- P14尚硅谷__数据消费 - Hbase数据访问封装32:21
- 7.2.4配置协处理器
- 7.3 模块③—导出数据及运算(P23-P29)
- 7.3.1数据库表设计
- 7.3.2数据计算
- 7.4 模块④—数据展示
- 8.0云服务器部署与搭建
- 8.1工具准备
- 8.2安装JDK
- 8.3安装Tomcat
- 8.4安装MySql
- 8.5配置防火墙
- 8.6配置云服务器安全组
- 8.7Tomcat配置与启动
- 8.8补充知识
- 8.8.1如何将项目打war包
- 8.8.2如何在本地Tomcat运行war包
- 9.0 项目总结
- 9.1项目背景
- 9.2项目流程
- 9.3项目环境
- 9.4项目运行图
- 附件分享:
电信客服案例笔记
本教程基于尚硅谷电信客服项目整理,内容涵盖从集群配置到业务流程的剖析,并配有相应的业务架构图;笔记后半部分介绍如何将本项目部署到云服务器以及页面展示。希望本教程能为你提供一定的参考,感谢支持。
如需获取项目源码请点击访问:https://gitee.com/fanggaolei/learning-notes-warehouse
1.0 Hadoop准备
1.1克隆虚拟机102,103,104
(1)修改克隆虚拟机的静态IP
vim /etc/sysconfig/network-scripts/ifcfg-ens33
(2)修改主机名
vim /etc/hostname
(3)连接xshell和XFTP
1.2伪分布式的测试
(1)将下面六个文件放入/opt/software目录下
apache-flume-1.9.0-bin.tar.gz
apache-zookeeper-3.5.7-bin.tar.gz
hadoop-3.1.3.tar.gz
hbase-2.4.11-bin.tar.gz
jdk-8u212-linux-x64.tar.gz
kafka_2.12-3.0.0.tgz
(2)将JDK和Hadoop两个文件分别解压
tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/
tar -zxvf hadoop-3.1.3.tar.gz -C /opt/module/
(3)添加JDK和Hadoop的环境变量
sudo vim /etc/profile.d/my_env.sh
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin

#HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

最后让文件生效:
source /etc/profile
(4)对虚拟机进行重启并验证hadoop和JDK是否安装成功
java -version
hadoop version
可以看到
Hadoop 3.1.3
Source code repository https://gitbox.apache.org/repos/asf/hadoop.git -r ba631c436b806728f8ec2f54ab1e289526c90579
Compiled by ztang on 2019-09-12T02:47Z
Compiled with protoc 2.5.0
From source with checksum ec785077c385118ac91aadde5ec9799
This command was run using /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-common-3.1.3.jar

java version "1.8.0_212"
Java(TM) SE Runtime Environment (build 1.8.0_212-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.212-b10, mixed mode)
跳过伪分布式启动
1.3完全分布式搭建
(1)首先进行ssh免密登录
生成公钥和私钥
[atguigu@hadoop102 .ssh]$ pwd
/home/atguigu/.ssh

[atguigu@hadoop102 .ssh]$ ssh-keygen -t rsa
然后敲(三个回车),就会生成两个文件:id_rsa(私钥)、id_rsa.pub(公钥)。
将公钥拷贝到要免密登录的目标机器上(每台机器上均执行以下三个命令):
[atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop102
[atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop103
[atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop104
(2)测试成功后进行xsync配置
(a)在/home/atguigu/bin目录下创建xsync文件
[atguigu@hadoop102 opt]$ cd /home/atguigu
[atguigu@hadoop102 ~]$ mkdir bin
[atguigu@hadoop102 ~]$ cd bin
[atguigu@hadoop102 bin]$ vim xsync
在该文件中编写如下代码:

#!/bin/bash

#1. 判断参数个数
if [ $# -lt 1 ]
then
    echo Not Enough Arguement!
    exit;
fi

#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
    echo ==================== $host ====================
    #3. 遍历所有目录,挨个发送
    for file in $@
    do
        #4. 判断文件是否存在
        if [ -e $file ]
        then
            #5. 获取父目录
            pdir=$(cd -P $(dirname $file); pwd)
            #6. 获取当前文件的名称
            fname=$(basename $file)
            ssh $host "mkdir -p $pdir"
            rsync -av $pdir/$fname $host:$pdir
        else
            echo $file does not exists!
        fi
    done
done

(b)修改脚本 xsync,使其具有执行权限
[atguigu@hadoop102 bin]$ chmod +x xsync

(c)测试脚本
[atguigu@hadoop102 ~]$ xsync /home/atguigu/bin

(d)将脚本复制到/bin中,以便全局调用
[atguigu@hadoop102 bin]$ sudo cp xsync /bin/

(e)同步环境变量配置(root所有者)
[atguigu@hadoop102 ~]$ sudo ./bin/xsync /etc/profile.d/my_env.sh
注意:如果用了sudo,那么xsync一定要补全它的路径。
让环境变量生效:
[atguigu@hadoop103 bin]$ source /etc/profile
[atguigu@hadoop104 opt]$ source /etc/profile
(3)检查hadoop103、hadoop104上的环境变量是否生效,如果没有生效,可以重启虚拟机后再次检查
java -version
hadoop version
1.4对配置文件进行配置
(1)核心配置文件
[atguigu@hadoop102 ~]$ cd $HADOOP_HOME/etc/hadoop
[atguigu@hadoop102 hadoop]$ vim core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- 指定NameNode的地址 -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop102:8020</value>
    </property>
    <!-- 指定hadoop数据的存储目录 -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-3.1.3/data</value>
    </property>
    <!-- 配置HDFS网页登录使用的静态用户为atguigu(记得换成自己的用户名) -->
    <property>
        <name>hadoop.http.staticuser.user</name>
        <value>atguigu</value>
    </property>
</configuration>
(2)HDFS 配置文件
[atguigu@hadoop102 hadoop]$ vim hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- nn web端访问地址 -->
    <property>
        <name>dfs.namenode.http-address</name>
        <value>hadoop102:9870</value>
    </property>
    <!-- 2nn web端访问地址 -->
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>hadoop104:9868</value>
    </property>
</configuration>
(3)YARN 配置文件
[atguigu@hadoop102 hadoop]$ vim yarn-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- 指定MR走shuffle -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <!-- 指定ResourceManager的地址 -->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hadoop103</value>
    </property>
    <!-- 环境变量的继承 -->
    <property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
    </property>
    <!-- 开启日志聚集功能 -->
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>
    <!-- 设置日志聚集服务器地址 -->
    <property>
        <name>yarn.log.server.url</name>
        <value>http://hadoop102:19888/jobhistory/logs</value>
    </property>
    <!-- 设置日志保留时间为7天 -->
    <property>
        <name>yarn.log-aggregation.retain-seconds</name>
        <value>604800</value>
    </property>
</configuration>
(4)MapReduce 配置文件
[atguigu@hadoop102 hadoop]$ vim mapred-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- 指定MapReduce程序运行在Yarn上 -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <!-- 历史服务器端地址 -->
    <property>
        <name>mapreduce.jobhistory.address</name>
        <value>hadoop102:10020</value>
    </property>
    <!-- 历史服务器web端地址 -->
    <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>hadoop102:19888</value>
    </property>
</configuration>
(5)配置 workers
[atguigu@hadoop102 hadoop]$ vim /opt/module/hadoop-3.1.3/etc/hadoop/workers

hadoop102
hadoop103
hadoop104

同步所有配置:
[atguigu@hadoop102 hadoop]$ xsync /opt/module/hadoop-3.1.3/etc
(6)初始化NameNode
[atguigu@hadoop102 hadoop-3.1.3]$ hdfs namenode -format
(7)编写myhadoop.sh脚本
[atguigu@hadoop102 ~]$ cd /home/atguigu/bin
[atguigu@hadoop102 bin]$ vim myhadoop.sh

#!/bin/bash

if [ $# -lt 1 ]
then
    echo "No Args Input..."
    exit ;
fi

case $1 in
"start")
    echo " =================== 启动 hadoop集群 ==================="
    echo " --------------- 启动 hdfs ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/start-dfs.sh"
    echo " --------------- 启动 yarn ---------------"
    ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
    echo " --------------- 启动 historyserver ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon start historyserver"
;;
"stop")
    echo " =================== 关闭 hadoop集群 ==================="
    echo " --------------- 关闭 historyserver ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon stop historyserver"
    echo " --------------- 关闭 yarn ---------------"
    ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
    echo " --------------- 关闭 hdfs ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh"
;;
*)
    echo "Input Args Error..."
;;
esac

记得添加执行权限:
[atguigu@hadoop102 bin]$ chmod +x myhadoop.sh
(8)编写jpsall脚本
[atguigu@hadoop102 ~]$ cd /home/atguigu/bin
[atguigu@hadoop102 bin]$ vim jpsall

#!/bin/bash

for host in hadoop102 hadoop103 hadoop104
do
    echo =============== $host ===============
    ssh $host jps
done

添加执行权限并分发脚本:
[atguigu@hadoop102 bin]$ chmod +x jpsall
[atguigu@hadoop102 ~]$ xsync /home/atguigu/bin/
在103 104上添加执行权限
(9)启动集群
myhadoop.sh start   启动集群
jpsall              查看所有节点是否启动
(10)查看3个网址是否可以正常访问
http://hadoop102:9870
http://hadoop103:8088/
http://hadoop102:19888/jobhistory
如果节点都正常启动但页面无法访问,先检查本机hosts文件是否已添加主机名映射;如果无误,可以重新启动集群并再次测试。
2.0 zookeeper准备
2.1解压安装
解压
[atguigu@hadoop102 software]$ tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/

更名:
[atguigu@hadoop102 module]$ mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7
2.2文件配置
配置服务器编号
(1)在/opt/module/zookeeper-3.5.7/这个目录下创建 zkData
[atguigu@hadoop102 zookeeper-3.5.7]$ mkdir zkData
(2)在/opt/module/zookeeper-3.5.7/zkData 目录下创建一个 myid 的文件
[atguigu@hadoop102 zkData]$ vi myid
在文件中添加与 server 对应的编号(注意:上下不要有空行,左右不要有空格):
2
分发zookeeper目录:
[atguigu@hadoop102 module]$ xsync zookeeper-3.5.7
然后修改各机器上的myid:
vim /opt/module/zookeeper-3.5.7/zkData/myid
并分别在 hadoop103、hadoop104 上修改 myid 文件中内容为 3、4
(3)配置zoo.cfg文件
1)重命名/opt/module/zookeeper-3.5.7/conf 目录下的 zoo_sample.cfg 为 zoo.cfg
[atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg

2)打开 zoo.cfg 文件
[atguigu@hadoop102 conf]$ vim zoo.cfg

#修改数据存储路径配置
dataDir=/opt/module/zookeeper-3.5.7/zkData
#增加如下配置
#######################cluster##########################
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888

3)同步 zoo.cfg 配置文件
[atguigu@hadoop102 conf]$ xsync zoo.cfg
(4)编写启停脚本
1)在 hadoop102 的/home/atguigu/bin 目录下创建脚本
[atguigu@hadoop102 bin]$ vim zk.sh

#!/bin/bash
case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo ---------- zookeeper $i 启动 ------------
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo ---------- zookeeper $i 停止 ------------
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
    done
};;
"status"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo ---------- zookeeper $i 状态 ------------
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
    done
};;
esac

2)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod u+x zk.sh

3)Zookeeper 集群启动脚本
[atguigu@hadoop102 module]$ zk.sh start

4)Zookeeper 集群停止脚本
[atguigu@hadoop102 module]$ zk.sh stop

输入 jpsall 检查是否正常启动即可
(5)zookeeper的操作
启动zookeeper客户端
[atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkCli.sh -server hadoop102:2181

查看zookeeper节点信息:
[zk: hadoop102:2181(CONNECTED) 0] ls /
3.0HBase准备
HBase 通过 Zookeeper 来做 master 的高可用、记录 RegionServer 的部署信息、并且存储有 meta 表的位置信息。
HBase 客户端在读写数据时需要先访问 Zookeeper 获取 meta 表的位置信息。2.3 版本推出的 Master Registry 模式允许客户端直接访问 master 获取这些信息,使用此功能会加大对 master 的压力,减轻对 Zookeeper 的压力。
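下面给出一个最小的 Java 连接示意(类名为自拟,Zookeeper 地址按本文集群填写),用于说明客户端是通过 Zookeeper 集群地址来定位 HBase 的;思路与后文 BaseDao 获取连接的方式类似(BaseDao 依赖 classpath 下的 hbase-site.xml,这里为演示显式指定 quorum):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class HBaseConnectDemo {
    public static void main(String[] args) throws Exception {
        // 客户端只需配置 Zookeeper 集群地址,即可通过 Zookeeper 找到 meta 表和 RegionServer
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");

        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        // ct:calllog 为后文消费模块创建的表
        System.out.println("ct:calllog 表是否存在: " + admin.tableExists(TableName.valueOf("ct:calllog")));
        admin.close();
        conn.close();
    }
}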
3.1解压安装、环境变量
1)解压 Hbase 到指定目录
[atguigu@hadoop102 software]$ tar -zxvf hbase-2.4.11-bin.tar.gz -C /opt/module/
[atguigu@hadoop102 software]$ mv /opt/module/hbase-2.4.11 /opt/module/hbase

2)配置环境变量
[atguigu@hadoop102 ~]$ sudo vim /etc/profile.d/my_env.sh
添加:
#HBASE_HOME
export HBASE_HOME=/opt/module/hbase
export PATH=$PATH:$HBASE_HOME/bin

3)使用 source 让配置的环境变量生效,并分发环境变量文件
[atguigu@hadoop102 module]$ source /etc/profile.d/my_env.sh
[fang@hadoop102 hbase]$ sudo /home/fang/bin/xsync /etc/profile.d/my_env.sh
3.2文件配置
1)hbase-env.sh 修改内容,可以添加到最后:
在hbase/conf目录下
[fang@hadoop102 conf]$ vim hbase-env.sh
在最后面添加一句:
export HBASE_MANAGES_ZK=false
2)hbase-site.xml 修改内容:把原先 configuration 标签内的内容删掉,直接替换成下方的即可

<property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
</property>
<property>
    <name>hbase.zookeeper.quorum</name>
    <value>hadoop102,hadoop103,hadoop104</value>
    <description>The directory shared by RegionServers.</description>
</property>
<!-- <property> -->
<!--     <name>hbase.zookeeper.property.dataDir</name> -->
<!--     <value>/export/zookeeper</value> -->
<!--     <description>记得修改 ZK 的配置文件,ZK 的信息不能保存到临时文件夹</description> -->
<!-- </property> -->
<property>
    <name>hbase.rootdir</name>
    <value>hdfs://hadoop102:8020/hbase</value>
    <description>The directory shared by RegionServers.</description>
</property>
3)修改regionservers
hadoop102
hadoop103
hadoop104
4)解决 HBase 和 Hadoop 的 log4j 兼容性问题,修改 HBase 的 jar 包,使用 Hadoop 的 jar 包
[atguigu@hadoop102 hbase]$ mv /opt/module/hbase/lib/client-facing-thirdparty/slf4j-reload4j-1.7.33.jar /opt/module/hbase/lib/client-facing-thirdparty/slf4j-reload4j-1.7.33.jar.bak

分发到其他节点:
xsync hbase/
5)Hbase启停
[atguigu@hadoop102 hbase]$ bin/start-hbase.sh
[atguigu@hadoop102 hbase]$ bin/stop-hbase.sh
4.0 Flume准备
4.1解压安装
解压
[atguigu@hadoop102 software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/

改名:
[atguigu@hadoop102 module]$ mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume

将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3:
[atguigu@hadoop102 lib]$ rm /opt/module/flume/lib/guava-11.0.2.jar
4.2 配置
(1)安装 netcat 工具
[atguigu@hadoop102 software]$ sudo yum install -y nc

(2)判断 44444 端口是否被占用
[atguigu@hadoop102 flume-telnet]$ sudo netstat -nlp | grep 44444

(3)创建 Flume Agent 配置文件 flume-netcat-logger.conf

(4)在 flume 目录下创建 job 文件夹并进入 job 文件夹
[atguigu@hadoop102 flume]$ mkdir job
[atguigu@hadoop102 flume]$ cd job/

(5)在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf
[atguigu@hadoop102 job]$ vim flume-netcat-logger.conf

(6)在 flume-netcat-logger.conf 文件中添加如下内容
添加内容如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(7)先开启 flume 监听端口
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

(8)复制一个 hadoop102 会话,使用 netcat 工具向本机的 44444 端口发送内容
[atguigu@hadoop102 ~]$ nc localhost 44444
到这就可以了,项目中会继续进行配置
5.0 Kafka准备
5.1解压安装配置环境变量
解压
[atguigu@hadoop102 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/

改名:
[atguigu@hadoop102 module]$ mv kafka_2.12-3.0.0/ kafka

进入到/opt/module/kafka 目录,修改配置文件:
[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vim server.properties
修改以下几处(记得找对相应的位置):

#broker 的全局唯一编号,不能重复,只能是数字
broker.id=0
#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 会自动创建,可以配置多个磁盘路径,路径与路径之间用","分隔
log.dirs=/opt/module/kafka/datas
#检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

分发:
[atguigu@hadoop102 module]$ xsync kafka/

分别在 hadoop103 和 hadoop104 上修改配置文件
[fang@hadoop102 flume]$ vim /opt/module/kafka/config/server.properties
中的 broker.id=1、broker.id=2

在/etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置:
[atguigu@hadoop102 module]$ sudo vim /etc/profile.d/my_env.sh#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
刷新
[atguigu@hadoop102 module]$ source /etc/profile

分发并刷新:
[atguigu@hadoop102 module]$ sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh
[atguigu@hadoop103 module]$ source /etc/profile
[atguigu@hadoop104 module]$ source /etc/profile
5.2集群启动
(1)先启动 Zookeeper 集群,然后启动 Kafka。
[atguigu@hadoop102 kafka]$ zk.sh start

(2)集群脚本
1)在/home/atguigu/bin 目录下创建文件 kf.sh 脚本文件
[atguigu@hadoop102 bin]$ vim kf.sh

#!/bin/bash
case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo "==============$i start=============="
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo "==============$i stop=============="
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
    done
};;
esac

2)添加执行权限
[atguigu@hadoop102 bin]$ chmod +x kf.sh
3)启动集群命令
[atguigu@hadoop102 ~]$ kf.sh start
4)停止集群命令
[atguigu@hadoop102 ~]$ kf.sh stop
6.0Redis环境配置
1.将软件包上传到/opt/software目录下,解压到/opt/module下
[root@linux software]# tar -zxvf redis-3.0.0.tar.gz -C /opt/module/
2.进入redis目录进行编译
[root@linux redis-3.0.0]# make
出现如下报错:没有找到gcc,此时需要安装gcc
[root@linux module]# yum -y install gcc
安装完成后验证是否安装成功
[root@linux module]# gcc -v
3.再次进行make,如果仍然出现报错
此时执行:
#清理残余文件
make distclean
#再次进行make
make
执行成功
4.执行
make install
5.查看安装目录
[root@linux redis-3.0.0]# cd /usr/local/bin/
6.回到Redis目录备份文件
#在根目录创建文件夹
[root@linux /]# mkdir myredis
#将redis.conf复制到该文件夹中
[root@linux redis-3.0.0]# cp redis.conf /myredis/
#在myredis目录中编辑文件
[root@linux myredis]# vi redis.conf
#切换目录
[root@linux myredis]# cd /usr/local/bin/
#启动redis
[root@linux bin]# redis-server /myredis/redis.conf
#启动redis客户端
[root@linux bin]# redis-cli -p 6379
测试
查看redis在后台是否启动
[root@linux bin]# ps -ef|grep redis
集群环境查看
此时输入 jpsall 可查看到以下节点的相应信息
软件 | hadoop102 | hadoop103 | hadoop104 |
---|---|---|---|
Hadoop HDFS | NameNode、DataNode | DataNode | SecondaryNameNode、DataNode |
Hadoop YARN | NodeManager | ResourceManager、NodeManager | NodeManager |
Hadoop历史服务器 | JobHistoryServer | | |
Zookeeper | QuorumPeerMain | QuorumPeerMain | QuorumPeerMain |
Kafka | Kafka | Kafka | Kafka |
HBase | HMaster、HRegionServer | HRegionServer | HRegionServer |
Jps | Jps | Jps | Jps |
集群启停脚本:

Hadoop启停:
myhadoop.sh start
myhadoop.sh stop

HBase启停:
bin/start-hbase.sh
bin/stop-hbase.sh

Zookeeper启停(先启动Zookeeper,再启动Kafka):
zk.sh start
zk.sh stop

Kafka启停:
kf.sh start
kf.sh stop

各大网址:
http://hadoop102:9870
http://hadoop103:8088/
http://hadoop102:19888/jobhistory
http://hadoop102:16010/master-status
注意:
HBase启动前要先启动Zookeeper和Hadoop
必须先关闭Kafka,再关闭Zookeeper
7.0业务流程
7.1 模块①—数据的生产-采集-消费(P1-P13)
7.1.1创建生产者
P6尚硅谷_数据生产 - 创建生产者对象04:25
public class Bootstrap {
    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("系统参数不正确,请按照指定格式传递");
            System.exit(1);
        }
        //构建生产者对象
        Producer producer = new LocalFileProducer();
        // producer.setIn(new LocalFileDataIn("D:\\大数据学习资料\\尚硅谷大数据技术之电信客服综合案例\\2.资料\\辅助文档\\contact.log"));
        // producer.setOut(new LocalFileDataOut("D:\\大数据学习资料\\尚硅谷大数据技术之电信客服综合案例\\2.资料\\辅助文档\\call.log"));
        producer.setIn(new LocalFileDataIn(args[0]));
        producer.setOut(new LocalFileDataOut(args[1]));
        //生产数据
        producer.producer();
        //关闭生产者对象
        producer.close();
    }
}
P7尚硅谷_数据生产 - 获取通讯录数据33:32
创建输入流,将数据保存到集合中
public class LocalFileDataIn implements DataIn {private BufferedReader reader = null;/*** 构造方法* @param path*/public LocalFileDataIn(String path) {setPath(path);}/*** 设置路径* 创建输入流* @param path*/public void setPath(String path) {try {reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();} catch (FileNotFoundException e) {e.printStackTrace();}}public Object read() throws IOException {return null;}/*** 读取数据,将数据返回到集合中** @param <T>* @param clazz* @throws IOException* @return*/public <T extends Data> List<T> read(Class<T> clazz) throws IOException {List<T> ts = new ArrayList<T>();try {//从数据文件中读取所有的数据String line = null;while ((line = reader.readLine()) != null) {//将数据转换为指定类型的对象,封装为集合返回T t = (T) clazz.newInstance();t.setValue(line);ts.add(t);}} catch (Exception e) {e.printStackTrace();}return ts;}/*** 关闭资源* @throws IOException*/public void close() throws IOException {if (reader!=null){reader.close();}}
}
P8尚硅谷_数据生产 - 随机生成主被叫电话号码12:59
从集合中随机获取两条数据构成主叫和被叫,并随机生成通话日期和时间(生成随机时间的思路见下方代码后的独立示例)
/*** 本地数据文件的生产者*/
public class LocalFileProducer implements Producer {/*** 数据来源* @param in*/private DataIn in;private DataOut out;private volatile boolean flg=true; //增强内存可见性public void setIn(DataIn in) {this.in=in;}/*** 数据输出* @param out*/public void setOut(DataOut out) {this.out=out;}/*** 数据生产*/public void producer() {/*** 数据返回类型为一个对象*/try {List<Contact> contacts=in.read(Contact.class);//读取通讯录的数据while ( flg ){int call1Index=new Random().nextInt(contacts.size());int call2Index;while (true){call2Index=new Random().nextInt(contacts.size());if (call1Index!=call2Index){break;}}Contact call1=contacts.get(call1Index);Contact call2=contacts.get(call2Index);//生成随机的通话时间String startDate="20180101000000"; //开始时间String endDate="20190101000000"; //结束时间long startTime= DataUtil.parse(startDate,"yyyyMMddHHmmss").getTime();//通话时间long endtime=DataUtil.parse(endDate,"yyyyMMddHHmmss").getTime();//通话时间字符串long calltime=startTime+(long)((endtime-startTime)* Math.random());//通话时间字符串String callTimeString=DataUtil.format(new Date(calltime),"yyyyMMddHHmmss");//生成随机的通话时长String duration= NumberUtil.format(new Random().nextInt(3000),4);//生成通话记录Callog log=new Callog(call1.getTel(),call2.getTel(),callTimeString,duration);System.out.println(log);//将通话记录写入到数据文件中out.write(log);Thread.sleep(500);}}catch (Exception e){e.printStackTrace();}}/*** 关闭生产者* @throws IOException*/public void close() throws IOException {if (in!=null){in.close();}if (out!=null){out.close();}}
}
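上面的生产者代码被压成了一行,不易阅读。下面给出一个独立的小片段(类名为自拟,仅为示意),单独演示其中"在 20180101 至 20190101 之间随机生成通话时间、并生成 4 位通话时长"的思路,效果应与原代码中 DataUtil/NumberUtil 的用法类似:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

public class RandomCallTimeDemo {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
        long start = sdf.parse("20180101000000").getTime();
        long end = sdf.parse("20190101000000").getTime();

        // 在 [start, end) 区间内随机取一个时间点,作为本次通话的开始时间
        long callTime = start + (long) ((end - start) * Math.random());
        String callTimeString = sdf.format(new Date(callTime));

        // 通话时长:0~2999 秒,左侧补 0 到 4 位
        String duration = String.format("%04d", new Random().nextInt(3000));

        // 输出格式与 Callog.toString() 中的制表符拼接一致
        System.out.println(callTimeString + "\t" + duration);
    }
}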
P9尚硅谷_数据生产 - 构建通话记录25:40
将数据封装为对象输出到通话日志中
/*** 本地文件数据输出*/
public class LocalFileDataOut implements DataOut {private PrintWriter writer=null;public LocalFileDataOut(String path){setPath(path);}/*** 设置路径* @param path*/public void setPath(String path) {try {writer=new PrintWriter(new OutputStreamWriter(new FileOutputStream(path),"UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();} catch (FileNotFoundException e) {e.printStackTrace();}}public void write(Object data) throws Exception {write(data.toString());}/*** 将数据字符串生成到文件中* @param data* @throws Exception*/public void write(String data) throws Exception {writer.println(data);writer.flush();}/*** 释放资源* @throws IOException*/public void close() throws IOException {if(writer!=null){writer.close();}}
}
public class Callog {private String call1;private String call2;private String calltime;private String duration;public Callog(String call1, String call2, String calltime, String duration) {this.call1 = call1;this.call2 = call2;this.calltime = calltime;this.duration = duration;}@Overridepublic String toString() {return call1+"\t"+call2+"\t"+calltime+"\t"+duration;}public String getCall1() {return call1;}public void setCall1(String call1) {this.call1 = call1;}public String getCall2() {return call2;}public void setCall2(String call2) {this.call2 = call2;}public String getCalltime() {return calltime;}public void setCalltime(String calltime) {this.calltime = calltime;}public String getDuration() {return duration;}public void setDuration(String duration) {this.duration = duration;}
}
7.1.3数据采集和消费
配置Flume,监听通话日志文件并输出到Kafka
在Kafka中创建一个名为ct的topic作为数据存储区,开启Kafka消费者即可获取到数据
到指定目录下配置flume
[fang@hadoop102 kafka]$ cd /opt/module/data/
[fang@hadoop102 data]$ vim flume-2-kafka.conf
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/data/call.log
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = ct
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动jar包:
[fang@hadoop102 data]$ java -jar ct_producer.jar contact.log call.log
在kafka目录下创建topic:
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic ct --partitions 3 --replication-factor 2
kafka开始消费
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ct
启动flume对数据进行采集:
bin/flume-ng agent -c conf/ -n a1 -f /opt/module/data/flume-2-kafka.conf
使用Kafka消费者API获取数据,并将数据发送到Hbase中
/**
 * 启动Kafka消费者
 * 使用Kafka消费者获取Flume采集的数据,并将数据存储到HBase中
 */
public class Bootstrap {
    public static void main(String[] args) throws IOException {
        //创建消费者
        Consumer consumer = new CallogConsumer();
        //消费数据
        consumer.consume();
        //关闭资源
        consumer.close();
    }
}
/*** 通话日志消费者*/
public class CallogConsumer implements Consumer {/*** 消费数据*/public void consume() {try {//创建配置对象Properties prop =new Properties();prop.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("consumer.properties"));//获取flume采集的数据KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(prop);//关注主题consumer.subscribe(Arrays.asList(Names.TOPIC.getValue()));//HBase数据访问对象HBaseDao dao=new HBaseDao();dao.init();//初始化HBase//消费数据while (true){ConsumerRecords<String, String> consumerRecords = consumer.poll(100);for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());dao.insertData(consumerRecord.value());//将数据插入HBase中//Callog log=new Callog(consumerRecord.value());//dao.insertData(log);}}} catch (IOException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}}/*** 关闭资源* @throws IOException*/public void close() throws IOException {}
}
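CallogConsumer 从类路径加载 consumer.properties,但笔记中没有贴出该文件的内容。下面按常见的 Kafka 消费者配置给出一个假设性的等价 Java 写法(bootstrap.servers、group.id 等均为示例值,实际以项目中的 consumer.properties 为准):

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerConfigDemo {
    public static void main(String[] args) {
        Properties prop = new Properties();
        // 与 consumer.properties 等价的核心配置(示例值)
        prop.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        prop.put("group.id", "ct_consumer");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("enable.auto.commit", "true");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        // 订阅通话日志主题 ct
        consumer.subscribe(Arrays.asList("ct"));
        while (true) {
            // 旧版客户端可使用 poll(100),新版客户端使用 Duration 形式
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}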
7.2 模块②—HBase数据消费(P14-P22)
7.2.1建表
P14尚硅谷__数据消费 - Hbase数据访问封装32:21
在Hbase中创建对应的表格,设置两个列族和rowkey
/*** HBase的数据访问对象*/
public class HBaseDao extends BaseDao {/*** 初始化*/public void init() throws Exception {start();//创建命名空间ctcreateNamespaseNX(Names.NAMESPACE.getValue());//创建表名ct:calllog 分区数为6createTableXX(Names.TABLE.getValue(),"com.fang.ct.consumer.coprocessor.InsertCalleeCoprocessor", 6,Names.CF_CALLER.getValue(),Names.CF_CALLEE.getValue());end();}/*** 插入对象* @param log* @throws Exception*/public void insertData(Callog log) throws Exception{log.setRowkey(genRegionNum(log.getCall1(),log.getCalltime())+"_"+log.getCall1()+"_"+log.getCalltime()+"_"+log.getCall2()+"_"+log.getDuration());putData(log);}/*** 插入数据*/public void insertData(String value) throws Exception{//将通话日志保存到Hbase的表中//1.获取通话日志数据String[] values= value.split("\t");String call1=values[0];String call2=values[1];String calltime=values[2];String duration=values[3];//2.创建数据对象//rowkey设计//主叫用户String rowkey=genRegionNum(call1,calltime)+"_"+call1+"_"+calltime+"_"+call2+"_"+duration+ "_1";Put put =new Put(Bytes.toBytes(rowkey));byte[] family=Bytes.toBytes(Names.CF_CALLER.getValue());put.addColumn(family,Bytes.toBytes("call1"),Bytes.toBytes(call1));put.addColumn(family,Bytes.toBytes("call2"),Bytes.toBytes(call2));put.addColumn(family,Bytes.toBytes("calltime"),Bytes.toBytes(calltime));put.addColumn(family,Bytes.toBytes("duration"),Bytes.toBytes(duration));put.addColumn(family, Bytes.toBytes("flg"), Bytes.toBytes("1"));String calleeRowkey = genRegionNum(call2, calltime) + "_" + call2 + "_" + calltime + "_" + call1 + "_" + duration + "_0";// // 被叫用户
// Put calleePut = new Put(Bytes.toBytes(calleeRowkey));
// byte[] calleeFamily = Bytes.toBytes(Names.CF_CALLEE.getValue());
// calleePut.addColumn(calleeFamily, Bytes.toBytes("call1"), Bytes.toBytes(call2));
// calleePut.addColumn(calleeFamily, Bytes.toBytes("call2"), Bytes.toBytes(call1));
// calleePut.addColumn(calleeFamily, Bytes.toBytes("calltime"), Bytes.toBytes(calltime));
// calleePut.addColumn(calleeFamily, Bytes.toBytes("duration"), Bytes.toBytes(duration));
// calleePut.addColumn(calleeFamily, Bytes.toBytes("flg"), Bytes.toBytes("0"));// 3. 保存数据List<Put> puts = new ArrayList<Put>();puts.add(put);// puts.add(calleePut);putData(Names.TABLE.getValue(), puts);}
}
BaseDao 封装了获取连接对象、建表删表、生成分区键和分区号、插入和删除数据等基础方法(分区号算法的可读版本见下方代码后的独立示例)
/*** 基础的数据访问对象*/
public abstract class BaseDao {/****/private ThreadLocal<Connection> connHolder=new ThreadLocal<Connection>();private ThreadLocal<Admin> adminHolder=new ThreadLocal<Admin>();protected void start() throws Exception{getConnection();getAdmin();}protected void end() throws Exception{Admin admin=getAdmin();if (admin!=null){admin.close();adminHolder.remove();}Connection conn=getConnection();if (conn!=null){conn.close();;connHolder.remove();}}/*** 创建表,如果表已经存在,name删除后创建新的* @param name* @param family* @throws IOException*/protected void createTableXX(String name,String... family) throws Exception {createTableXX(name,null,null,family);}protected void createTableXX(String name,String coprocessorClass,Integer regionCount,String... family) throws Exception {Admin admin=getAdmin();TableName tableName= TableName.valueOf(name);if (admin.tableExists(tableName)){//表存在,删除表deleteTable(name);}//创建表createTable(name,coprocessorClass,regionCount,family);}private void createTable(String name,String coprocessorClass,Integer regionCount,String... family) throws Exception{Admin admin=getAdmin();TableName tablename=TableName.valueOf(name);HTableDescriptor tableDescriptor=new HTableDescriptor(tablename);if (family==null||family.length==0){family=new String[1];family[0]= Names.CF_INFO.getValue();}for (String families : family) {HColumnDescriptor columnDescriptor=new HColumnDescriptor(families);tableDescriptor.addFamily(columnDescriptor);}if(coprocessorClass!=null&& !"".equals(coprocessorClass)){tableDescriptor.addCoprocessor(coprocessorClass);}//增加预分区if(regionCount==null||regionCount<=0){admin.createTable(tableDescriptor);}else {//分区键byte[][] splitKeys=genSpliKeys(regionCount);admin.createTable(tableDescriptor,splitKeys);}}/*** 获取查询时startrow,stoprom集合* @return*/protected List<String[]> getStartStorRowkeys(String tel,String start,String end){List<String[]> rowkeyss=new ArrayList<String[]>();String startTime = start.substring(0, 6);String endTime = end.substring(0, 6);Calendar startCal = Calendar.getInstance();startCal.setTime(DataUtil.parse(startTime, "yyyyMM"));Calendar endCal = Calendar.getInstance();endCal.setTime(DataUtil.parse(endTime, "yyyyMM"));while (startCal.getTimeInMillis() <= endCal.getTimeInMillis()) {// 当前时间String nowTime = DataUtil.format(startCal.getTime(), "yyyyMM");int regionNum = genRegionNum(tel, nowTime);String startRow = regionNum + "_" + tel + "_" + nowTime;String stopRow = startRow + "|";String[] rowkeys = {startRow, stopRow};rowkeyss.add(rowkeys);// 月份+1startCal.add(Calendar.MONTH, 1);}return rowkeyss;}/*** 计算分区号* @param tel* @param date* @return*/protected int genRegionNum(String tel,String date){String usercode=tel.substring(tel.length()-4);String yearMonth=date.substring(0,6);int userCodehash=usercode.hashCode();int yearMonthHash=yearMonth.hashCode();//crc校验采用异或算法int crc=Math.abs(userCodehash ^ yearMonthHash);//取模int regionNum=crc% ValueConstant.REGION_COUNT;return regionNum;}/*** 生成分区键* @return*/private byte[][] genSpliKeys(int regionCount){int splitkeyCount=regionCount-1;byte[][] bs=new byte[splitkeyCount][];//0,1,2,3,4List<byte[]> bsList=new ArrayList<byte[]>();for (int i = 0; i < splitkeyCount; i++) {String splitkey=i+"|";bsList.add(Bytes.toBytes(splitkey));}//Collections.sort(bsList,new Bytes.ByteArrayComparator());bsList.toArray(bs);return bs;}/*** 增加对象:自动封装数据,将对象数据直接保存到Hbase中* @param obj* @throws Exception*/protected void putData(Object obj) throws Exception{// 反射Class clazz = obj.getClass();TableRef tableRef = (TableRef)clazz.getAnnotation(TableRef.class);String tableName = 
tableRef.value();Field[] fs = clazz.getDeclaredFields();String stringRowkey = "";for (Field f : fs) {Rowkey rowkey = f.getAnnotation(Rowkey.class);if ( rowkey != null ) {f.setAccessible(true);stringRowkey = (String)f.get(obj);break;}}//获取表对象Connection connection=getConnection();Table table = connection.getTable(TableName.valueOf(tableName));Put put=new Put(Bytes.toBytes(stringRowkey));for (Field f : fs) {Column column = f.getAnnotation(Column.class);if (column != null) {String family = column.family();String colName = column.column();if (colName == null || "".equals(colName)) {colName = f.getName();}f.setAccessible(true);String value = (String) f.get(obj);put.addColumn(Bytes.toBytes(family), Bytes.toBytes(colName), Bytes.toBytes(value));}}//增加数据table.put(put);//关闭表table.close();}/*** 增加多条数据* @param name* @param puts* @throws IOException*/protected void putData( String name, List<Put> puts ) throws Exception {// 获取表对象Connection conn = getConnection();Table table = conn.getTable(TableName.valueOf(name));// 增加数据table.put(puts);// 关闭表table.close();}/*** 增加数据* @param name* @param put*/protected void putData(String name, Put put) throws IOException {//获取表对象Connection connection=getConnection();Table table = connection.getTable(TableName.valueOf(name));//增加数据table.put(put);//关闭表table.close();}/*** 删除表格* @param name* @throws Exception*/protected void deleteTable(String name) throws Exception{TableName tableName=TableName.valueOf(name);Admin admin=getAdmin();admin.disableTable(tableName);admin.deleteTable(tableName);}/*** 创建命名空间,如果命名空间已经存在,不需要创建,否则创建新的* @param namespace*/protected void createNamespaseNX(String namespace) throws IOException {Admin admin =getAdmin();try{admin.getNamespaceDescriptor(namespace);}catch (NamespaceNotFoundException e){admin.createNamespace(NamespaceDescriptor.create(namespace).build());}}/*** 获取管理对象*/protected synchronized Admin getAdmin() throws IOException {Admin admin = adminHolder.get();if(admin==null){admin=getConnection().getAdmin();adminHolder.set(admin);}return admin;}/*** 获取连接对象* @return*/protected synchronized Connection getConnection() throws IOException {Connection conn=connHolder.get();if(conn==null){Configuration conf= HBaseConfiguration.create();conn= ConnectionFactory.createConnection(conf);connHolder.set(conn);}return conn;}}
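BaseDao 中的分区号算法被压成一行,不易阅读。下面是一个等价的独立演示(类名为自拟,REGION_COUNT 假设与建表时的预分区数 6 保持一致):

public class RegionNumDemo {

    private static final int REGION_COUNT = 6; // 假设与建表时的预分区数一致

    // 取手机号后 4 位与通话年月分别求 hashCode,异或后取绝对值,再对分区数取模
    public static int genRegionNum(String tel, String date) {
        String userCode = tel.substring(tel.length() - 4);
        String yearMonth = date.substring(0, 6);
        int crc = Math.abs(userCode.hashCode() ^ yearMonth.hashCode());
        return crc % REGION_COUNT;
    }

    public static void main(String[] args) {
        // 同一个号码在同一个月内的通话会落到同一个分区,便于按月扫描
        System.out.println(genRegionNum("13312341234", "20180812120000"));
        System.out.println(genRegionNum("13312341234", "20180820093000"));
    }
}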
7.2.4配置协处理器
用于区分主叫和被叫,减少数据的插入量
/**** 使用协处理器保存被叫用户的数据** 协处理器的使用* 1. 创建类* 2. 让表找到协处理类(和表有关联)* 3. 将项目打成jar包发布到hbase中(关联的jar包也需要发布),并且需要分发*/
public class InsertCalleeCoprocessor extends BaseRegionObserver {// 方法的命名规则// login// logout// prePut// doPut :模板方法设计模式// 存在父子类:// 父类搭建算法的骨架// 1. tel取用户代码,2时间取年月,3,异或运算,4 hash散列// 子类重写算法的细节// do1. tel取后4位,do2,201810, do3 ^, 4, % &// postPut/*** 保存主叫用户数据之后,由Hbase自动保存被叫用户数据* @param e* @param put* @param edit* @param durability* @throws IOException*/@Overridepublic void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {// 获取表Table table = e.getEnvironment().getTable(TableName.valueOf(Names.TABLE.getValue()));// 主叫用户的rowkeyString rowkey = Bytes.toString(put.getRow());// 1_133_2019_144_1010_1String[] values = rowkey.split("_");CoprocessorDao dao = new CoprocessorDao();String call1 = values[1];String call2 = values[3];String calltime = values[2];String duration = values[4];String flg = values[5];if ( "1".equals(flg) ) {// 只有主叫用户保存后才需要触发被叫用户的保存String calleeRowkey = dao.getRegionNum(call2, calltime) + "_" + call2 + "_" + calltime + "_" + call1 + "_" + duration + "_0";// 保存数据Put calleePut = new Put(Bytes.toBytes(calleeRowkey));byte[] calleeFamily = Bytes.toBytes(Names.CF_CALLEE.getValue());calleePut.addColumn(calleeFamily, Bytes.toBytes("call1"), Bytes.toBytes(call2));calleePut.addColumn(calleeFamily, Bytes.toBytes("call2"), Bytes.toBytes(call1));calleePut.addColumn(calleeFamily, Bytes.toBytes("calltime"), Bytes.toBytes(calltime));calleePut.addColumn(calleeFamily, Bytes.toBytes("duration"), Bytes.toBytes(duration));calleePut.addColumn(calleeFamily, Bytes.toBytes("flg"), Bytes.toBytes("0"));table.put( calleePut );}// 关闭表table.close();}private class CoprocessorDao extends BaseDao {public int getRegionNum(String tel, String time) {return genRegionNum(tel, time);}}
}
7.3 模块③—导出数据及运算(P23-P29)
7.3.1数据库表设计
结果库主要包含三张表:用户维表 ct_user(id, tel)、时间维表 ct_date(id, year, month, day)以及统计结果表 ct_call(telid, dateid, sumcall, sumduration),与下文 MySQLTextOutputFormat 中的查询和插入语句相对应。
7.3.2数据计算
AnalysisTextTool 用于组装 MapReduce 作业,通过 TableMapReduceUtil 从 HBase 中读取数据
/*** 分析数据工具类*/
public class AnalysisTextTool implements Tool {public int run(String[] args) throws Exception {Job job = Job.getInstance();job.setJarByClass(AnalysisTextTool.class);//从Hbase中读去数据Scan scan = new Scan();scan.addFamily(Bytes.toBytes(Names.CF_CALLER.getValue()));// mapperTableMapReduceUtil.initTableMapperJob(Names.TABLE.getValue(),scan,AnalysisTextMapper.class,Text.class,Text.class,job);// reducerjob.setReducerClass(AnalysisTextReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setOutputFormatClass(MySQLRedisTextOutputFormat.class);boolean flg = job.waitForCompletion(true);if ( flg ) {return JobStatus.State.SUCCEEDED.getValue();} else {return JobStatus.State.FAILED.getValue();}}public void setConf(Configuration configuration) {}public Configuration getConf() {return null;}
}
Mapper 将从 HBase 获取到的 rowkey 拆分,按用户-年、用户-月、用户-日等维度输出键值对
/*** maper*/
public class AnalysisTextMapper extends TableMapper<Text, Text> {@Overrideprotected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {String rowkey = Bytes.toString(key.get());String[] values = rowkey.split("_");String call1 = values[1];String call2 = values[3];String calltime = values[2];String duration = values[4];String year = calltime.substring(0, 4);String month = calltime.substring(0, 6);String date= calltime.substring(0, 8);// 主叫用户 - 年context.write(new Text(call1+"_"+year), new Text(duration));// 主叫用户 - 月context.write(new Text(call1+"_"+month), new Text(duration));// 主叫用户 - 日context.write(new Text(call1+"_"+date), new Text(duration));// 被叫用户 - 年context.write(new Text(call2+"_"+year), new Text(duration));// 被叫用户 - 月context.write(new Text(call2+"_"+month), new Text(duration));// 被叫用户 - 日context.write(new Text(call2+"_"+date), new Text(duration));}
}
Reducer 获取 Mapper 输出的数据,对通话次数和通话时长进行汇总运算
/**
 * 分析数据的Reducer
 */
public class AnalysisTextReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int sumCall = 0;
        int sumDuration = 0;
        for (Text value : values) {
            int duration = Integer.parseInt(value.toString());
            sumDuration = sumDuration + duration;
            sumCall++;
        }
        context.write(key, new Text(sumCall + "_" + sumDuration));
    }
}
将统计结果写入MySQL,并可通过Redis对维表数据进行缓存(见下方代码后的示意)
/*** Mysql的数据格式化输出对象*/
public class MySQLTextOutputFormat extends OutputFormat<Text, Text> {protected static class MySQLRecordWriter extends RecordWriter<Text,Text> {private Connection connection =null;private Map<String,Integer> userMap=new HashMap<String, Integer>();Map<String, Integer> dateMap = new HashMap<String, Integer>();public MySQLRecordWriter() {// 获取资源connection = JDBCUtil.getConnection();PreparedStatement pstat = null;ResultSet rs = null;try {String queryUserSql = "select id, tel from ct_user";pstat = connection.prepareStatement(queryUserSql);rs = pstat.executeQuery();while ( rs.next() ) {Integer id = rs.getInt(1);String tel = rs.getString(2);userMap.put(tel, id);}rs.close();String queryDateSql = "select id, year, month, day from ct_date";//将整张表中的数据查出pstat = connection.prepareStatement(queryDateSql);rs = pstat.executeQuery();while ( rs.next() ) {Integer id = rs.getInt(1);String year = rs.getString(2);String month = rs.getString(3);if ( month.length() == 1) {month = "0" + month;}String day = rs.getString(4);if ( day.length() == 1 ) {day = "0" + day;}dateMap.put(year + month + day, id);}} catch (Exception e) {e.printStackTrace();} finally {if ( rs != null ) {try {rs.close();} catch (SQLException e) {e.printStackTrace();}}if ( pstat != null ) {try {pstat.close();} catch (SQLException e) {e.printStackTrace();}}}}/*** 输出数据* @param key* @param value* @throws IOException* @throws InterruptedException*/@Overridepublic void write(Text key, Text value) throws IOException, InterruptedException {String[] values = value.toString().split("_");String sumCall = values[0];String sumDuration = values[1];PreparedStatement pstat = null;try {String insertSQL = "insert into ct_call ( telid, dateid, sumcall, sumduration ) values ( ?, ?, ?, ? )";pstat = connection.prepareStatement(insertSQL);String k = key.toString();String[] ks = k.split("_");String tel = ks[0];String date = ks[1];pstat.setInt(1, userMap.get(tel));pstat.setInt(2, dateMap.get(date));pstat.setInt(3, Integer.parseInt(sumCall) );pstat.setInt(4, Integer.parseInt(sumDuration));pstat.executeUpdate();} catch (SQLException e) {e.printStackTrace();} finally {if ( pstat != null ) {try {pstat.close();} catch (SQLException e) {e.printStackTrace();}}}}/*** 释放资源* @param context* @throws IOException* @throws InterruptedException*/@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {if ( connection != null ) {try {connection.close();} catch (SQLException e) {e.printStackTrace();}}}}@Overridepublic RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {return new MySQLRecordWriter();}@Overridepublic void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {}private FileOutputCommitter committer = null;public static Path getOutputPath(JobContext job) {String name = job.getConfiguration().get(FileOutputFormat.OUTDIR);return name == null ? null: new Path(name);}public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {if (committer == null) {Path output = getOutputPath(context);committer = new FileOutputCommitter(output, context);}return committer;}
}
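AnalysisTextTool 中指定的输出格式其实是 MySQLRedisTextOutputFormat,而上面贴出的是不带缓存的 MySQL 版本。下面给出一个假设性的 Jedis 缓存思路示意(类名、连接地址和账号均为自拟),演示如何把 ct_user 这类维表缓存到 Redis 的 hash 中,避免输出阶段重复查库:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import redis.clients.jedis.Jedis;

public class DimensionCacheDemo {
    public static void main(String[] args) throws Exception {
        // 假设的连接信息,请按实际环境修改
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://hadoop102:3306/ct", "root", "password");
        Jedis jedis = new Jedis("hadoop102", 6379);

        // 将 ct_user 维表缓存为 Redis hash:key 为 ct_user,field 为手机号,value 为主键 id
        PreparedStatement pstat = conn.prepareStatement("select id, tel from ct_user");
        ResultSet rs = pstat.executeQuery();
        while (rs.next()) {
            jedis.hset("ct_user", rs.getString(2), String.valueOf(rs.getInt(1)));
        }
        rs.close();
        pstat.close();
        conn.close();

        // 输出阶段直接查 Redis 缓存,避免每个输出任务重复查库
        String telId = jedis.hget("ct_user", "13312341234");
        System.out.println("telid = " + telId);
        jedis.close();
    }
}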
7.4 模块④—数据展示
通过controller层获取浏览器请求
/*** 通话日志控制器对象*/
@Controller
public class CalllogController {@Autowiredprivate CalllogService calllogService;@RequestMapping("/query")public String query() {return "query";}// Object ==> json ==> String//@ResponseBody@RequestMapping("/view")public String view( String tel, String calltime, Model model ) {// 查询统计结果 : MysqlList<Calllog> logs = calllogService.queryMonthDatas(tel, calltime);System.out.println(logs.size());model.addAttribute("calllogs", logs);return "view";}
}
通过service层调用dao层接口获取数据,并将数据封装为对象
/** 通话日志服务对象*/
@Service
public class CalllogServiceImpl implements CalllogService {@Autowiredprivate CalllogDao calllogDao;/*** 查询用户指定时间的通话统计信息* @param tel* @param calltime* @return*/@Overridepublic List<Calllog> queryMonthDatas(String tel, String calltime) {Map<String, Object> paramMap = new HashMap<String, Object>();paramMap.put("tel", tel);if ( calltime.length() > 4 ) {calltime = calltime.substring(0, 4);}paramMap.put("year", calltime);System.out.println(paramMap);return calllogDao.queryMonthDatas(paramMap);}
}
Dao层获取数据
/*
 * 通话日志数据访问对象
 */
public interface CalllogDao {
    List<Calllog> queryMonthDatas(Map<String, Object> paramMap);
}
通过MyBatis配置文件执行SQL对数据库中的数据进行查询
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.atguigu.ct.web.dao.CalllogDao">
    <select id="queryMonthDatas" resultType="com.atguigu.ct.web.bean.Calllog">
        select * from ct_call
        where telid = (
            select id from ct_user where tel = #{tel}
        ) and dateid in (
            select id from ct_date where year = #{year} and month != '' and day = ''
        )
    </select>
</mapper>
8.0云服务器部署与搭建
8.1工具准备
序号 | 工具 | 版本 |
---|---|---|
1 | 阿里云服务器 | 1核1G CentOS 7.3 |
2 | Xshell | 7.0 |
3 | Xftp | 7.0 |
4 | Navicat | 15.0 |
5 | JDK | Linux版本:jdk-8u212-linux-x64.tar.gz |
6 | Mysql | Linux版本:mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar |
7 | Tomcat | Linux版本:apache-tomcat-8.5.75.tar.gz |
8.2安装JDK
(1)将文件放入/opt/software目录下
jdk-8u212-linux-x64.tar.gz
(2)卸载系统自带的Java并解压JDK
#卸载虚拟机自带的java
rpm -qa | grep -i java | xargs -n1 rpm -e --nodeps
#解压JDK
tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/
(3)添加JDK的环境变量
sudo vim /etc/profile.d/my_env.sh
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin
最后让文件生效:
source /etc/profile
(4)对虚拟机进行重启并验证JDK是否安装成功
java -version
8.3安装Tomcat
(1)上传Tomcat文件包到/opt/software
apache-tomcat-8.5.75
(2)解压Tomcat
tar -zxvf apache-tomcat-8.5.75.tar.gz
mv apache-tomcat-8.5.75 /opt/module
8.4安装MySql
1)检查当前系统是否安装过 MySQL
[atguigu@hadoop102 ~]$ rpm -qa|grep mariadb
mariadb-libs-5.5.56-2.el7.x86_64
//如果存在通过如下命令卸载
[atguigu @hadoop102 ~]$ sudo rpm -e --nodeps mariadb-libs
2)将 MySQL 安装包拷贝到/opt/software 目录下
[atguigu @hadoop102 software]# ll
总用量 528384
-rw-r--r--. 1 root root 609556480 3 月 21 15:41 mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar
3)解压 MySQL 安装包
[atguigu @hadoop102 software]# tar -xf mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar
4)在安装目录下执行 rpm 安装
[atguigu @hadoop102 software]$
sudo rpm -ivh mysql-community-common-5.7.28-1.el7.x86_64.rpm
sudo rpm -ivh mysql-community-libs-5.7.28-1.el7.x86_64.rpm
sudo rpm -ivh mysql-community-libs-compat-5.7.28-1.el7.x86_64.rpm
sudo rpm -ivh mysql-community-client-5.7.28-1.el7.x86_64.rpm
sudo rpm -ivh mysql-community-server-5.7.28-1.el7.x86_64.rpm
过程中如果出现报错,则说明需要安装依赖,通过 yum 安装缺少的依赖后,重新安装 mysql-community-server-5.7.28-1.el7.x86_64 即可
[atguigu@hadoop102 software]$ sudo yum install -y libaio
5)删除/etc/my.cnf 文件中 datadir 指向的目录下的所有内容,如果有内容的情况下:
查看 datadir 的值:
[mysqld]
datadir=/var/lib/mysql
删除/var/lib/mysql 目录下的所有内容(一般没有任何东西):
[atguigu @hadoop102 mysql]# cd /var/lib/mysql
[atguigu @hadoop102 mysql]# sudo rm -rf ./* //注意执行命令的位置
6)初始化数据库
[atguigu @hadoop102 opt]$ sudo mysqld --initialize --user=mysql
7)查看临时生成的 root 用户的密码
[atguigu @hadoop102 opt]$ sudo cat /var/log/mysqld.log
日志中可以找到类似 jZu%i>4hD 的临时密码
8)启动 MySQL 服务
[atguigu @hadoop102 opt]$ sudo systemctl start mysqld
9)登录 MySQL 数据库
[atguigu @hadoop102 opt]$ mysql -uroot -p
Enter password: 输入临时生成的密码
10)必须先修改 root 用户的密码,否则执行其他的操作会报错
mysql> set password = password("fgl123");
11)修改 mysql 库下的 user 表中的 root 用户允许任意 ip 连接
mysql> update mysql.user set host='%' where user='root';
mysql> flush privileges;
12)使用本机Navicat连接远程数据库
8.5配置防火墙
1.查看防火墙服务状态
systemctl status firewalld
2.如果没有开启可进行如下操作
#开启服务
service firewalld start
#关闭服务
service firewalld stop
#重启服务
service firewalld restart
3.查看防火墙状态并开启防火墙
#查看防火墙状态
firewall-cmd --state
#开启防火墙
service firewalld start
4.查看防护墙规则
firewall-cmd --list-all
5.如果没有开放这些端口可使用如下命令开放
#开放80端口
firewall-cmd --permanent --add-port=80/tcp
#开放8080端口
firewall-cmd --permanent --add-port=8080/tcp
#开放3306端口
firewall-cmd --permanent --add-port=3306/tcp

#移除端口命令
firewall-cmd --permanent --remove-port=3306/tcp
6.开通完成后重启防火墙
firewall-cmd --reload
7.重新查看端口是否开启
firewall-cmd --list-all
8.6配置云服务器安全组
以阿里云为例,在云服务器控制台的安全组规则中放行需要的端口(本机已经开启,所以手动添加显示为灰色)。
8.7Tomcat配置与启动
1.将war包放到tomcat目录下的webapps中
2.修改conf目录中的server.xml配置文件
#进入conf目录
cd conf
#修改配置文件
vim server.xml
#将8080端口改为80端口
#按照个人war包在系统中的地址进行配置
<Context docBase="/opt/module/apache-tomcat-8.5.75/webapps/ssmhe" path="" reloadable="false"/>
3.启动Tomcat
#进入bin目录
#开启tomcat(等待20秒)
./startup.sh
#关闭tomcat
./shutdown.sh
#重新开启
./startup.sh
4.通过云服务器公网IP在浏览器中进行访问
如: 47.94.139.13
可通过购买域名并与公网IP进行解析配置,从而让用户使用域名进行访问
8.8补充知识
8.8.1如何将项目打war包
1.在IDEA中打开Project Structure
2.选择Artifacts
3.点击+号
4.选择Web Application: Archive
5.配置好输出目录后,通过Build -> Build Artifacts完成打包
8.8.2如何在本地Tomcat运行war包
1.将war包放入本地Tomcat目录中的webapps下
2.在tomcat bin目录下打开终端输入.\startup.bat
3.弹出运行框正常运行即可访问web项目
4.关闭Tomcat
9.0 项目总结
9.1项目背景
通信运营商每时每刻会产生大量的通信数据,例如通话记录,短信记录,彩信记录,第三方服务资费等等繁多信息。数据量如此巨大,除了要满足用户的实时查询和展示之外,还需要定时定期的对已有数据进行离线的分析处理。例如,当日话单,月度话单,季度话单,年度话单,通话详情,通话记录等等。我们以此为背景,寻找一个切入点,学习其中的方法论。当前我们的需求是:统计每天、每月以及每年的每个人的通话次数及时长。
9.2项目流程
9.3项目环境
1.工具
工具 | 版本 |
---|---|
IDEA | 2020.2 |
Maven | 3.3.9 |
JDK | 1.8+ |
Navicat | 15 |
Tomcat | 8.5.75 |
CentOS | 7.0 |
Xshell | 7 |
2.框架信息
框架 | 版本 |
---|---|
Hadoop | 3.1.3 |
Zookeeper | 3.5.7 |
Hbase | 2.4.11 |
Flume | 1.9.0 |
Kafka | 3.0.0(kafka_2.12-3.0.0) |
Redis | 3.0.0 |
3.集群环境
 | hadoop102 | hadoop103 | hadoop104 |
---|---|---|---|
内存 | 4G | 4G | 4G |
CPU | 2核 | 2核 | 2核 |
硬盘 | 50G | 50G | 50G |
进程 | hadoop102 | hadoop103 | hadoop104 |
---|---|---|---|
Hadoop HDFS | NameNode、DataNode | DataNode | SecondaryNameNode、DataNode |
Hadoop YARN | NodeManager | ResourceManager、NodeManager | NodeManager |
Hadoop历史服务器 | JobHistoryServer | | |
Zookeeper | QuorumPeerMain | QuorumPeerMain | QuorumPeerMain |
Kafka | Kafka | Kafka | Kafka |
HBase | HMaster、HRegionServer | HRegionServer | HRegionServer |
Jps | Jps | Jps | Jps |
9.4项目运行图
1.查询界面
2.统计界面
附件分享:
个人gitee笔记:https://gitee.com/fanggaolei/learning-notes-warehouse
包含JavaWEB、大数据、算法、SQL、Java等Typora文档,附带对应的SSM项目和大数据项目,欢迎大家多多Star。