AWS EMR上的Spark日志实时搜索关键指标网页呈现的设计和实现

news/2025/2/7 4:46:52/

为了在AWS EMR上实现基于Spark的大数据日志处理系统,并通过Kafka、ElasticSearch和Python Flask构建实时搜索与可视化平台,以下是详细的设计与实现方案:


一、架构设计

日志生成端
Apache Kafka
AWS EMR Spark Streaming
ElasticSearch
Python Flask Web
用户浏览器
核心组件说明
  1. Kafka Cluster:日志收集缓冲层
    • 建议使用Amazon MSK(托管Kafka服务)
    • 按日志量规划分区数和副本数
  2. EMR Spark:实时处理引擎
    • 启用Spark Structured Streaming
    • 使用EMR 6.x+版本(内置Spark 3.x)
  3. ElasticSearch:搜索与存储层
    • 推荐使用Amazon OpenSearch Service(托管ES)
    • 配置热/冷节点架构优化成本
  4. Flask Web:可视化层
    • 部署于EC2或ECS容器
    • 集成Jinja2模板与ECharts可视化

二、详细实现步骤

1. Kafka日志采集配置
# 生产者示例(Python)
from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers='kafka-broker1:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8')
)log_data = {"timestamp": datetime.now().isoformat(),"level": "ERROR","service": "payment-gateway","message": "Transaction timeout"
}
producer.send('app-logs', log_data)
2. EMR集群配置
  • Bootstrap Action:
    #!/bin/bash
    sudo pip-3.7 install elasticsearch-hadoop
    
  • 集群参数
    {"Classification": "spark-defaults","Properties": {"spark.sql.streaming.schemaInference": "true","spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0"}
    }
    
3. Spark Streaming处理(Scala示例)
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka-broker1:9092").option("subscribe", "app-logs").load()// JSON解析与字段提取
val logsDF = df.selectExpr("CAST(value AS STRING)").select(from_json($"value", schema).as("log")).select("log.*").withColumn("timestamp", to_timestamp($"timestamp"))// 关键指标计算
val metricsDF = logsDF.withWatermark("timestamp", "5 minutes").groupBy(window($"timestamp", "1 minute"), $"service").agg(count("*").as("total_errors"),sum(when($"level" === "ERROR", 1).otherwise(0)).as("critical_errors"))// 写入ElasticSearch
metricsDF.writeStream.outputMode("update").format("org.elasticsearch.spark.sql").option("es.nodes", "opensearch-domain:9200").option("es.mapping.id", "window").option("checkpointLocation", "/checkpoint").start("metrics-index/_doc")
4. ElasticSearch索引优化
PUT /metrics-index
{"settings": {"number_of_shards": 3,"refresh_interval": "30s"},"mappings": {"dynamic": "strict","properties": {"window": {"type": "date_range", "format": "epoch_millis"},"service": {"type": "keyword"},"total_errors": {"type": "integer"},"critical_errors": {"type": "integer"}}}
}
5. Flask Web服务实现
python"># app.py
from flask import Flask, render_template
from elasticsearch import Elasticsearchapp = Flask(__name__)
es = Elasticsearch(['opensearch-domain:9200'])@app.route('/dashboard')
def dashboard():query = {"size": 0,"aggs": {"services": {"terms": {"field": "service.keyword"},"aggs": {"total_errors": {"sum": {"field": "total_errors"}}}}}}res = es.search(index="metrics-index", body=query)return render_template('dashboard.html', data=res['aggregations'])# templates/dashboard.html
<script src="https://cdn.jsdelivr.net/npm/echarts@5.4.0/dist/echarts.min.js"></script>
<div id="chart" style="width:800px;height:600px;"></div>
<script>let chart = echarts.init(document.getElementById('chart'));let option = {title: {text: '实时服务错误统计'},tooltip: {},xAxis: {data: {{ data.services.buckets|map(attribute='key')|list|tojson }} },yAxis: {},series: [{type: 'bar', data: {{ data.services.buckets|map(attribute='total_errors.value')|list|tojson }}}]};chart.setOption(option);
</script>

三、性能优化策略

  1. Kafka优化

    • 启用Snappy压缩
    • 配置linger.ms=20和batch.size=16384
  2. Spark调优

    spark-submit --executor-memory 8G \--executor-cores 4 \--conf spark.sql.shuffle.partitions=200
    
  3. ES写入优化

    • 设置es.batch.size.bytes=10mb
    • 禁用副本写入es.write.operation=create
  4. Web层缓存

    python">@app.route('/dashboard')
    @cache.cached(timeout=10)  # 使用Flask-Caching
    def dashboard():# ...
    

