将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2.换掉付费的Event Hubs)

server/2025/1/22 17:07:23/

前情回顾:

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1.标准版)-CSDN博客

前边的方案是挺好的,但 Azure Event Hubs 是付费服务,我这里只是一个获取日志进行必要的分析,并且不要求实时性,关键 没钱,那怎么办呢?

替代方案:

如果对实时性没有严格要求,并且希望避免使用付费的 Azure Event Hubs 服务,可以采用更经济的方式,比如将 Azure Blob 存储的日志直接发送到 Elasticsearch 或通过 Azure Storage QueueAzure Function 来实现。这些方法可以有效减少成本,同时满足日志解析和分析需求。

1. 直接从 Azure Blob 存储读取日志并处理

思路

直接从 Azure Blob 存储 中读取生成的日志文件,解析所需字段后发送到 Elasticsearch。这种方式适合没有实时性需求的场景,定期运行任务即可。

实现步骤
  1. 启用 Azure 存储日志记录

    • 在存储账户中启用 诊断设置
    • 将日志记录输出到同一存储账户的 Blob 容器中,例如 insights-logs
  2. 使用 Azure Function 或定时任务读取日志

    • 编写脚本(可以用 Python、PowerShell、C# 等语言),从指定的 Blob 容器中下载日志文件。
    • 解析日志文件,根据需要提取字段并格式化为 Elasticsearch 可接受的 JSON 数据。
    • 将数据批量发送到 Elasticsearch。
  3. 示例代码

    • 使用 Pythonazure-storage-blob 库:
    from azure.storage.blob import BlobServiceClient
    from elasticsearch import Elasticsearch
    import json# Azure Blob Storage 配置
    storage_account_name = "your_storage_account"
    storage_account_key = "your_storage_account_key"
    container_name = "insights-logs"# Elasticsearch 配置
    es = Elasticsearch("http://your-elasticsearch-server:9200")def process_blob_logs():blob_service_client = BlobServiceClient(account_url=f"https://{storage_account_name}.blob.core.windows.net",credential=storage_account_key)container_client = blob_service_client.get_container_client(container_name)# 列出日志文件blobs = container_client.list_blobs()for blob in blobs:blob_client = container_client.get_blob_client(blob)log_data = blob_client.download_blob().readall().decode('utf-8')# 假设日志是 JSON 格式for line in log_data.splitlines():try:log_entry = json.loads(line)# 提取你需要的字段parsed_entry = {"timestamp": log_entry["time"],"operation": log_entry["operationName"],"blobName": log_entry.get("blobName"),"requestorIp": log_entry.get("requestorIpAddress"),}# 写入 Elasticsearches.index(index="storage-logs", document=parsed_entry)except Exception as e:print(f"Error processing log entry: {e}")if __name__ == "__main__":process_blob_logs()
    
  4. 设置定时运行

    • 如果使用 Azure Function,配置 Timer Trigger 定期运行该脚本。
    • 如果使用本地脚本,使用 cron(Linux)或计划任务(Windows)实现定时任务。

2. 使用 Azure Storage Queue

思路

利用 Azure Storage Queue 作为消息队列替代 Event Hubs,用于暂存日志文件元数据或小型消息,然后由处理程序(如 Azure Function 或脚本)消费队列消息,读取并处理日志。

实现步骤
  1. 启用日志记录

    • 将日志写入 Blob 存储。
  2. 配置 Azure Storage Queue

    • 创建一个 Azure Queue
    • 编写脚本将日志的元数据(例如 Blob 的路径)写入队列。
  3. 编写处理脚本

    • 消费队列消息,根据消息中的 Blob 路径读取并解析日志文件。
  4. 示例代码

    • 使用 Python 和 azure-storage-queue
    from azure.storage.queue import QueueClient
    from azure.storage.blob import BlobServiceClient
    import jsonqueue_name = "your-queue"
    storage_account_name = "your_storage_account"
    storage_account_key = "your_storage_account_key"def process_queue_messages():queue_client = QueueClient(account_url=f"https://{storage_account_name}.queue.core.windows.net",credential=storage_account_key,queue_name=queue_name)blob_service_client = BlobServiceClient(account_url=f"https://{storage_account_name}.blob.core.windows.net",credential=storage_account_key)messages = queue_client.receive_messages()for msg in messages:blob_path = msg.content  # 假设队列消息中存储的是 Blob 路径container_name, blob_name = blob_path.split('/', 1)blob_client = blob_service_client.get_blob_client(container_name, blob_name)log_data = blob_client.download_blob().readall().decode('utf-8')# 解析日志并发送到 Elasticsearch(同上)print(f"Processing blob: {blob_path}")queue_client.delete_message(msg)  # 删除已处理消息if __name__ == "__main__":process_queue_messages()
    

3. 直接读取和解析日志文件后推送到 Elasticsearch

这种方法可以完全避开队列服务,直接通过脚本定期从存储账户下载日志文件并解析后推送到 Elasticsearch。

注意事项
  • 定时任务频率:根据日志生成的频率设定脚本运行的时间间隔。
  • 日志存储策略:Blob 日志文件可能会快速增长,考虑启用存储生命周期管理规则定期删除过期日志文件。
  • 安全性:确保存储帐户密钥或连接字符串的安全性,可使用 Azure 的 Managed Identity 替代密钥。

比较与选择

方案适用场景实现复杂度成本
直接从 Blob 存储读取数据量不大,无实时性需求
使用 Storage Queue有一定的队列需求,异步处理较低
使用 Azure Event Hubs需要高吞吐量和实时性

推荐:

  • 如果数据量不大,选择 直接从 Blob 存储读取
  • 如果需要解耦消息和处理程序,可以选择 Azure Storage Queue


前情后续:

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1.标准版)-CSDN博客

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2.换掉付费的Event Hubs)-CSDN博客

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3.纯python的实惠版)-CSDN博客





