AWS EMR使用Apache Kylin快速分析大数据

ops/2025/2/5 4:24:20/

在AWS Elastic MapReduce(EMR)集群上部署和使用Apache Kylin,以实现对大规模数据集的快速分析,企业可以充分利用云计算的强大资源和Kylin的数据分析能力,实现快速、高效的数据分析。以下是该案例的详细步骤和要点:

背景

Apache Kylin是一个开源的分布式分析引擎,设计用于处理超大规模数据集,提供亚秒级的查询响应时间。AWS(Amazon Web Services)是亚马逊公司的云计算平台,提供包括弹性计算、存储、数据库在内的一整套云计算服务。结合AWS的强大计算能力和Kylin的数据分析能力,企业可以加速数据分析过程,提升数据挖掘能力。

实施过程

  1. 准备AWS服务资源

    • 创建一个AWS账号,并配置必要的权限。

    • 了解与Amazon EMR集群相关的AWS服务资源,如VPC(Virtual Private Cloud)、EC2(Elastic Compute Cloud)和S3(Simple Storage Service)。

  2. 创建Amazon EMR集群

    • 在AWS控制台中选择EMR服务,点击“创建集群”。

    • 配置集群参数,包括选择EMR版本(如emr-5.21.0或更高版本,以确保支持Apache Kylin)、实例类型、数量以及网络设置等。

    • 勾选Apache Kylin运行必需的服务组件,如Hadoop、HBase、Hive等。

  3. 在EMR集群上安装Kylin

    • 登录到EMR集群的主节点。

    • 下载并解压Apache Kylin安装包。

    • 配置Kylin的环境变量和kylin.properties文件。

    • 替换必要的Jar包,以确保Kylin与EMR集群中的其他服务组件兼容。

  4. 配置Kylin数据源和Cube

    • 将数据存储在AWS的S3或HDFS中,并使用Hive进行预处理和清洗。

    • 在Kylin中定义数据源,指向存储在S3或HDFS中的数据。

    • 创建Cube,定义维度和度量,以及分区策略。

  5. 构建和查询Cube

    • 配置Cube构建任务,定期从数据源中提取数据并加载到Kylin中进行预计算。

    • 使用Kylin的Web界面或REST API进行查询,享受亚秒级的查询响应时间。

结果

通过在AWS的EMR集群上部署Apache Kylin,企业可以实现以下效益:

• 加速数据分析:Kylin的预计算机制显著减少了实时查询的计算量,提高了查询速度。

• 降低成本:利用AWS的按需付费和弹性扩展特性,企业可以根据实际需求灵活调整资源使用,降低IT投入成本。

• 提高系统稳定性:Kylin的分布式架构和高可用性设计确保了系统在高并发查询下的稳定运行。

示例代码

以下是一个在AWS EMR上创建Kylin Cube的示例代码:

 CREATE CUBE my_cube
DIMENSIONS (dimension1,dimension2
)
MEASURES (SUM(measure1),COUNT(measure2)
)
PARTITIONED BY (partition_date);

此代码创建了一个名为my_cube的Cube,包含了两个维度dimension1和dimension2,以及两个度量SUM(measure1)和COUNT(measure2)。数据按partition_date进行分区。

以下是在AWS EMR上部署Apache Kylin并实现数据分析的具体流程与关键Python代码实现:


一、AWS EMR集群创建(Python自动化)

使用boto3库自动化创建EMR集群:

import boto3def create_emr_cluster():emr = boto3.client('emr', region_name='us-west-2')response = emr.run_job_flow(Name='Kylin-EMR-Cluster',ReleaseLabel='emr-6.8.0',  # 确保支持KylinApplications=[{'Name': 'Hadoop'},{'Name': 'Hive'},{'Name': 'HBase'}],Instances={'InstanceGroups': [{'Name': 'MasterNode','Market': 'ON_DEMAND','InstanceRole': 'MASTER','InstanceType': 'm5.xlarge','InstanceCount': 1,},{'Name': 'CoreNodes','Market': 'SPOT',  # 使用Spot实例降低成本'InstanceRole': 'CORE','InstanceType': 'm5.xlarge','InstanceCount': 2,}],'Ec2KeyName': 'your-key-pair','KeepJobFlowAliveWhenNoSteps': True,'Ec2SubnetId': 'subnet-xxxxxx'},BootstrapActions=[{'Name': 'Install-Kylin','ScriptBootstrapAction': {'Path': 's3://your-bucket/install-kylin.sh'  # 引导脚本自动安装Kylin}}],ServiceRole='EMR_DefaultRole',JobFlowRole='EMR_EC2_DefaultRole')return response['JobFlowId']# 执行创建
cluster_id = create_emr_cluster()
print(f"Cluster created with ID: {cluster_id}")

kylinsh_134">二、Kylin安装引导脚本(install-kylin.sh)

#!/bin/bash
# 下载并解压Kylin
wget https://archive.apache.org/dist/kylin/apache-kylin-3.1.2/apache-kylin-3.1.2-bin-hbase1x.tar.gz
tar -xzf apache-kylin-3.1.2-bin-hbase1x.tar.gz -C /opt/
mv /opt/apache-kylin-3.1.2-bin-hbase1x /opt/kylin# 配置环境变量
echo 'export KYLIN_HOME=/opt/kylin' >> /etc/profile
echo 'export PATH=$KYLIN_HOME/bin:$PATH' >> /etc/profile
source /etc/profile# 替换HBase兼容性JAR(根据EMR版本调整)
cp /usr/lib/hbase/lib/*.jar /opt/kylin/ext/# 启动Kylin服务
kylin.sh start

三、Hive表创建(指向S3数据)

使用pyhive连接Hive并定义外部表:

from pyhive import hiveconn = hive.Connection(host='emr-master-node-ip', port=10000)
cursor = conn.cursor()# 创建外部表指向S3数据
cursor.execute('''
CREATE EXTERNAL TABLE IF NOT EXISTS sales_data (transaction_id STRING,product_id STRING,sale_amount DOUBLE,transaction_date DATE
)
STORED AS PARQUET
LOCATION 's3://your-bucket/sales-data/'
''')
print("Hive table created successfully.")

四、Kylin Cube创建(REST API调用)

使用requests调用Kylin API创建Cube:

import requests
import jsonkylin_url = 'http://<emr-master-ip>:7070/kylin/api'
headers = {'Content-Type': 'application/json', 'Authorization': 'Basic YWRtaW46S1lMSU4='}  # 默认admin/KYLIN# 1. 创建项目
project_payload = {"name": "Sales_Project"}
requests.post(f'{kylin_url}/projects', headers=headers, data=json.dumps(project_payload))# 2. 创建数据模型
model_payload = {"name": "sales_model","project": "Sales_Project","fact_table": "SALES_DATA","lookups": [],"dimensions": [{"table": "SALES_DATA", "column": "PRODUCT_ID"},{"table": "SALES_DATA", "column": "TRANSACTION_DATE"}],"metrics": ["SUM(SALE_AMOUNT)", "COUNT(TRANSACTION_ID)"],"partition_desc": {"partition_date_column": "TRANSACTION_DATE"}
}
requests.post(f'{kylin_url}/models', headers=headers, data=json.dumps(model_payload))# 3. 创建Cube
cube_payload = {"name": "sales_cube","model_name": "sales_model","dimensions": [{"name": "PRODUCT_ID", "table": "SALES_DATA", "column": "PRODUCT_ID"},{"name": "TRANSACTION_DATE", "table": "SALES_DATA", "column": "TRANSACTION_DATE"}],"measures": [{"name": "TOTAL_SALES", "function": {"expression": "SUM(SALE_AMOUNT)"}},{"name": "TRANSACTION_COUNT", "function": {"expression": "COUNT(TRANSACTION_ID)"}}],"partition_date_start": "2023-01-01","auto_merge_time_ranges": [7, 30]
}
response = requests.post(f'{kylin_url}/cubes', headers=headers, data=json.dumps(cube_payload))
print("Cube创建状态:", response.status_code)

五、触发Cube构建与查询

# 触发Cube构建
build_payload = {"startTime": "2023-01-01","endTime": "2023-12-31","buildType": "BUILD"
}
requests.put(f'{kylin_url}/cubes/sales_cube/build', headers=headers, data=json.dumps(build_payload))# 执行SQL查询
query = """
SELECT PRODUCT_ID, SUM(SALE_AMOUNT) 
FROM SALES_DATA 
WHERE TRANSACTION_DATE BETWEEN '2023-01-01' AND '2023-12-31'
GROUP BY PRODUCT_ID
"""
result = requests.post(f'{kylin_url}/query', headers=headers, data=json.dumps({"sql": query}))
print("查询结果:", result.json())

关键要点说明

  1. 自动化部署:通过boto3和引导脚本实现EMR集群与Kylin的一键部署。
  2. 数据准备:Hive表直接映射S3数据,避免数据迁移。
  3. Cube优化:按日期分区和自动合并策略提升查询性能。
  4. 成本控制:使用Spot实例和EMR自动伸缩降低资源成本。
  5. 安全实践:在AWS中配置VPC和安全组限制访问来源IP。

实际部署时需替换代码中的占位符(如S3路径、EMR主节点IP),并根据数据规模调整EMR集群配置。


http://www.ppmy.cn/ops/155761.html

相关文章

【自开发工具介绍】SQLSERVER的ImpDp和ExpDp工具03

SQLSERVER的ImpDp和ExpDp工具 1、全部的表导出&#xff08;仅表结构导出&#xff09; 2、导出的表结构&#xff0c;导入到新的数据库 导入前&#xff0c;test3数据没有任何表 导入 导入结果确认&#xff1a;表都被做成&#xff0c;但是没有数据 3、全部的表导出&#x…

MySQL事务详解

MySQL事务详解 概念事务四个特性事务三种运行模式事务保存点事务使用原则数据库读现象事务四种隔离级别 概念 事务(Transaction)&#xff0c;顾名思义就是要做的或所做的事情&#xff0c;数据库事务指的则是作为单个逻辑工作单元执行的一系列操作(SQL语句)。这些操作要么全部执…

LLMs:open-r1(完全-完整-开放式-复现DeepSeek-R1)的简介、安装和使用方法、案例应用之详细攻略

LLMs&#xff1a;open-r1(完全-完整-开放式-复现DeepSeek-R1)的简介、安装和使用方法、案例应用之详细攻略 目录 相关文章 LLMs之MoE之DeepSeek&#xff1a;《DeepSeek-V3 Technical Report》翻译与解读 LLMs之MoE之DeepSeek-V3&#xff1a;DeepSeek-V3的简介、安装和使用方…

sql主从同步

今天给大家介绍两种mysql的主从同步方式&#xff1a;第一种是基于binlogzhu主从同步&#xff1b;第二种就是基于gtid的主从同步方式。 首先给大家介绍一下什么是sql的主从复制。 主从复制&#xff1a; 通过将MySQL的某一台主机&#xff08;master&#xff09;的数据复制到其…

3.[羊城杯2020]easyphp

打开题目页面如下 给出PHP源码&#xff0c;进行代码审计 <?php// 使用 scandir 函数扫描当前目录&#xff08;即 ./ 表示的当前目录&#xff09;&#xff0c;将目录下的所有文件和文件夹名存储到 $files 数组中$files scandir(./); // 遍历 $files 数组&#xff0c;对数组…

LeetCode 344: 反转字符串

LeetCode 344: 反转字符串 - C语言题解 这道题的目标是反转一个字符数组&#xff08;字符串&#xff09;。我们将通过双指针法来实现这一功能。 代码实现 #include <stdio.h>void reverseString(char* s, int sSize) {int left 0, right sSize - 1; // 定义左右指针…

pytorch实现循环神经网络

人工智能例子汇总&#xff1a;AI常见的算法和例子-CSDN博客 PyTorch 提供三种主要的 RNN 变体&#xff1a; nn.RNN&#xff1a;最基本的循环神经网络&#xff0c;适用于短时依赖任务。nn.LSTM&#xff1a;长短时记忆网络&#xff0c;适用于长序列数据&#xff0c;能有效解决…

自动化数据备份与恢复:让数据安全无忧

友友们好! 我的新专栏《Python进阶》正式启动啦!这是一个专为那些渴望提升Python技能的朋友们量身打造的专栏,无论你是已经有一定基础的开发者,还是希望深入挖掘Python潜力的爱好者,这里都将是你不可错过的宝藏。 在这个专栏中,你将会找到: ● 深入解析:每一篇文章都将…