---### **四、监控与运维**1. **监控指标**:- Kafka Consumer Lag- Spark Streaming批处理时间- ES JVM Heap使用率2. **日志排查工具**:```bash# 查看Spark Streaming进度yarn logs -applicationId <appId> -log_files stdout# ES慢查询日志GET /_search?pretty&pre_filter_shard_size=1024&typed_keys=true

五、安全方案

  1. 网络隔离

    • 将Kafka/ES部署在私有子网
    • 使用Security Group限制访问源
  2. 认证授权

    python">es = Elasticsearch(hosts=['https://opensearch-domain:9200'],http_auth=('admin', 'password'),use_ssl=True
    )
    

该方案可实现每秒处理万级日志事件,并在5秒内完成从日志产生到可视化展示的全流程。建议根据实际业务需求调整时间窗口和聚合粒度,可通过增加EMR Task节点实现水平扩展。


http://www.ppmy.cn/news/1569981.html

相关文章

如何利用maven更优雅的打包

最近在客户现场部署项目&#xff0c;有两套环境&#xff0c;无法连接互联网&#xff0c;两套环境之间也是完全隔离&#xff0c;于是问题就来了&#xff0c;每次都要远程到公司电脑改完代码&#xff0c;打包&#xff0c;通过网盘&#xff08;如果没有会员&#xff0c;上传下载慢…

PHP 中 `foreach` 循环结合引用使用时可能出现的问题

问题背景 假设你有如下 PHP 代码&#xff1a; <?php $arr array(1, 2, 3, 4);// 使用引用遍历并修改数组元素 foreach ($arr as &$value) {$value $value * 2; } // 此时 $arr 变为 array(2, 4, 6, 8)// 再使用非引用方式遍历数组 foreach ($arr as $key > $val…

arm-linux-gnueabihf安装

Linaro Releases windows下打开wsl2中的ubuntu&#xff0c;资源管理器中输入&#xff1a; \\wsl$gcc-linaro-4.9.4-2017.01-x86_64_arm-linux-gnueabihf.tar.xz 复制到/home/ark01/tool 在 Ubuntu 中创建目录&#xff1a; /usr/local/arm&#xff0c;命令如下&#xff1a; …

MySQL 索引原理

索引&#xff08;Index&#xff09;是 MySQL 用来提高查询效率的数据结构。索引的核心原理是 通过减少数据扫描的范围&#xff0c;提高查询性能。索引类似于一本书的目录&#xff0c;可以加快查找的速度。 1. 索引的底层数据结构 MySQL 主要使用两种索引数据结构&#xff1a;…

python学opencv|读取图像(四十九)原理探究:使用cv2.bitwise()系列函数实现图像按位运算

【0】基础定义 按位与运算&#xff1a;两个等长度二进制数上下对齐&#xff0c;全1取1&#xff0c;其余取0。 按位或运算&#xff1a;两个等长度二进制数上下对齐&#xff0c;有1取1&#xff0c;其余取0。 按位异或运算&#xff1a; 两个等长度二进制数上下对齐&#xff0c;相…

C_位运算符及其在单片机寄存器的操作

C语言的位运算符用于直接操作二进制位&#xff0c;本篇简单结束各个位运算符的作业及其在操作寄存器的应用场景。 一、位运算符的简单说明 1、按位与运算符&#xff08;&&#xff09; 功能&#xff1a;按位与运算符对两个操作数的每一位执行与操作。如果两个对应的二进制…

中国城商行信贷业务数仓建设白皮书(第一期:总体规划)

一、项目背景与行业现状 1.1 国内城商行信贷业务痛点 2024年统计数据显示:全国134家城商行平均历史数据处理延迟达37小时/次 传统Oracle架构日均处理能力上限仅为320万笔交易 客户特征维度不足(现行系统平均维护86个客户标签) 监管报表生成耗时超同业股份制银行2.3倍 1.2 H…

代码随想录二刷|回溯1

回溯 组合问题 方法 组合 题干 给定两个整数 n 和 k&#xff0c;返回范围 [1, n] 中所有可能的 k 个数的组合。 你可以按 任何顺序 返回答案。 思路 &#xff08;1&#xff09;定义全局变量数组&#xff0c;作为存放组合的数组和存放最终答案的数组 &#xff08;2&…