http://www.ppmy.cn/server/160518.html

相关文章

Lsky-Pro在线图片搭建教程(Docker部署方式)

Lsky Pro+ 是一个使用 PHP 语言,采用 Laravel 框架开发的一款 Web 图片管理程序,中文名:兰空图床。如果你需要一个在线图床程序,那么这个开源项目可以帮助到你,部署流程非常简单。本章教程记录如何部署Lsky-Pro。 一、拉取镜像 docker pull halcyonazure/lsky-pro-docke…

AF3 AttentionPairBias类源码解读

AttentionPairBias 是 AlphaFold3 的一个注意力机制模块,设计用于实现全自注意力(Full Self-Attention)并结合成对表示的偏置(Pair Bias)。它在 AlphaFold3 的架构中发挥重要作用,特别是在处理蛋白质序列和空间对称性相关的任务时。 源代码: class AttentionPairBias(…

C++ ——— 模拟实现 vector 类

目录 vector 类的框架 无参数的构造函数 析构函数 获取有效数据个数 获取容量 重载 [] 运算符 可读可写版本 只可读版本 扩容 尾插 实现迭代器 可读可写版本 只可读版本 自定义设置size长度和内容 在任意位置插入 删除任意位置的数据 赋值重载 vector 类的框…

使用递归处理无限自关联表

无限自关联表 所谓的无限自关联表就是类似于下图&#xff0c;有parent_id的这种&#xff0c;需要分级显示 实体类 实体类需要添加一个属性--下级列表 List<SysMenu> children;这个属性中是我们该实体的孩子结点 工具类处理分级树 public class MenuHelper {public st…

Python绘制数据地图-MovingPandas

MovingPandas 是一个用于时空数据分析的 Python 库&#xff0c;它扩展了 Pandas 和 GeoPandas&#xff0c;使得处理和分析带有时间戳的地理数据变得更加方便。虽然 MovingPandas 本身不直接提供数据可视化功能&#xff0c;但你可以结合其他库如 matplotlib、folium 或 plotly 来…

Stable Diffusion 提示词编写技巧及示例

Stable Diffusion 提示词&#xff08;Prompt&#xff09;的编写技巧及示例是生成高质量图像的关键。以下将结合我搜索到的资料&#xff0c;详细说明提示词的编写规则、技巧和示例。 一、提示词的基本规则与格式 分隔符&#xff1a;提示词之间需用英文逗号&#xff08;,&#…

五、华为 RSTP

RSTP&#xff08;Rapid Spanning Tree Protocol&#xff0c;快速生成树协议&#xff09;是 STP 的优化版本&#xff0c;能实现网络拓扑的快速收敛。 一、RSTP 原理 快速收敛机制&#xff1a;RSTP 通过引入边缘端口、P/A&#xff08;Proposal/Agreement&#xff09;机制等&…

macOS如何进入 Application Support 目录(cd: string not in pwd: Application)

错误信息 cd: string not in pwd: Application 表示在当前目录下找不到名为 Application Support 的目录。可能的原因如下&#xff1a; 拼写错误或路径错误&#xff1a;确保你输入的目录名称正确。目录名称是区分大小写的&#xff0c;因此请确保使用正确的大小写。正确的目录名…