出行项目案例

server/2025/2/24 5:31:40/

spark和kafka主要通过Scala实现,Hadoop和HBase主要基于java实现。

通过该项目,主要达到以下目的:

(1)通用的数据处理流程,入门大数据领域

(2)真实体验大数据开发工程师的工作

(3)企业级的项目,利用这个思路可以做二次拓展开发

(4)从0到有,数据抽取、数据存储、数据处理、展现

大数据平台架构图:

大数据没有事务的概念,需要不间断完整地把流程跑完,没有事务回滚的概念。

1. 项目需求

打车、叫车,出行的便捷问题等问题在一个出行平台建设中需要解决,与此同时安全出行也是重中之重,为了增加出行的便捷,提高出行的安全,对我们乘车的细节以及发生点我们迫切的需要及时知道,为此特地通过大数据的手段来处理我们海量的出行数据,做到订单的实时监控,乘车轨迹的的回放,虚拟打车站的选定等功能。

重点:乘车轨迹的的细节回放,虚拟打车站

2. 效果示意图

轨迹回放:

订单监控:指标的计算


3. 技术选型 


我们的项目建设主要是依据数据的生命周期来做的技术选型,目前主要依照的是大家都在用的一些技术,具体生产中应用要考虑实际的场景。比如人员、技术、接入难度、社区、版权等等各种问题

3.1 数据的生命周期

