Telecom Customer Service Project Notes

Table of Contents

  • Telecom Customer Service Case Notes
  • 1.0 Hadoop Setup
    • 1.1 Clone virtual machines 102, 103, 104
    • 1.2 Pseudo-distributed test
    • 1.3 Fully distributed setup
    • 1.4 Edit the configuration files
  • 2.0 ZooKeeper Setup
    • 2.1 Extract and install
    • 2.2 File configuration
  • 3.0 HBase Setup
    • 3.1 Extract, install, and set environment variables
    • 3.2 File configuration
  • 4.0 Flume Setup
    • 4.1 Extract and install
    • 4.2 Configuration
  • 5.0 Kafka Setup
    • 5.1 Extract, install, and configure environment variables
    • 5.2 Cluster startup
  • 6.0 Redis Environment Setup
  • 7.0 Business Flow
    • 7.1 Module 1: Data production, collection, and consumption (P1-P13)
      • 7.1.1 Create the producer
        • P6 尚硅谷 Data Production - Create the producer object (04:25)
        • P7 尚硅谷 Data Production - Read the contact list data (33:32)
        • P8 尚硅谷 Data Production - Randomly generate caller and callee numbers (12:59)
        • P9 尚硅谷 Data Production - Build call records (25:40)
      • 7.1.3 Data collection and consumption
    • 7.2 Module 2: HBase data consumption (P14-P22)
      • 7.2.1 Create the table
        • P14 尚硅谷 Data Consumption - HBase data access encapsulation (32:21)
      • 7.2.4 Configure the coprocessor
    • 7.3 Module 3: Data export and computation (P23-P29)
      • 7.3.1 Database table design
      • 7.3.2 Data computation
    • 7.4 Module 4: Data presentation
  • 8.0 Cloud Server Deployment
    • 8.1 Tools
    • 8.2 Install the JDK
    • 8.3 Install Tomcat
    • 8.4 Install MySQL
    • 8.5 Configure the firewall
    • 8.6 Configure the cloud server security group
    • 8.7 Tomcat configuration and startup
    • 8.8 Supplementary notes
      • 8.8.1 How to package the project as a WAR
      • 8.8.2 How to run the WAR in a local Tomcat
  • 9.0 Project Summary
    • 9.1 Project background
    • 9.2 Project workflow
    • 9.3 Project environment
    • 9.4 Screenshots of the running project
        • Attachments:

Telecom Customer Service Case Notes

These notes follow the 尚硅谷 (Atguigu) telecom customer service project, from cluster configuration through an analysis of the business flow, with the corresponding architecture diagrams along the way. The later part of the notes covers deploying the project to a cloud server and presenting the web pages. I hope they are a useful reference; thanks for your support.

For the project source code, visit: https://gitee.com/fanggaolei/learning-notes-warehouse

1.0 Hadoop Setup

1.1 Clone virtual machines 102, 103, 104

(1) Set a static IP on each cloned VM

vim /etc/sysconfig/network-scripts/ifcfg-ens33

(2) Set the hostname

vim /etc/hostname

(3) Connect with Xshell and Xftp

1.2 Pseudo-distributed test

(1) Place the following six packages in /opt/software:

 apache-flume-1.9.0-bin.tar.gz
 apache-zookeeper-3.5.7-bin.tar.gz
 hadoop-3.1.3.tar.gz
 hbase-2.4.11-bin.tar.gz
 jdk-8u212-linux-x64.tar.gz
 kafka_2.12-3.0.0.tgz

(2) Extract the JDK and Hadoop archives

tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/
tar -zxvf hadoop-3.1.3.tar.gz -C /opt/module/

(3) Add the JDK and Hadoop environment variables

sudo vim /etc/profile.d/my_env.sh

#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin

#HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

Finally, make the file take effect:
source /etc/profile

(4) Reboot the VM and verify that Hadoop and the JDK are installed

java -version
hadoop version

The output should look like this:

Hadoop 3.1.3
Source code repository https://gitbox.apache.org/repos/asf/hadoop.git -r ba631c436b806728f8ec2f54ab1e289526c90579
Compiled by ztang on 2019-09-12T02:47Z
Compiled with protoc 2.5.0
From source with checksum ec785077c385118ac91aadde5ec9799
This command was run using /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-common-3.1.3.jar

java version "1.8.0_212"
Java(TM) SE Runtime Environment (build 1.8.0_212-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.212-b10, mixed mode)

The pseudo-distributed startup itself is skipped here.

1.3 Fully distributed setup

(1) Set up passwordless SSH login

Generate the public and private keys:
[atguigu@hadoop102 .ssh]$ pwd
/home/atguigu/.ssh
[atguigu@hadoop102 .ssh]$ ssh-keygen -t rsa
Press Enter three times; two files are generated: id_rsa (private key) and id_rsa.pub (public key).
Copy the public key to every machine that should allow passwordless login (run all three commands on each machine):
[atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop102
[atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop103
[atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop104

(2) Once passwordless login works, set up xsync

(a) Create an xsync file in /home/atguigu/bin
[atguigu@hadoop102 opt]$ cd /home/atguigu
[atguigu@hadoop102 ~]$ mkdir bin
[atguigu@hadoop102 ~]$ cd bin
[atguigu@hadoop102 bin]$ vim xsync
Write the following script into the file:

#!/bin/bash
#1. Check the number of arguments
if [ $# -lt 1 ]
then
    echo Not Enough Arguement!
    exit;
fi
#2. Loop over every machine in the cluster
for host in hadoop102 hadoop103 hadoop104
do
    echo ====================  $host  ====================
    #3. Loop over all the directories and send them one by one
    for file in $@
    do
        #4. Check whether the file exists
        if [ -e $file ]
        then
            #5. Get the parent directory
            pdir=$(cd -P $(dirname $file); pwd)
            #6. Get the file name
            fname=$(basename $file)
            ssh $host "mkdir -p $pdir"
            rsync -av $pdir/$fname $host:$pdir
        else
            echo $file does not exists!
        fi
    done
done

(b) Make the xsync script executable
[atguigu@hadoop102 bin]$ chmod +x xsync
(c) Test the script
[atguigu@hadoop102 ~]$ xsync /home/atguigu/bin
(d) Copy the script to /bin so it can be called from anywhere
[atguigu@hadoop102 bin]$ sudo cp xsync /bin/
(e) Sync the environment variable configuration (owned by root)
[atguigu@hadoop102 ~]$ sudo ./bin/xsync /etc/profile.d/my_env.sh
Note: when using sudo, give xsync its full path. Then make the environment variables take effect:
[atguigu@hadoop103 bin]$ source /etc/profile
[atguigu@hadoop104 opt]$ source /etc/profile

(3) Check that the environment variables work on hadoop103 and hadoop104; if they do not, reboot the VMs and check again

java -version
hadoop version

1.4 Edit the configuration files

(1) Core configuration file

[atguigu@hadoop102 ~]$ cd $HADOOP_HOME/etc/hadoop
[atguigu@hadoop102 hadoop]$ vim core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- NameNode address -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop102:8020</value>
    </property>
    <!-- Hadoop data storage directory -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-3.1.3/data</value>
    </property>
    <!-- Static user for the HDFS web UI (remember to replace it with your own username) -->
    <property>
        <name>hadoop.http.staticuser.user</name>
        <value>atguigu</value>
    </property>
</configuration>

(2) HDFS configuration file

[atguigu@hadoop102 hadoop]$ vim hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- NameNode web UI address -->
    <property>
        <name>dfs.namenode.http-address</name>
        <value>hadoop102:9870</value>
    </property>
    <!-- Secondary NameNode web UI address -->
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>hadoop104:9868</value>
    </property>
</configuration>

(3) YARN configuration file

[atguigu@hadoop102 hadoop]$ vim yarn-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- Use the MapReduce shuffle auxiliary service -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <!-- ResourceManager address -->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hadoop103</value>
    </property>
    <!-- Environment variable inheritance -->
    <property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
    </property>
    <!-- Enable log aggregation -->
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>
    <!-- Log aggregation server address -->
    <property>
        <name>yarn.log.server.url</name>
        <value>http://hadoop102:19888/jobhistory/logs</value>
    </property>
    <!-- Keep logs for 7 days -->
    <property>
        <name>yarn.log-aggregation.retain-seconds</name>
        <value>604800</value>
    </property>
</configuration>

(4) MapReduce configuration file

[atguigu@hadoop102 hadoop]$ vim mapred-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- Run MapReduce programs on YARN -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <!-- History server address -->
    <property>
        <name>mapreduce.jobhistory.address</name>
        <value>hadoop102:10020</value>
    </property>
    <!-- History server web UI address -->
    <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>hadoop102:19888</value>
    </property>
</configuration>

(5) Configure workers

[atguigu@hadoop102 hadoop]$ vim /opt/module/hadoop-3.1.3/etc/hadoop/workers

hadoop102
hadoop103
hadoop104

Sync all the configuration files:
[atguigu@hadoop102 hadoop]$ xsync /opt/module/hadoop-3.1.3/etc

(6) Format the NameNode

[atguigu@hadoop102 hadoop-3.1.3]$ hdfs namenode -format

(7) Write the myhadoop.sh script

[atguigu@hadoop102 ~]$ cd /home/atguigu/bin
[atguigu@hadoop102 bin]$ vim myhadoop.sh

#!/bin/bash
if [ $# -lt 1 ]
then
    echo "No Args Input..."
    exit ;
fi
case $1 in
"start")
    echo " =================== Starting the Hadoop cluster ==================="
    echo " --------------- starting hdfs ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/start-dfs.sh"
    echo " --------------- starting yarn ---------------"
    ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
    echo " --------------- starting historyserver ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon start historyserver"
;;
"stop")
    echo " =================== Stopping the Hadoop cluster ==================="
    echo " --------------- stopping historyserver ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon stop historyserver"
    echo " --------------- stopping yarn ---------------"
    ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
    echo " --------------- stopping hdfs ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh"
;;
*)
    echo "Input Args Error..."
;;
esac

Remember to add execute permission:
[atguigu@hadoop102 bin]$ chmod +x myhadoop.sh

(8) Write the jpsall script

[atguigu@hadoop102 ~]$ cd /home/atguigu/bin
[atguigu@hadoop102 bin]$ vim jpsall

#!/bin/bash
for host in hadoop102 hadoop103 hadoop104
do
    echo =============== $host ===============
    ssh $host jps
done

Add execute permission and distribute the script:
[atguigu@hadoop102 bin]$ chmod +x jpsall
[atguigu@hadoop102 ~]$ xsync /home/atguigu/bin/
Add execute permission on hadoop103 and hadoop104 as well.

(9) Start the cluster

myhadoop.sh start   # start the cluster
jpsall              # check whether all node processes are up

(10) Check that the three web UIs are reachable

http://hadoop102:9870
http://hadoop103:8088/
http://hadoop102:19888/jobhistory

If all the processes start normally but the pages cannot be reached, check whether the hosts file has been updated; if it looks correct, restart the cluster and test again.

2.0 ZooKeeper Setup

2.1 Extract and install

Extract:
[atguigu@hadoop102 software]$ tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
Rename:
[atguigu@hadoop102 module]$ mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7

2.2 File configuration

Configure the server IDs

(1) Create a zkData directory under /opt/module/zookeeper-3.5.7/

[atguigu@hadoop102 zookeeper-3.5.7]$ mkdir zkData

(2) Create a myid file in /opt/module/zookeeper-3.5.7/zkData

[atguigu@hadoop102 zkData]$ vi myid
Add the number assigned to this server (no blank lines above or below, no spaces before or after):
2
Distribute ZooKeeper and then edit myid on the other hosts:
[atguigu@hadoop102 module ]$ xsync zookeeper-3.5.7
vim /opt/module/zookeeper-3.5.7/zkData/myid
Change the myid content to 3 on hadoop103 and 4 on hadoop104.

(3) Configure zoo.cfg

(1) Rename zoo_sample.cfg in /opt/module/zookeeper-3.5.7/conf to zoo.cfg
[atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg
(2) Open zoo.cfg
[atguigu@hadoop102 conf]$ vim zoo.cfg
#Change the data directory
dataDir=/opt/module/zookeeper-3.5.7/zkData
#Add the following configuration
#######################cluster##########################
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888
(3) Sync the zoo.cfg configuration file
[atguigu@hadoop102 conf]$ xsync zoo.cfg

(4) Write a start/stop script

1) Create the script in /home/atguigu/bin on hadoop102
[atguigu@hadoop102 bin]$ vim zk.sh

#!/bin/bash
case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo ---------- zookeeper $i start ------------
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo ---------- zookeeper $i stop ------------
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
    done
};;
"status"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo ---------- zookeeper $i status ------------
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
    done
};;
esac

2) Add execute permission
[atguigu@hadoop102 bin]$ chmod u+x zk.sh
3) Start the ZooKeeper cluster
[atguigu@hadoop102 module]$ zk.sh start
4) Stop the ZooKeeper cluster
[atguigu@hadoop102 module]$ zk.sh stop
Run jpsall to confirm everything started normally.

(5) Working with ZooKeeper

Start the ZooKeeper client:
[atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkCli.sh -server hadoop102:2181
View the ZooKeeper nodes:
[zk: hadoop102:2181(CONNECTED) 0] ls /

3.0 HBase Setup

HBase uses ZooKeeper for master high availability, for recording where RegionServers are deployed, and for storing the location of the meta table.

HBase clients access ZooKeeper directly for read and write operations. Version 2.3 introduced the Master Registry mode, in which clients can contact the master directly; this increases the load on the master but reduces the load on ZooKeeper.

3.1 Extract, install, and set environment variables

1) Extract HBase to the target directory
[atguigu@hadoop102 software]$ tar -zxvf hbase-2.4.11-bin.tar.gz -C /opt/module/
[atguigu@hadoop102 software]$ mv /opt/module/hbase-2.4.11 /opt/module/hbase
2) Configure environment variables
[atguigu@hadoop102 ~]$ sudo vim /etc/profile.d/my_env.sh
Add:
#HBASE_HOME
export HBASE_HOME=/opt/module/hbase
export PATH=$PATH:$HBASE_HOME/bin
3) Use source to make the environment variables take effect, then distribute them
[atguigu@hadoop102 module]$ source /etc/profile.d/my_env.sh
[fang@hadoop102 hbase]$ sudo /home/fang/bin/xsync /etc/profile.d/my_env.sh

3.2 File configuration

1) hbase-env.sh: the change can be appended at the end

In the hbase/conf directory:

[fang@hadoop102 conf]$ vim hbase-env.sh
Append this line at the end:
export HBASE_MANAGES_ZK=false

2) hbase-site.xml: delete the original content inside the configuration tags and replace it with the following

<property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
</property>
<property>
    <name>hbase.zookeeper.quorum</name>
    <value>hadoop102,hadoop103,hadoop104</value>
</property>
<!-- <property> -->
<!--     <name>hbase.zookeeper.property.dataDir</name> -->
<!--     <value>/export/zookeeper</value> -->
<!--     <description> Remember to adjust the ZooKeeper configuration; -->
<!--     ZooKeeper data must not be kept in a temporary directory -->
<!--     </description> -->
<!-- </property> -->
<property>
    <name>hbase.rootdir</name>
    <value>hdfs://hadoop102:8020/hbase</value>
    <description>The directory shared by RegionServers.</description>
</property>

3) Edit regionservers

hadoop102 
hadoop103 
hadoop104 

4) Resolve the log4j compatibility issue between HBase and Hadoop: sideline HBase's jar so Hadoop's jar is used

[atguigu@hadoop102 hbase]$ mv /opt/module/hbase/lib/client-facing-thirdparty/slf4j-reload4j-1.7.33.jar /opt/module/hbase/lib/client-facing-thirdparty/slf4j-reload4j-1.7.33.jar.bak
Distribute HBase to the other nodes:
xsync hbase/

5) Start and stop HBase

[atguigu@hadoop102 hbase]$ bin/start-hbase.sh
[atguigu@hadoop102 hbase]$ bin/stop-hbase.sh

4.0 Flume Setup

4.1 Extract and install

Extract:
[atguigu@hadoop102 software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
Rename:
[atguigu@hadoop102 module]$ mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
Delete guava-11.0.2.jar under the lib directory for compatibility with Hadoop 3.1.3:
[atguigu@hadoop102 lib]$ rm /opt/module/flume/lib/guava-11.0.2.jar

4.2 Configuration

(1) Install the netcat tool
[atguigu@hadoop102 software]$ sudo yum install -y nc
(2) Check whether port 44444 is in use
[atguigu@hadoop102 flume-telnet]$ sudo netstat -nlp | grep 44444
(3) Create the Flume agent configuration file flume-netcat-logger.conf
(4) Create a job directory under the flume directory and enter it
[atguigu@hadoop102 flume]$ mkdir job
[atguigu@hadoop102 flume]$ cd job/
(5) Create the Flume agent configuration file flume-netcat-logger.conf in the job directory
[atguigu@hadoop102 job]$ vim flume-netcat-logger.conf
(6) Add the following content to flume-netcat-logger.conf:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(7) Start the Flume agent listening on the port
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
Open a second session on hadoop102.
(8) Use netcat to send data to port 44444 on the local machine
[atguigu@hadoop102 ~]$ nc localhost 44444

This is enough for now; Flume will be configured further in the project.

5.0 Kafka Setup

5.1 Extract, install, and configure environment variables

Extract:
[atguigu@hadoop102 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/
Rename:
[atguigu@hadoop102 module]$ mv kafka_2.12-3.0.0/ kafka
Go to /opt/module/kafka and edit the configuration file:
[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vim server.properties
Change three settings (find the matching lines):

#The broker's globally unique id; it must not repeat and must be a number
broker.id=0
#Path where Kafka stores its log (data) files; it does not need to exist beforehand, Kafka creates it.
#Multiple disk paths can be configured, separated by ","
log.dirs=/opt/module/kafka/datas
#How often to check for expired data; by default every 5 minutes
log.retention.check.interval.ms=300000
#ZooKeeper connection string (create /kafka under the ZK root to keep things tidy)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

Distribute Kafka:
[atguigu@hadoop102 module]$ xsync kafka/
Edit the configuration file on hadoop103 and hadoop104:
[fang@hadoop102 flume]$ vim /opt/module/kafka/config/server.properties
setting broker.id=1 and broker.id=2 respectively.
Add the Kafka environment variable to /etc/profile.d/my_env.sh:
[atguigu@hadoop102 module]$ sudo vim /etc/profile.d/my_env.sh

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

Refresh:
[atguigu@hadoop102 module]$ source /etc/profile
Distribute and refresh:
[atguigu@hadoop102 module]$ sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh
[atguigu@hadoop103 module]$ source /etc/profile
[atguigu@hadoop104 module]$ source /etc/profile

5.2 Cluster startup

(1) Start the ZooKeeper cluster first, then Kafka.
[atguigu@hadoop102 kafka]$ zk.sh start
(2) Cluster script
1) Create the kf.sh script in /home/atguigu/bin
[atguigu@hadoop102 bin]$ vim kf.sh

#! /bin/bash
case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo "==============$i start=============="
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo "==============$i stop=============="
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
    done
};;
esac

2) Add execute permission
[atguigu@hadoop102 bin]$ chmod +x kf.sh
3) Start the cluster
[atguigu@hadoop102 ~]$ kf.sh start
4) Stop the cluster
[atguigu@hadoop102 ~]$ kf.sh stop

6.0 Redis Environment Setup

1. Upload the package to /opt/software and extract it to /opt/module

[root@linux softwere]# tar -zxvf redis-3.0.0.tar.gz -C /opt/module/

2. Enter the redis directory and build it

[root@linux redis-3.0.0]# make

The following error appears: gcc was not found, so gcc has to be installed first

[root@linux module]# yum -y install gcc

After installation, verify that gcc is present

[root@linux module]# gcc -v

3. Running make again still reports an error

Run the following:

#Clean up leftover build files
make distclean
#Run make again
make

This time the build succeeds.

4. Run

make install

5. Check the installation directory

[root@linux redis-3.0.0]# cd /usr/local/bin/

6. Go back to the Redis directory and back up the configuration file

#Create a directory under the root
[root@linux /]# mkdir myredis
#Copy redis.conf into that directory
[root@linux redis-3.0.0]# cp redis.conf /myredis/
#Edit the file in the myredis directory
[root@linux myredis]# vi redis.conf

#Change directory
[root@linux myredis]# cd /usr/local/bin/
#Start redis
[root@linux bin]# redis-server /myredis/redis.conf
#Open the client
[root@linux bin]# redis-cli -p 6379

Test it.

Check whether redis is running in the background:

[root@linux bin]# ps -ef|grep redis

Checking the cluster environment

Running jpsall at this point shows the following processes on each node:

| Component         | hadoop102              | hadoop103                    | hadoop104                   |
| ----------------- | ---------------------- | ---------------------------- | --------------------------- |
| Hadoop HDFS       | NameNode, DataNode     | DataNode                     | SecondaryNameNode, DataNode |
| Hadoop YARN       | NodeManager            | ResourceManager, NodeManager | NodeManager                 |
| Hadoop JobHistory | JobHistoryServer       |                              |                             |
| Zookeeper         | QuorumPeerMain         | QuorumPeerMain               | QuorumPeerMain              |
| Kafka             | Kafka                  | Kafka                        | Kafka                       |
| HBase             | HMaster, HRegionServer | HRegionServer                | HRegionServer               |
| Jps               | Jps                    | Jps                          | Jps                         |

Cluster start/stop scripts:

Hadoop start/stop:
myhadoop.sh start
myhadoop.sh stop

HBase start/stop:
bin/start-hbase.sh
bin/stop-hbase.sh

ZooKeeper start/stop (start/stop ZooKeeper before Kafka, see the note below):
zk.sh start
zk.sh stop

Kafka start/stop:
kf.sh start
kf.sh stop

Web UIs:
http://hadoop102:9870
http://hadoop103:8088/
http://hadoop102:19888/jobhistory
http://hadoop102:16010/master-status

Note:

Before starting HBase, start ZooKeeper first and then Hadoop.

Kafka must be shut down before ZooKeeper.

7.0 Business Flow

7.1 Module 1: Data production, collection, and consumption (P1-P13)

7.1.1 Create the producer

P6 尚硅谷 Data Production - Create the producer object (04:25)

public class Bootstrap {
    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("系统参数不正确,请按照指定格式传递");
            System.exit(1);
        }
        // Build the producer object
        Producer producer = new LocalFileProducer();
//      producer.setIn(new LocalFileDataIn("D:\\大数据学习资料\\尚硅谷大数据技术之电信客服综合案例\\2.资料\\辅助文档\\contact.log"));
//      producer.setOut(new LocalFileDataOut("D:\\大数据学习资料\\尚硅谷大数据技术之电信客服综合案例\\2.资料\\辅助文档\\call.log"));
        producer.setIn(new LocalFileDataIn(args[0]));
        producer.setOut(new LocalFileDataOut(args[1]));
        // Produce the data
        producer.producer();
        // Close the producer
        producer.close();
    }
}
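
Bootstrap wires a Producer to a DataIn and a DataOut. Those types are not reproduced in these notes; the following is only a minimal sketch inferred from how they are called in the surrounding classes (the method set, the Closeable inheritance, and the Data base-class layout are assumptions, so the real project code may differ):

import java.io.Closeable;
import java.io.IOException;
import java.util.List;

// Producer: owns an input source and an output sink and drives the generation loop.
interface Producer extends Closeable {
    void setIn(DataIn in);
    void setOut(DataOut out);
    void producer();                 // keep generating records until stopped
}

// DataIn: reads source records (the contact list) and maps each line onto a Data bean.
interface DataIn extends Closeable {
    void setPath(String path);
    <T extends Data> List<T> read(Class<T> clazz) throws IOException;
}

// DataOut: writes generated records (the call log) to a destination.
interface DataOut extends Closeable {
    void setPath(String path);
    void write(Object data) throws Exception;
    void write(String data) throws Exception;
}

// Data: base type whose subclasses (Contact, Callog) know how to parse one raw line.
abstract class Data {
    protected String content;

    public void setValue(String value) {
        this.content = value;
    }
}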

P7 尚硅谷 Data Production - Read the contact list data (33:32)

Create an input stream and load the data into a collection.

public class LocalFileDataIn implements DataIn {private BufferedReader reader = null;/*** 构造方法* @param path*/public LocalFileDataIn(String path) {setPath(path);}/*** 设置路径* 创建输入流* @param path*/public void setPath(String path) {try {reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();} catch (FileNotFoundException e) {e.printStackTrace();}}public Object read() throws IOException {return null;}/*** 读取数据,将数据返回到集合中** @param <T>* @param clazz* @throws IOException* @return*/public <T extends Data> List<T> read(Class<T> clazz) throws IOException {List<T> ts = new ArrayList<T>();try {//从数据文件中读取所有的数据String line = null;while ((line = reader.readLine()) != null) {//将数据转换为指定类型的对象,封装为集合返回T t = (T) clazz.newInstance();t.setValue(line);ts.add(t);}} catch (Exception e) {e.printStackTrace();}return ts;}/*** 关闭资源* @throws IOException*/public void close() throws IOException {if (reader!=null){reader.close();}}
}

P8 尚硅谷 Data Production - Randomly generate caller and callee numbers (12:59)

Pick two entries from the collection as the caller and the callee, and randomly generate the call date and time.

/*** 本地数据文件的生产者*/
public class LocalFileProducer implements Producer {/*** 数据来源* @param in*/private DataIn in;private DataOut out;private volatile boolean flg=true; //增强内存可见性public void setIn(DataIn in) {this.in=in;}/*** 数据输出* @param out*/public void setOut(DataOut out) {this.out=out;}/*** 数据生产*/public void producer() {/*** 数据返回类型为一个对象*/try {List<Contact> contacts=in.read(Contact.class);//读取通讯录的数据while ( flg ){int call1Index=new Random().nextInt(contacts.size());int call2Index;while (true){call2Index=new Random().nextInt(contacts.size());if (call1Index!=call2Index){break;}}Contact call1=contacts.get(call1Index);Contact call2=contacts.get(call2Index);//生成随机的通话时间String startDate="20180101000000";  //开始时间String endDate="20190101000000";    //结束时间long startTime= DataUtil.parse(startDate,"yyyyMMddHHmmss").getTime();//通话时间long endtime=DataUtil.parse(endDate,"yyyyMMddHHmmss").getTime();//通话时间字符串long  calltime=startTime+(long)((endtime-startTime)* Math.random());//通话时间字符串String callTimeString=DataUtil.format(new Date(calltime),"yyyyMMddHHmmss");//生成随机的通话时长String duration= NumberUtil.format(new Random().nextInt(3000),4);//生成通话记录Callog log=new Callog(call1.getTel(),call2.getTel(),callTimeString,duration);System.out.println(log);//将通话记录写入到数据文件中out.write(log);Thread.sleep(500);}}catch (Exception e){e.printStackTrace();}}/*** 关闭生产者* @throws IOException*/public void close() throws IOException {if (in!=null){in.close();}if (out!=null){out.close();}}
}
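
LocalFileProducer relies on two small helpers, DataUtil and NumberUtil, that are not shown in these notes. A hedged sketch of what they need to provide, inferred only from the calls made above (the project's real implementations may differ):

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical stand-in for the project's DataUtil: parse and format dates with a given pattern.
class DataUtil {
    public static Date parse(String dateString, String pattern) {
        try {
            return new SimpleDateFormat(pattern).parse(dateString);
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
    }

    public static String format(Date date, String pattern) {
        return new SimpleDateFormat(pattern).format(date);
    }
}

// Hypothetical stand-in for NumberUtil: left-pad a number with zeros to a fixed width,
// e.g. format(37, 4) -> "0037", used here for the call duration field.
class NumberUtil {
    public static String format(int num, int length) {
        return String.format("%0" + length + "d", num);
    }
}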

P9 尚硅谷 Data Production - Build call records (25:40)

Wrap the data into an object and write it to the call log.

/*** 本地文件数据输出*/
public class LocalFileDataOut implements DataOut {private PrintWriter writer=null;public LocalFileDataOut(String path){setPath(path);}/*** 设置路径* @param path*/public void setPath(String path) {try {writer=new PrintWriter(new OutputStreamWriter(new FileOutputStream(path),"UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();} catch (FileNotFoundException e) {e.printStackTrace();}}public void write(Object data) throws Exception {write(data.toString());}/*** 将数据字符串生成到文件中* @param data* @throws Exception*/public void write(String data) throws Exception {writer.println(data);writer.flush();}/*** 释放资源* @throws IOException*/public void close() throws IOException {if(writer!=null){writer.close();}}
}
public class Callog {private String call1;private String call2;private String calltime;private String duration;public Callog(String call1, String call2, String calltime, String duration) {this.call1 = call1;this.call2 = call2;this.calltime = calltime;this.duration = duration;}@Overridepublic String toString() {return call1+"\t"+call2+"\t"+calltime+"\t"+duration;}public String getCall1() {return call1;}public void setCall1(String call1) {this.call1 = call1;}public String getCall2() {return call2;}public void setCall2(String call2) {this.call2 = call2;}public String getCalltime() {return calltime;}public void setCalltime(String calltime) {this.calltime = calltime;}public String getDuration() {return duration;}public void setDuration(String duration) {this.duration = duration;}
}
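
The producer also depends on a Contact bean read from contact.log, which is not reproduced here. A hedged sketch, assuming each line of contact.log carries a phone number and a name separated by a tab (the real field layout may differ) and reusing the Data base class sketched earlier:

// Hedged sketch of the Contact bean used by LocalFileProducer; the line format is an assumption.
public class Contact extends Data {
    private String tel;
    private String name;

    // Parse one line of contact.log, assumed to be "tel<TAB>name".
    public void setValue(String value) {
        this.content = value;
        String[] values = value.split("\t");
        this.tel = values[0];
        this.name = values[1];
    }

    public String getTel() {
        return tel;
    }

    public String getName() {
        return name;
    }
}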

7.1.3 Data collection and consumption

Configure Flume to watch the log file and use Kafka as the output store.

Create a topic named ct in Kafka and start a consumer to receive the data.

Configure Flume in the target directory:

[fang@hadoop102 kafka]$ cd /opt/module/data/
[fang@hadoop102 data]$ vim flume-2-kafka.conf

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/data/call.log
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = ct
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Run the producer jar:

[fang@hadoop102 data]$ java -jar ct_producer.jar contact.log call.log 

Create the topic in the Kafka directory:

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic ct --partitions 3 --replication-factor 2

Start a Kafka console consumer:

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ct

Start Flume to collect the data:

bin/flume-ng agent -c conf/ -n a1 -f /opt/module/data/flume-2-kafka.conf 

Use the Kafka consumer API to fetch the data and write it into HBase.

/**
 * Start the Kafka consumer
 */
// Use a Kafka consumer to fetch the data collected by Flume
// and store it in HBase
public class Bootstrap {
    public static void main(String[] args) throws IOException {
        // Create the consumer
        Consumer consumer = new CallogConsumer();
        // Consume the data
        consumer.consume();
        // Release resources
        consumer.close();
    }
}

/**
 * Call-log consumer
 */
public class CallogConsumer implements Consumer {
    /**
     * Consume data
     */
    public void consume() {
        try {
            // Create the configuration object
            Properties prop = new Properties();
            prop.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("consumer.properties"));
            // Consume the data collected by Flume
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
            // Subscribe to the topic
            consumer.subscribe(Arrays.asList(Names.TOPIC.getValue()));
            // HBase data access object
            HBaseDao dao = new HBaseDao();
            dao.init();    // initialize HBase
            // Consume the data
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.value());
                    dao.insertData(consumerRecord.value());    // insert the record into HBase
                    //Callog log = new Callog(consumerRecord.value());
                    //dao.insertData(log);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Release resources
     * @throws IOException
     */
    public void close() throws IOException {
    }
}
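
The consumer code refers throughout to a Names enum that is not reproduced in these notes. The topic, namespace, and table values below match what is created earlier (topic ct, namespace ct, table ct:calllog); the column-family strings and the exact shape of the enum are assumptions:

// Hedged sketch of the Names enum used by the consumer module.
public enum Names {
    NAMESPACE("ct"),
    TOPIC("ct"),
    TABLE("ct:calllog"),
    CF_INFO("info"),      // assumed default column family
    CF_CALLER("caller"),  // assumed caller column family
    CF_CALLEE("callee");  // assumed callee column family

    private final String name;

    Names(String name) {
        this.name = name;
    }

    public String getValue() {
        return name;
    }
}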

7.2 Module 2: HBase data consumption (P14-P22)

7.2.1 Create the table

P14 尚硅谷 Data Consumption - HBase data access encapsulation (32:21)

Create the corresponding table in HBase, with two column families and a designed rowkey.

/*** HBase的数据访问对象*/
public class HBaseDao extends BaseDao {/*** 初始化*/public void init() throws Exception {start();//创建命名空间ctcreateNamespaseNX(Names.NAMESPACE.getValue());//创建表名ct:calllog 分区数为6createTableXX(Names.TABLE.getValue(),"com.fang.ct.consumer.coprocessor.InsertCalleeCoprocessor", 6,Names.CF_CALLER.getValue(),Names.CF_CALLEE.getValue());end();}/*** 插入对象* @param log* @throws Exception*/public void insertData(Callog log) throws Exception{log.setRowkey(genRegionNum(log.getCall1(),log.getCalltime())+"_"+log.getCall1()+"_"+log.getCalltime()+"_"+log.getCall2()+"_"+log.getDuration());putData(log);}/*** 插入数据*/public void insertData(String value) throws Exception{//将通话日志保存到Hbase的表中//1.获取通话日志数据String[] values= value.split("\t");String call1=values[0];String call2=values[1];String calltime=values[2];String duration=values[3];//2.创建数据对象//rowkey设计//主叫用户String rowkey=genRegionNum(call1,calltime)+"_"+call1+"_"+calltime+"_"+call2+"_"+duration+ "_1";Put put =new Put(Bytes.toBytes(rowkey));byte[] family=Bytes.toBytes(Names.CF_CALLER.getValue());put.addColumn(family,Bytes.toBytes("call1"),Bytes.toBytes(call1));put.addColumn(family,Bytes.toBytes("call2"),Bytes.toBytes(call2));put.addColumn(family,Bytes.toBytes("calltime"),Bytes.toBytes(calltime));put.addColumn(family,Bytes.toBytes("duration"),Bytes.toBytes(duration));put.addColumn(family, Bytes.toBytes("flg"), Bytes.toBytes("1"));String calleeRowkey = genRegionNum(call2, calltime) + "_" + call2 + "_" + calltime + "_" + call1 + "_" + duration + "_0";//        // 被叫用户
//        Put calleePut = new Put(Bytes.toBytes(calleeRowkey));
//        byte[] calleeFamily = Bytes.toBytes(Names.CF_CALLEE.getValue());
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("call1"), Bytes.toBytes(call2));
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("call2"), Bytes.toBytes(call1));
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("calltime"), Bytes.toBytes(calltime));
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("duration"), Bytes.toBytes(duration));
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("flg"), Bytes.toBytes("0"));// 3. 保存数据List<Put> puts = new ArrayList<Put>();puts.add(put);// puts.add(calleePut);putData(Names.TABLE.getValue(), puts);}
}

Obtains the connection object and provides the table-level methods: creating and deleting tables, generating split keys and partition numbers, inserting data, and so on.

/*** 基础的数据访问对象*/
public abstract class BaseDao {/****/private ThreadLocal<Connection> connHolder=new ThreadLocal<Connection>();private ThreadLocal<Admin> adminHolder=new ThreadLocal<Admin>();protected void start() throws Exception{getConnection();getAdmin();}protected void end() throws Exception{Admin admin=getAdmin();if (admin!=null){admin.close();adminHolder.remove();}Connection conn=getConnection();if (conn!=null){conn.close();;connHolder.remove();}}/*** 创建表,如果表已经存在,name删除后创建新的* @param name* @param family* @throws IOException*/protected void createTableXX(String name,String... family) throws Exception {createTableXX(name,null,null,family);}protected void createTableXX(String name,String coprocessorClass,Integer regionCount,String... family) throws Exception {Admin admin=getAdmin();TableName tableName= TableName.valueOf(name);if (admin.tableExists(tableName)){//表存在,删除表deleteTable(name);}//创建表createTable(name,coprocessorClass,regionCount,family);}private void createTable(String name,String coprocessorClass,Integer regionCount,String... family) throws Exception{Admin admin=getAdmin();TableName tablename=TableName.valueOf(name);HTableDescriptor tableDescriptor=new HTableDescriptor(tablename);if (family==null||family.length==0){family=new String[1];family[0]= Names.CF_INFO.getValue();}for (String families : family) {HColumnDescriptor columnDescriptor=new HColumnDescriptor(families);tableDescriptor.addFamily(columnDescriptor);}if(coprocessorClass!=null&& !"".equals(coprocessorClass)){tableDescriptor.addCoprocessor(coprocessorClass);}//增加预分区if(regionCount==null||regionCount<=0){admin.createTable(tableDescriptor);}else {//分区键byte[][] splitKeys=genSpliKeys(regionCount);admin.createTable(tableDescriptor,splitKeys);}}/*** 获取查询时startrow,stoprom集合* @return*/protected  List<String[]> getStartStorRowkeys(String tel,String start,String end){List<String[]> rowkeyss=new ArrayList<String[]>();String startTime = start.substring(0, 6);String endTime = end.substring(0, 6);Calendar startCal = Calendar.getInstance();startCal.setTime(DataUtil.parse(startTime, "yyyyMM"));Calendar endCal = Calendar.getInstance();endCal.setTime(DataUtil.parse(endTime, "yyyyMM"));while (startCal.getTimeInMillis() <= endCal.getTimeInMillis()) {// 当前时间String nowTime = DataUtil.format(startCal.getTime(), "yyyyMM");int regionNum = genRegionNum(tel, nowTime);String startRow = regionNum + "_" + tel + "_" + nowTime;String stopRow = startRow + "|";String[] rowkeys = {startRow, stopRow};rowkeyss.add(rowkeys);// 月份+1startCal.add(Calendar.MONTH, 1);}return  rowkeyss;}/*** 计算分区号* @param tel* @param date* @return*/protected  int genRegionNum(String tel,String date){String usercode=tel.substring(tel.length()-4);String yearMonth=date.substring(0,6);int userCodehash=usercode.hashCode();int yearMonthHash=yearMonth.hashCode();//crc校验采用异或算法int crc=Math.abs(userCodehash ^ yearMonthHash);//取模int regionNum=crc% ValueConstant.REGION_COUNT;return regionNum;}/*** 生成分区键* @return*/private  byte[][] genSpliKeys(int regionCount){int splitkeyCount=regionCount-1;byte[][] bs=new byte[splitkeyCount][];//0,1,2,3,4List<byte[]> bsList=new ArrayList<byte[]>();for (int i = 0; i < splitkeyCount; i++) {String splitkey=i+"|";bsList.add(Bytes.toBytes(splitkey));}//Collections.sort(bsList,new Bytes.ByteArrayComparator());bsList.toArray(bs);return bs;}/*** 增加对象:自动封装数据,将对象数据直接保存到Hbase中* @param obj* @throws Exception*/protected void putData(Object obj) throws Exception{// 反射Class clazz = obj.getClass();TableRef tableRef = (TableRef)clazz.getAnnotation(TableRef.class);String tableName = 
tableRef.value();Field[] fs = clazz.getDeclaredFields();String stringRowkey = "";for (Field f : fs) {Rowkey rowkey = f.getAnnotation(Rowkey.class);if ( rowkey != null ) {f.setAccessible(true);stringRowkey = (String)f.get(obj);break;}}//获取表对象Connection connection=getConnection();Table table = connection.getTable(TableName.valueOf(tableName));Put put=new Put(Bytes.toBytes(stringRowkey));for (Field f : fs) {Column column = f.getAnnotation(Column.class);if (column != null) {String family = column.family();String colName = column.column();if (colName == null || "".equals(colName)) {colName = f.getName();}f.setAccessible(true);String value = (String) f.get(obj);put.addColumn(Bytes.toBytes(family), Bytes.toBytes(colName), Bytes.toBytes(value));}}//增加数据table.put(put);//关闭表table.close();}/*** 增加多条数据* @param name* @param puts* @throws IOException*/protected void putData( String name, List<Put> puts ) throws Exception {// 获取表对象Connection conn = getConnection();Table table = conn.getTable(TableName.valueOf(name));// 增加数据table.put(puts);// 关闭表table.close();}/*** 增加数据* @param name* @param put*/protected  void putData(String name, Put put) throws IOException {//获取表对象Connection connection=getConnection();Table table = connection.getTable(TableName.valueOf(name));//增加数据table.put(put);//关闭表table.close();}/*** 删除表格* @param name* @throws Exception*/protected  void deleteTable(String name) throws Exception{TableName tableName=TableName.valueOf(name);Admin admin=getAdmin();admin.disableTable(tableName);admin.deleteTable(tableName);}/*** 创建命名空间,如果命名空间已经存在,不需要创建,否则创建新的* @param namespace*/protected void createNamespaseNX(String namespace) throws IOException {Admin admin =getAdmin();try{admin.getNamespaceDescriptor(namespace);}catch (NamespaceNotFoundException e){admin.createNamespace(NamespaceDescriptor.create(namespace).build());}}/*** 获取管理对象*/protected synchronized Admin getAdmin() throws IOException {Admin admin = adminHolder.get();if(admin==null){admin=getConnection().getAdmin();adminHolder.set(admin);}return admin;}/*** 获取连接对象* @return*/protected synchronized Connection getConnection() throws IOException {Connection conn=connHolder.get();if(conn==null){Configuration conf= HBaseConfiguration.create();conn= ConnectionFactory.createConnection(conf);connHolder.set(conn);}return conn;}}
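
putData(Object obj) in BaseDao above resolves the table name, rowkey, and columns through three custom annotations (TableRef, Rowkey, Column) that are not reproduced in the notes. A minimal sketch consistent with how they are read via reflection (the default values and packages are assumptions):

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Maps a bean class to an HBase table name, e.g. @TableRef("ct:calllog").
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface TableRef {
    String value();
}

// Marks the field whose value is used as the rowkey.
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@interface Rowkey {
}

// Maps a field to a column; family is required, the column name defaults to the field name.
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@interface Column {
    String family();
    String column() default "";
}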

7.2.4 Configure the coprocessor

Used to distinguish caller and callee records and reduce the amount of data the client has to insert.

/**** 使用协处理器保存被叫用户的数据** 协处理器的使用* 1. 创建类* 2. 让表找到协处理类(和表有关联)* 3. 将项目打成jar包发布到hbase中(关联的jar包也需要发布),并且需要分发*/
public class InsertCalleeCoprocessor extends BaseRegionObserver {// 方法的命名规则// login// logout// prePut// doPut :模板方法设计模式//    存在父子类://    父类搭建算法的骨架//    1. tel取用户代码,2时间取年月,3,异或运算,4 hash散列//    子类重写算法的细节//    do1. tel取后4位,do2,201810, do3 ^, 4, % &// postPut/*** 保存主叫用户数据之后,由Hbase自动保存被叫用户数据* @param e* @param put* @param edit* @param durability* @throws IOException*/@Overridepublic void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {// 获取表Table table = e.getEnvironment().getTable(TableName.valueOf(Names.TABLE.getValue()));// 主叫用户的rowkeyString rowkey = Bytes.toString(put.getRow());// 1_133_2019_144_1010_1String[] values = rowkey.split("_");CoprocessorDao dao = new CoprocessorDao();String call1 = values[1];String call2 = values[3];String calltime = values[2];String duration = values[4];String flg = values[5];if ( "1".equals(flg) ) {// 只有主叫用户保存后才需要触发被叫用户的保存String calleeRowkey = dao.getRegionNum(call2, calltime) + "_" + call2 + "_" + calltime + "_" + call1 + "_" + duration + "_0";// 保存数据Put calleePut = new Put(Bytes.toBytes(calleeRowkey));byte[] calleeFamily = Bytes.toBytes(Names.CF_CALLEE.getValue());calleePut.addColumn(calleeFamily, Bytes.toBytes("call1"), Bytes.toBytes(call2));calleePut.addColumn(calleeFamily, Bytes.toBytes("call2"), Bytes.toBytes(call1));calleePut.addColumn(calleeFamily, Bytes.toBytes("calltime"), Bytes.toBytes(calltime));calleePut.addColumn(calleeFamily, Bytes.toBytes("duration"), Bytes.toBytes(duration));calleePut.addColumn(calleeFamily, Bytes.toBytes("flg"), Bytes.toBytes("0"));table.put( calleePut );}// 关闭表table.close();}private class CoprocessorDao extends BaseDao {public int getRegionNum(String tel, String time) {return genRegionNum(tel, time);}}
}

7.3 Module 3: Data export and computation (P23-P29)

7.3.1 Database table design

(table design diagrams for the ct_user, ct_date, and ct_call tables)

7.3.2 Data computation

The tool class below sets up the map and reduce stages and reads the data from HBase.

/**
 * Analysis tool class
 */
public class AnalysisTextTool implements Tool {
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(AnalysisTextTool.class);

        // Read the data from HBase
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(Names.CF_CALLER.getValue()));

        // mapper
        TableMapReduceUtil.initTableMapperJob(
                Names.TABLE.getValue(),
                scan,
                AnalysisTextMapper.class,
                Text.class,
                Text.class,
                job);

        // reducer
        job.setReducerClass(AnalysisTextReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(MySQLRedisTextOutputFormat.class);

        boolean flg = job.waitForCompletion(true);
        if (flg) {
            return JobStatus.State.SUCCEEDED.getValue();
        } else {
            return JobStatus.State.FAILED.getValue();
        }
    }

    public void setConf(Configuration configuration) {
    }

    public Configuration getConf() {
        return null;
    }
}
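
The notes do not show how AnalysisTextTool is launched. A Tool implementation is normally started through Hadoop's ToolRunner; a minimal hedged driver (the class name AnalysisBootstrap is made up for illustration):

import org.apache.hadoop.util.ToolRunner;

// Hypothetical entry point: runs AnalysisTextTool through ToolRunner and exits with its status.
public class AnalysisBootstrap {
    public static void main(String[] args) throws Exception {
        int result = ToolRunner.run(new AnalysisTextTool(), args);
        System.exit(result);
    }
}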

The mapper unpacks the rowkeys retrieved from HBase and emits the call duration keyed by user and time period.

/**
 * Mapper
 */
public class AnalysisTextMapper extends TableMapper<Text, Text> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        String rowkey = Bytes.toString(key.get());
        String[] values = rowkey.split("_");

        String call1 = values[1];
        String call2 = values[3];
        String calltime = values[2];
        String duration = values[4];

        String year = calltime.substring(0, 4);
        String month = calltime.substring(0, 6);
        String date = calltime.substring(0, 8);

        // caller - year
        context.write(new Text(call1 + "_" + year), new Text(duration));
        // caller - month
        context.write(new Text(call1 + "_" + month), new Text(duration));
        // caller - day
        context.write(new Text(call1 + "_" + date), new Text(duration));

        // callee - year
        context.write(new Text(call2 + "_" + year), new Text(duration));
        // callee - month
        context.write(new Text(call2 + "_" + month), new Text(duration));
        // callee - day
        context.write(new Text(call2 + "_" + date), new Text(duration));
    }
}

The reducer receives the mapper output and computes the aggregates.

/**
 * Reducer that aggregates the data
 */
public class AnalysisTextReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int sumCall = 0;
        int sumDuration = 0;
        for (Text value : values) {
            int duration = Integer.parseInt(value.toString());
            sumDuration = sumDuration + duration;
            sumCall++;
        }
        context.write(key, new Text(sumCall + "_" + sumDuration));
    }
}
The results are written to MySQL, with Redis used to cache the dimension tables.

/*** Mysql的数据格式化输出对象*/
public class MySQLTextOutputFormat extends OutputFormat<Text, Text> {protected static class MySQLRecordWriter extends RecordWriter<Text,Text> {private Connection connection =null;private Map<String,Integer> userMap=new HashMap<String, Integer>();Map<String, Integer> dateMap = new HashMap<String, Integer>();public MySQLRecordWriter() {// 获取资源connection = JDBCUtil.getConnection();PreparedStatement pstat = null;ResultSet rs = null;try {String queryUserSql = "select id, tel from ct_user";pstat = connection.prepareStatement(queryUserSql);rs = pstat.executeQuery();while ( rs.next() ) {Integer id = rs.getInt(1);String tel = rs.getString(2);userMap.put(tel, id);}rs.close();String queryDateSql = "select id, year, month, day from ct_date";//将整张表中的数据查出pstat = connection.prepareStatement(queryDateSql);rs = pstat.executeQuery();while ( rs.next() ) {Integer id = rs.getInt(1);String year = rs.getString(2);String month = rs.getString(3);if ( month.length() == 1) {month = "0" + month;}String day = rs.getString(4);if ( day.length() == 1 ) {day = "0" + day;}dateMap.put(year + month + day, id);}} catch (Exception e) {e.printStackTrace();} finally {if ( rs != null ) {try {rs.close();} catch (SQLException e) {e.printStackTrace();}}if ( pstat != null ) {try {pstat.close();} catch (SQLException e) {e.printStackTrace();}}}}/*** 输出数据* @param key* @param value* @throws IOException* @throws InterruptedException*/@Overridepublic void write(Text key, Text value) throws IOException, InterruptedException {String[] values = value.toString().split("_");String sumCall = values[0];String sumDuration = values[1];PreparedStatement pstat = null;try {String insertSQL = "insert into ct_call ( telid, dateid, sumcall, sumduration ) values ( ?, ?, ?, ? )";pstat = connection.prepareStatement(insertSQL);String k = key.toString();String[] ks = k.split("_");String tel = ks[0];String date = ks[1];pstat.setInt(1, userMap.get(tel));pstat.setInt(2, dateMap.get(date));pstat.setInt(3, Integer.parseInt(sumCall) );pstat.setInt(4, Integer.parseInt(sumDuration));pstat.executeUpdate();} catch (SQLException e) {e.printStackTrace();} finally {if ( pstat != null ) {try {pstat.close();} catch (SQLException e) {e.printStackTrace();}}}}/*** 释放资源* @param context* @throws IOException* @throws InterruptedException*/@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {if ( connection != null ) {try {connection.close();} catch (SQLException e) {e.printStackTrace();}}}}@Overridepublic RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {return new MySQLRecordWriter();}@Overridepublic void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {}private FileOutputCommitter committer = null;public static Path getOutputPath(JobContext job) {String name = job.getConfiguration().get(FileOutputFormat.OUTDIR);return name == null ? null: new Path(name);}public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {if (committer == null) {Path output = getOutputPath(context);committer = new FileOutputCommitter(output, context);}return committer;}
}
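
AnalysisTextTool sets MySQLRedisTextOutputFormat as the output format, while the class shown above is MySQLTextOutputFormat, which loads the ct_user and ct_date lookup tables into in-memory maps. The Redis-backed variant is not reproduced in the notes; the sketch below only illustrates the lookup idea with Jedis (the hash key names and the class itself are assumptions, not the project's actual code):

import redis.clients.jedis.Jedis;

// Hedged sketch: keep the tel->id and date->id mappings in Redis hashes instead of HashMaps,
// and read them with HGET while writing each reducer record.
public class RedisIdLookup {

    private final Jedis jedis;

    public RedisIdLookup(String host, int port) {
        this.jedis = new Jedis(host, port);
    }

    // Returns the ct_user.id for a telephone number, or null if it is not cached.
    public Integer userId(String tel) {
        String id = jedis.hget("ct_user", tel);
        return id == null ? null : Integer.valueOf(id);
    }

    // Returns the ct_date.id for a yyyy / yyyyMM / yyyyMMdd key, or null if it is not cached.
    public Integer dateId(String date) {
        String id = jedis.hget("ct_date", date);
        return id == null ? null : Integer.valueOf(id);
    }

    public void close() {
        jedis.close();
    }
}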

7.4 Module 4: Data presentation

The controller layer receives the browser request.

/**
 * Call-log controller
 */
@Controller
public class CalllogController {

    @Autowired
    private CalllogService calllogService;

    @RequestMapping("/query")
    public String query() {
        return "query";
    }

    // Object ==> json ==> String
    //@ResponseBody
    @RequestMapping("/view")
    public String view(String tel, String calltime, Model model) {
        // Query the statistics from MySQL
        List<Calllog> logs = calllogService.queryMonthDatas(tel, calltime);
        System.out.println(logs.size());
        model.addAttribute("calllogs", logs);
        return "view";
    }
}

The service layer calls the DAO interface to fetch the data and wraps it into objects.

/*
 * Call-log service
 */
@Service
public class CalllogServiceImpl implements CalllogService {

    @Autowired
    private CalllogDao calllogDao;

    /**
     * Query the call statistics of a user for the given time
     * @param tel
     * @param calltime
     * @return
     */
    @Override
    public List<Calllog> queryMonthDatas(String tel, String calltime) {
        Map<String, Object> paramMap = new HashMap<String, Object>();
        paramMap.put("tel", tel);
        if (calltime.length() > 4) {
            calltime = calltime.substring(0, 4);
        }
        paramMap.put("year", calltime);
        System.out.println(paramMap);
        return calllogDao.queryMonthDatas(paramMap);
    }
}
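
CalllogServiceImpl implements a CalllogService interface that is not reproduced in the notes. Judging from the controller and the implementation above, it only needs one method; a hedged sketch:

import java.util.List;

// Hedged sketch of the CalllogService interface.
public interface CalllogService {
    // Query the per-month call statistics for a telephone number in the given year.
    List<Calllog> queryMonthDatas(String tel, String calltime);
}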

The DAO layer fetches the data.

/*
 * Call-log data access object
 */
public interface CalllogDao {
    List<Calllog> queryMonthDatas(Map<String, Object> paramMap);
}

The MyBatis mapper file runs the SQL that queries the data in the database.

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.atguigu.ct.web.dao.CalllogDao">
    <select id="queryMonthDatas" resultType="com.atguigu.ct.web.bean.Calllog">
        select * from ct_call where telid = (
            SELECT ID from ct_user where tel = #{tel}
        ) and dateid in (
            SELECT ID from ct_date where year = #{year} and month != '' and day = ''
        )
    </select>
</mapper>
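
The resultType above is com.atguigu.ct.web.bean.Calllog, which is not shown in the notes. A hedged sketch of that bean, with field names mirroring the ct_call columns used elsewhere in these notes (telid, dateid, sumcall, sumduration); the real bean may carry extra fields:

// Hedged sketch of the web-layer Calllog bean that MyBatis maps ct_call rows onto.
public class Calllog {
    private Integer id;
    private Integer telid;
    private Integer dateid;
    private Integer sumcall;
    private Integer sumduration;

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public Integer getTelid() { return telid; }
    public void setTelid(Integer telid) { this.telid = telid; }
    public Integer getDateid() { return dateid; }
    public void setDateid(Integer dateid) { this.dateid = dateid; }
    public Integer getSumcall() { return sumcall; }
    public void setSumcall(Integer sumcall) { this.sumcall = sumcall; }
    public Integer getSumduration() { return sumduration; }
    public void setSumduration(Integer sumduration) { this.sumduration = sumduration; }
}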

8.0 Cloud Server Deployment

8.1 Tools

| No. | Tool                 | Version                                         |
| --- | -------------------- | ----------------------------------------------- |
| 1   | Alibaba Cloud server | 1 core / 1 GB, CentOS 7.3                       |
| 2   | Xshell               | 7.0                                             |
| 3   | Xftp                 | 7.0                                             |
| 4   | Navicat              | 15.0                                            |
| 5   | JDK                  | Linux: jdk-8u212-linux-x64.tar.gz               |
| 6   | MySQL                | Linux: mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar |
| 7   | Tomcat               | Linux: apache-tomcat-8.5.75.tar.gz              |

8.2 Install the JDK

(1) Place the file in /opt/software

 jdk-8u212-linux-x64.tar.gz

(2) Remove the preinstalled Java and extract the JDK

#Uninstall the Java that ships with the server
rpm -qa | grep -i java | xargs -n1 rpm -e --nodeps
tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/

(3) Add the JDK environment variable

sudo vim /etc/profile.d/my_env.sh

#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin

Finally, make the file take effect:
source /etc/profile

(4) Reboot the server and verify that the JDK is installed

java -version

8.3 Install Tomcat

(1) Upload the Tomcat package to /opt/software

apache-tomcat-8.5.75.tar.gz

(2) Extract Tomcat

tar -zxvf apache-tomcat-8.5.75.tar.gz
mv apache-tomcat-8.5.75 /opt/module

8.4 Install MySQL

1) Check whether MySQL (mariadb) is already installed on the system

[atguigu@hadoop102 ~]$ rpm -qa|grep mariadb
mariadb-libs-5.5.56-2.el7.x86_64
//If it exists, uninstall it with the following command
[atguigu @hadoop102 ~]$ sudo rpm -e --nodeps mariadb-libs

2) Copy the MySQL installation package to /opt/software

[atguigu @hadoop102 software]# ll
total 528384
-rw-r--r--. 1 root root 609556480 Mar 21 15:41 mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar

3) Extract the MySQL installation package

[atguigu @hadoop102 software]# tar -xf mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar

4) Run the rpm installs in the installation directory

[atguigu @hadoop102 software]$ 
sudo rpm -ivh mysql-community-common-5.7.28-1.el7.x86_64.rpm
sudo rpm -ivh mysql-community-libs-5.7.28-1.el7.x86_64.rpm
sudo rpm -ivh mysql-community-libs-compat-5.7.28-1.el7.x86_64.rpm
sudo rpm -ivh mysql-community-client-5.7.28-1.el7.x86_64.rpm
sudo rpm -ivh mysql-community-server-5.7.28-1.el7.x86_64.rpm

If an error occurs during the process, a dependency is missing; install it with yum and then reinstall mysql-community-server-5.7.28-1.el7.x86_64.

[atguigu@hadoop102 software] yum install -y libaio

5) If the directory that datadir points to in /etc/my.cnf has any content, delete all of it

Check the value of datadir:

[mysqld]
datadir=/var/lib/mysql

Delete everything under /var/lib/mysql (usually it is empty):

[atguigu @hadoop102 mysql]# cd /var/lib/mysql
[atguigu @hadoop102 mysql]# sudo rm -rf ./*    //mind where you run this command

6) Initialize the database

[atguigu @hadoop102 opt]$ sudo mysqld --initialize --user=mysql

7) Look up the temporary password generated for the root user

[atguigu @hadoop102 opt]$ sudo cat /var/log/mysqld.log
The temporary password in this example: jZu%i>4hD

8) Start the MySQL service

[atguigu @hadoop102 opt]$ sudo systemctl start mysqld

9) Log in to MySQL

[atguigu @hadoop102 opt]$ mysql -uroot -p
Enter password: (enter the temporary password)

10) The root password must be changed first, otherwise other operations will fail

mysql> set password = password("fgl123");

11) Update the root user in the mysql.user table to allow connections from any IP

mysql> update mysql.user set host='%' where user='root';
mysql> flush privileges;

12) Connect to the remote database from the local machine with Navicat

8.5 Configure the firewall

1. Check the firewall service status

systemctl status firewalld

2. If it is not running, the service can be managed as follows

#Start the service
service firewalld start
#Stop the service
service firewalld stop
#Restart the service
service firewalld restart

3. Check the firewall state and start the firewall

#Check the firewall state
firewall-cmd --state
#Start the firewall
service firewalld start

4. View the firewall rules

firewall-cmd --list-all

5. If these ports are not open, open them with the following commands

#Open port 80
firewall-cmd --permanent --add-port=80/tcp
#Open port 8080
firewall-cmd --permanent --add-port=8080/tcp
#Open port 3306
firewall-cmd --permanent --add-port=3306/tcp

#Command to remove a port
firewall-cmd --permanent --remove-port=3306/tcp

6. Reload the firewall after opening the ports

firewall-cmd --reload

7. Check again whether the ports are open

firewall-cmd --list-all

8.6 Configure the cloud server security group

Using Alibaba Cloud as an example:

(screenshots of the security group configuration in the Alibaba Cloud console)

The ports are already open on this machine, so the manual-add option appears grayed out.

8.7 Tomcat configuration and startup

1. Put the WAR package into the webapps directory under the Tomcat directory.

2. Edit the server.xml configuration file in the conf directory.

#Enter the conf directory
cd conf
#Edit the configuration file
vim server.xml
#Change port 8080 to port 80
#Configure the Context according to where your WAR lives on the system
<Context docBase="/opt/module/apache-tomcat-8.5.75/webapps/ssmhe" path="" reloadable="false"/>


3. Start Tomcat

#Enter the bin directory
#Start Tomcat (wait about 20 seconds)
./startup.sh
#Shut down Tomcat
./shutdown.sh
#Start again
./startup.sh

4. Access the application in a browser via the server's public IP

For example: 47.94.139.13

You can buy a domain name and point it at the public IP so users can access the site by domain name.

8.8 Supplementary notes

8.8.1 How to package the project as a WAR

1. Open Project Structure in IDEA

(screenshot)

2. Make the selection shown in the screenshot

(screenshot)

3. Click the + button

(screenshot)

4. Make the selection shown in the screenshot

(screenshot)

5. Configure the artifact and build the package

(screenshots)

8.8.2 How to run the WAR in a local Tomcat

1. Put the WAR package into the webapps directory of the local Tomcat

(screenshot)

2. Open a terminal in the Tomcat bin directory and run .\startup.bat

(screenshot)

3. Once the console window that pops up is running normally, the web project can be accessed

(screenshot)

4. Shut down Tomcat

(screenshot)

9.0 Project Summary

9.1 Project background

Telecom operators generate huge amounts of communication data around the clock: call records, SMS records, MMS records, third-party service fees, and much more. With data at this scale, besides serving real-time queries and display for users, the existing data also has to be analyzed offline on a regular schedule, for example daily, monthly, quarterly, and yearly bills, call details, and call histories. Against this background we pick one entry point and learn the methodology from it. The current requirement: count the number of calls and the total call duration for each person per day, per month, and per year.

9.2 Project workflow

(project workflow diagram)

9.3 Project environment

1. Tools

| Tool    | Version |
| ------- | ------- |
| IDEA    | 2020.2  |
| Maven   | 3.3.9   |
| JDK     | 1.8+    |
| Navicat | 15      |
| Tomcat  | 8.5.75  |
| CentOS  | 7.0     |
| Xshell  | 7       |

2. Frameworks

| Framework | Version    |
| --------- | ---------- |
| Hadoop    | 2.7.2      |
| Zookeeper | 3.5.7      |
| HBase     | 1.3.1      |
| Flume     | 1.9.0      |
| Kafka     | 2.12-3.0.0 |
| Redis     | 3.0.0      |

3. Cluster environment

|                   | hadoop102              | hadoop103                    | hadoop104                   |
| ----------------- | ---------------------- | ---------------------------- | --------------------------- |
| Memory            | 4G                     | 4G                           | 4G                          |
| CPU               | 2 cores                | 2 cores                      | 2 cores                     |
| Disk              | 50G                    | 50G                          | 50G                         |
| Hadoop HDFS       | NameNode, DataNode     | DataNode                     | SecondaryNameNode, DataNode |
| Hadoop YARN       | NodeManager            | ResourceManager, NodeManager | NodeManager                 |
| Hadoop JobHistory | JobHistoryServer       |                              |                             |
| Zookeeper         | QuorumPeerMain         | QuorumPeerMain               | QuorumPeerMain              |
| Kafka             | Kafka                  | Kafka                        | Kafka                       |
| HBase             | HMaster, HRegionServer | HRegionServer                | HRegionServer               |
| Jps               | Jps                    | Jps                          | Jps                         |

9.4 Screenshots of the running project

1. Query page

(screenshot)

2. Statistics page

(screenshots)

Attachments:

My Gitee notes: https://gitee.com/fanggaolei/learning-notes-warehouse

They contain Typora documents on Java Web, big data, algorithms, SQL, Java, and more, along with the corresponding SSM project and big data project. Stars are welcome.