[参考其他数据平台的建设](https://www.sohu.com/a/242008443_468661)

数据的生产(web应用)>数据的传输>数据存储>计算>应用
 

3.2 数据传输

 数据采集:

采集框架名称主要功能版本
flume擅长日志数据的采集和解析1.9.0

消息中间件:

概述版本
KafkaLinkedIn用Scala语言实现,支持hadoop数据并行加载2.6.2

3.3 数据存储

框架名称主要用途版本
Hadoop分布式文件存储系统3.2.2
HbaseKey,value对的nosql数据库2.2.7

3.4 计算框架

框架名称基本介绍版本
Spark基于Spark,一站式解决批流处理问题3.1.1

4. 日志格式

本项目会使用到两份数据,原始文件名称为 order.txt以及gps。其中order.txt数据主要用来做我们的虚拟车站功能,gps主要用来做我们的数据回放功能。 日志存放在网盘中,可以下载,在/root目录下解压

gps数据:

字段名称类型示例备注
DRIVERID司机IDStringglox.jrrlltBMvCh8nxqktdr2dtopmlH
ORDERID订单IDStringjkkt8kxniovIFuns9qrrlvst@iqnpkwz
TIME时间戳String1501584540unix时间戳,单位为秒
LNG经度String104.04392GCJ-02坐标系
LAT纬度String30.67518GCJ-02坐标系

订单数据: 

字段ID字段名称字段样本描述
order_id订单IDstring类型且已脱敏
product_id产品线ID1滴滴专车, 2滴滴企业专车, 3滴滴快车, 4滴滴企业快车
city_id城市ID选取海口当地
district城市区号海口区号
county二级区县记录区县id
type订单时效0实时,1预约
combo_type订单类型1包车,4拼车
traffic_type交通类型1企业时租,2企业接机套餐,3企业送机套餐,4拼车,5接机,6送机,302跨城拼车
passenger_count乘车人数拼车场景,乘客选择的乘车人数
driver_product_id司机子产品线司机所属产品线
start_dest_distance乘客发单时出发地与终点的预估路面距离乘客发单时,出发地与终点的预估路面距离
arrive_time司机点击‘到达’的时间司机点击‘到达目的地’的时间
departure_time出发时间如果是实时单,出发时间(departure_time) 与司机点击‘开始计费’的时间(begin_charge_time)含义相同;如果是预约单,是指乘客填写的出发时间
pre_total_fee预估价格根据用户输入的起始点和目的地预估价格
normal_time时长分钟
bubble_trace_id
product_1level一级业务线1专车,3快车,9豪华车
dest_lng终点经度对应乘客填写的目的地对应的经度
dest_lat终点纬度对应乘客填写的目的地对应的纬度
starting_lng起点经度对应乘客填写的起始点对应的经度
starting_lat起点纬度对应乘客填写的起始点对应的纬度
year年份对应出行的年份
month月份对应出行的月份
day日期对应出行的日期

5. 项目架构

  • 数据采集 flume 去采集 order gps、发往kafka

  • spark 消费kafka数据存入redis里面(实时的监控)

  • spark 消费kafka数据存入hbase里面(计算绿点)

6. 环境搭建

所有的软件安装,请先bing XX分布式环境安装,然后在结合文档看本项目是如何配置的,否则如果你本身不清楚如何安装的,看下面的安装步骤会很懵 前提:我们的集群,使用了三台机器,机器的基础配置建议4C8G+50G-MEM的配置,否则项目会很卡,大数据环境存储基于MEM,计算基于内存,存在许多IO,所以对性能的要求是较高,学习的话建议大家可以按量购买云产品使用

安装环境 centos7.3

  • 所有的软件都基于root用户安装(生产环境中用普通用户),软件都安装在/root目录下

  • 三台机器需要提前设置好免密配置 免密配置参考

  • 所有的文件下载后,都需要改名称,例如mv hadoop-3.2.2 hadoop,其他的软件也需要改名称,目的是方便管理、升级

  • 所有涉及到的脚本都可以在doc目录下找到

节点角色
Hadoop01

HDFS: namenode,datanode,secondarynamenode

YARN: resourcemanager, nodemanager

Kafka

Spark: master,worker

Zookeeper: QuorumPeerMain

HBase: Hmaster,regionServer

Hadoop02

HDFS: datanode,

YARN: nodemanager

Spark: worker

kafka

Zookeeper: QuorumPeerMain

Hbase: regionServer

Hadoop03

HDFS: datanode

YARN: nodemanager

Spark: worker

kafka

flume

redis(docker)

Zookeeper: QuorumPeerMain

Hbase: regionServer

HDFS:基础数据的存储

YARN:计算调度,可以调度本地资源

kafka:流式处理

spark:计算软件,具体任务调度依赖于yarn,yarn依赖于HDFS

6.1 Java1.8安装

需要配置好环境变量

6.2 Hadoop

  • 节点1,下载hadoop wget https://mirrors.bfsu.edu.cn/apache/hadoop/common/hadoop-3.2.2/hadoop-3.2.2.tar.gz

  • 修改配置文件/root/hadoop/etc/hadoopworkersmapred-site.xmlhadoop-env.shyarn-site.xmlcore-site.xmlhdfs-site.xml 具体文件内容参考放在了doc下

    • cd etc/hadoop  然后cat 以下文件

      • workers表示分别部署在哪几台机器上:cat workers

      • mapred-site.xml配置了环境的位置

      • hadoop-env.sh配置了环境变量

      • yarn-site.xml:yarn是分布式调度软件,主要用来配置yarn的主节点在哪里

      • core-site.xml:定义了数据保存到本地的哪个位置,以及数据的副本个数(生产上一般是三个副本或以上)

      • hdfs-site.xml

  • 分发到其他节点 节点2 节点3

  • 启动前要格式化NameNode

  • 启动整体集群:/root/hadoop/sbin/start-all.sh

  • jsp查看启动了哪些进程:NameNode、SecondaryNameNode和DataNode代表的是HDFS的进程节点;ResourceManager和NodeManager是yarn的节点。此时yarn和HDFS就启动好了。

6.3 Spark

官网:spark.apache.org

  • 节点1,下载对应包 wget https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz  (注意sparkhadoop版本的对应)

  • 修改 /root/spark/conf/workersspark-env.sh 两个文件

    • cd conf/     cat workers  : 表示worker节点在哪些地方启动,master在本机启动,workers需要自己配置

  • xsync /root/spark /root/spark 分发到其他节点

  • 启动/root/spark/sbin/start-all.sh

通过java -jar fileOperator.jar将日志中的数据以每秒一条的速度定向写入到dest文件夹的gps文件中,flume实时监控目标文件gsp,并把采集到的数据发送到kafka中,kafka中也是每秒一条的速度进行接收。

6.4 Flume

官网:Welcome to Apache Flume — Apache Flume

  • 节点3,下载 wget https://mirrors.bfsu.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

  • 先把kafka启动起来

  • 在节点3启动agent

  • flume启动:nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-mem-kafka-all.conf -Dflume.root.logger=INFO,console &

  • jps中application代表flume进程启动起来了。

6.5 Kafka

  • 节点1,下载 wget https://mirrors.bfsu.edu.cn/apache/kafka/2.6.2/kafka_2.12-2.6.2.tgz

  • 修改 server.properties 文件,注意brokerid,zookeeper地址修改

  • xsync /root/kafka /root/kafka

  • 三台服务器启动kafka:(前提:三台服务器启动zookeeper,参考:kafka入门-CSDN博客

  • # 三台服务器分别进到zookeeper路径下,启动zookeeper服务:[root@192 local]# cd apache-zookeeper-3.8.4-bin/
    [root@192 apache-zookeeper-3.8.4-bin]# bin/zkServer.sh --config conf start
    
  • bin/kafka-server-start.sh -daemon config/server.properties
    # daemon为后台启动 不占用当前页面
    # 如果启动不生效,可以使用
    [root@192 kafka_2.13-3.8.0]# nohup bin/kafka-server-start.sh config/server.properties &
    

    sh kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic chengdu

6.6 HBase

  • 节点1,下载wget https://archive.apache.org/dist/hbase/2.2.7/hbase-2.2.7-bin.tar.gz

  • 修改 conf/hbase-site.xml \ regionServers文件

  • 分发到其他三个节点

  • bin/start-hbase.sh

6.7 zookeeper

  • 节点1,下载wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz

  • 修改 zoo.cfg

  • /tmp/zookeeper/添加myid文件,文件内容分别为0,1,2

  • xsync /root/zookeeper /root/hadoop/zookeeper

HBase

6.8 Redis

Redis使用了docker安装,生产中使用的一般是集群,我们这里就不安装redis集群环境了

docker run --name myredis -p 6379:6379 -v /home/disk1/redis:/data -d redis redis-server --appendonly yes

7. 一致性语义

流处理的一致性语义

kafka 是一个stream消息系统,包含三种角色:

Producer:写数据  -->  broker:存储数据  -->  consumer:从kafka消费数据

1. 一个是Producer到broker端数据如果没有确认,则重复发送,确保数据不丢失
2. consumer消费数据如果失败,则从失败的位置开始消费,并且需要做一个去重

保证数据不丢失,并且数据是不重复的

at-most-once:数据最多发送一次。适用于不太重要的日志数据。

如果在 ack 超时或返回错误时 producer 不重试,则该消息可能最终不会写入 Kafka,因此不会传递给 consumer。在大多数情况下,这样做是为了避免重复的可能性,业务上必须接收数据传递可能的丢失。ack=0.

at-least-once:至少发送一次

如果 producer 收到来自 Kafka broker 的确认(ack)或者 acks = all,则表示该消息已经写入到 Kafka。但如果 producer ack 超时或收到错误,则可能会重试发送消息,客户端会认为该消息未写入 Kafka。如果 broker 在发送 Ack 之前失败,但在消息成功写入 Kafka 之后,此重试将导致该消息被写入两次,因此消息会被不止一次地传递给最终 consumer,这种策略可能导致重复的工作和不正确的结果。

exactly-once:精准一次

producer没有收到broker的回复时,需要重复发送数据,但是即使 producer 重试发送消息,消息也会保证最多一次地传递给最终consumer(通过consumer端的重复值校验:Redis重复id去重的性质)。该语义是最理想的,但也难以实现,因为它需要消息系统本身与生产和消费消息的应用程序进行协作。ack =-1

并且设置offset自身存储,防止consumer消费失败。

Kafka

主要失败原因

- Broker失败:Kafka 作为一个高可用、持久化系统,保证每条消息被持久化并且冗余多份(假设是 n 份),所以 Kafka 可以容忍 n-1 个 broker 故障,意味着一个分区只要至少有一个 broker 可用,分区就可用。Kafka 的副本协议保证了只要消息被成功写入了主副本,它就会被复制到其他所有的可用副本(ISR)。
- Producer 到 Broker 的 RPC 失败:Kafka 的持久性依赖于生产者接收broker 的 ack 。没有接收成功 ack 不代表生产请求本身失败了。broker 可能在写入消息后,发送 ack 给生产者的时候挂了,甚至 broker 也可能在写入消息前就挂了。由于生产者没有办法知道错误是什么造成的,所以它就只能认为消息没写入成功,并且会重试发送。在一些情况下,这会造成同样的消息在 Kafka 分区日志中重复,进而造成消费端多次收到这条消息。
- 客户端也可能会失败:Exactly-once delivery 也必须考虑客户端失败的情况。但是如何去区分客户端是真的挂了(永久性宕机)还是说只是暂时丢失心跳?追求正确性的话,broker 应该丢弃由 zombie producer 发送的消息。 consumer 也是如此,一旦新的客户端实例已经启动,它必须能够从失败实例的任何状态中恢复,并从安全点( safe checkpoint )开始处理,这意味着消费的偏移量必须始终与生成的输出保持同步。

1. 增加业务处理

- 为每个消息增加唯一主键,生产者不做处理,由消费者根据主键去重
- producer(flume)设置ack=-1确保数据不丢失(flume的配置文件file-mem-kafka-all.conf中设置ack=-1,表示必须给回执,否则一直发送重复的消息)
- 消费者,往往是我们的流式任务,我们需要关闭自动提交 offset 的功能,业务保存offset,将保存offset与消费操作放入到一个事务当中去执行

Sparkstreming/Flink

- 支持一致性语义,只要是在任务调度过程中失败了,那么会去寻找checkpoint 拿到最新的副本数据


http://www.ppmy.cn/server/170275.html

相关文章

计算机毕业设计Python+DeepSeek-R1高考推荐系统 高考分数线预测 大数据毕设(源码+LW文档+PPT+讲解)

温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…

冬训周报(四)

一、补题 天梯赛训练 补题-CSDN博客 蓝桥杯训练 补题-CSDN博客 二、算法 本周主要是蓝桥杯的一个训练,这其中对于搜索和二分的算法居多,对于搜索而言,简单的搜索可能还不成问题,但稍微一复杂写起来还是有些吃力的;…

Docker 与 CI/CD:自动化构建和部署

在现代软件开发中,CI/CD(持续集成/持续部署) 是一种高效的软件开发和运维方法。CI/CD 通过自动化构建、测试和部署流程,减少了人为错误,提高了软件交付的速度和质量。Docker,作为一种容器化平台&#xff0c…

MySQL 视图入门

一、什么是 MySQL 视图 1.1 视图的基本概念 在 MySQL 中,视图是一种虚拟表,它本身并不存储实际的数据,而是基于一个或多个真实表(基表)的查询结果集。可以把视图想象成是一个预定义好的查询语句的快捷方式。当你查询…

2016年下半年试题二:论软件设计模式及其应用

论文库链接:系统架构设计师论文 论文题目 软件设计模式(Software DesignPatter)是一套被反复使用的、多数人知晓的、经过分类编目的代码设计经验的总结。使用设计模式是为了重用代码以提高编码效率增加代码的可理解性、保证代码的可靠性。软件设计模式是软件开发中的…

【拥抱AI】GPT Researcher 源码试跑成功的心得与总结

一、引言 在人工智能领域,自然语言处理(NLP)技术的发展日新月异。GPT Researcher 是一个基于大型语言模型(LLM)的开源研究工具,旨在帮助用户快速生成高质量的研究报告。通过自动化的方式,它能够…

使用大语言模型(Deepseek)构建一个基于 SQL 数据的问答系统

GitHub代码仓库 架构 从高层次来看,这些系统的步骤如下: 将问题转换为SQL查询:模型将用户输入转换为SQL查询。 执行SQL查询:执行查询。 回答问题:模型根据查询结果响应用户输入。 样本数据 下载样本数据&#xf…

ubuntu环境编译ffmepg支持nvidia显卡加速

文章目录 1. 安装NVIDIA驱动2. 安装CUDA&NV-CODEC2.1 安装CUDA2.2 安装NV-CODEC 3. 编译ffmpeg3.1 安装依赖3.2 下载源码安装依赖3.3 验证 4. 使用 1. 安装NVIDIA驱动 安装依赖包 sudo apt install -y ubuntu-drivers-common编辑 /etc/modprobe.d/blacklist-nouveau.conf 文…