使用Python实现对接Hadoop集群(通过Hive)并提供API接口

embedded/2024/11/17 20:12:06/

安装必要的库

首先,确保已经安装了以下库:

python">pip install flask
pip install pyhive

代码实现

1. app.py(主应用文件)

python">from flask import Flask, jsonify, request, abort
from pyhive import hive
import re
from datetime import datetimeapp = Flask(__name__)# Hive连接配置
HIVE_HOST = "hadoop-cluster-ip"
HIVE_PORT = 10000
HIVE_USERNAME = "your_username"
HIVE_PASSWORD = "your_password"# 日期格式正则表达式,用于校验输入的日期格式
DATE_FORMAT_REGEX = re.compile(r'^\d{4}-\d{2}-\d{2}$')def validate_date(date_str):"""校验日期字符串是否符合指定格式(YYYY-MM-DD)"""if not DATE_FORMAT_REGEX.match(date_str):raise ValueError("日期格式不正确,请使用YYYY-MM-DD格式。")try:datetime.strptime(date_str, '%Y-%m-%d')return Trueexcept ValueError:raise ValueError("日期格式不正确,请使用YYYY-MM-DD格式。")@app.route('/api/orders', methods=['GET'])
def get_orders():start_date = request.args.get('start_date')end_date = request.args.get('end_date')# 校验日期参数格式try:if start_date:validate_date(start_date)if end_date:validate_date(end_date)except ValueError as e:abort(400, description=str(e))try:# 连接Hiveconnection = hive.connect(host=HIVE_HOST,port=HIVE_PORT,username=HIVE_USERNAME,password=HIVE_PASSWORD)cursor = connection.cursor()# 构建查询语句,添加必要的防止SQL注入的处理query = f"SELECT order_id, order_date, order_amount FROM orders WHERE order_date BETWEEN '{start_date}' AND '{end_date}'"query = query.replace("'", "''")  # 将单引号替换为两个单引号,防止SQL注入cursor.execute(query)results = cursor.fetchall()# 将结果转换为字典列表形式orders = []for row in results:order = {"order_id": row[0],"order_date": row[1],"order_amount": row[2]}orders.append(order)cursor.close()connection.close()return jsonify(orders)except hive.DatabaseError as e:# 针对Hive数据库相关错误进行更详细的错误处理abort(500, description=f"Hive数据库错误: {str(e)}")except Exception as e:abort(500, description=f"其他错误: {str(e)}")if __name__ == '__main__':app.run(debug=True)

代码解析

输入参数校验

定义了validate_date函数,通过正则表达式和datetime.strptime来严格校验输入的日期参数是否符合YYYY-MM-DD格式。如果不符合格式,将直接返回400错误给客户端,提示正确的日期格式要求。

错误处理

●	在get_orders函数中,对可能出现的不同类型的错误进行了更细致的处理。对于Hive数据库相关的错误(如连接失败、查询失败等),会返回500错误并明确告知是Hive数据库错误及具体错误信息。对于其他一般性的错误,同样返回500错误并给出相应的错误描述。

安全防护(防止SQL注入)

●	在构建查询语句时,对输入的日期参数进行了处理,将单引号替换为两个单引号。这样可以在一定程度上防止SQL注入攻击,确保查询语句的安全性。

单元测试

  • 以下是使用Python的unittest模块对代码进行单元测试的示例:
python">import unittest
from unittest.mock import patch
from app import app, validate_dateclass TestApp(unittest.TestCase):def setUp(self):self.app = app.test_client()def test_validate_date_valid(self):self.assertTrue(validate_date('2024-11-10'))def test_validate_date_invalid_format(self):with self.assertRaises(ValueError) as context:validate_date('2024/11/10')self.assertEqual(str(context.exception), "日期格式不正确,请使用YYYY-MM-DD格式。")def test_validate_date_invalid_value(self):with self.assertRaises(ValueError) as context:validate_date('2024-13-32')self.assertEqual(str(context.exception), "日期格式不正确,请使用YYYY-MM-DD格式。")@patch('app.hive.connect')def test_get_orders_success(self, mock_connect):# 模拟查询结果mock_cursor = mock_connect.return_value.cursor.return_valuemock_cursor.fetchall.return_value = [(1, '2024-11-10', 100.0)]response = self.app.get('/api/orders?start_date=2024-11-10&end_date=2024-11-10')self.assertEqual(response.status_code, 200)self.assertEqual(response.get_json(), [{"order_id": 1, "order_date": "2024-11-10", "order_amount": 100.0}])def test_get_orders_missing_parameters(self):response = self.app.get('/api/orders')self.assertEqual(response.status_code, 400)self.assertEqual(response.get_json()['description'], "日期格式不正确,请使用YYYY-MM-DD格式。")@patch('app.hive.connect')def test_get_orders_database_error(self, mock_connect):mock_connect.side_effect = hive.DatabaseError("模拟数据库错误")response = self.app.get('/api/orders?start_date=2024-11-10&end_date=2024-11-10')self.assertEqual(response.status_code, 500)self.assertEqual(response.get_json()['description'], "Hive数据库错误: 模拟数据库错误")@patch('app.hive.connect')def test_get_orders_general_error(self, mock_connect):mock_connect.side_effect = Exception("模拟一般错误")response = self.app.get('/api/orders?start_date=2024-11-10&end_date=2024-11-10')self.assertEqual(response.status_code, 500)self.assertEqual(response.get_json()['description'], "其他错误: 模拟一般错误")if __name__ == '__main__':unittest.main()

代码块解析

上述单元测试代码主要涵盖了以下几个方面:

测试validate_date函数

• test_validate_date_valid测试了validate_date函数对于有效日期格式的验证是否正确。

• test_validate_date_invalid_format和test_validate_date_invalid_value分别测试了对于无效日期格式和无效日期值的情况,是否能正确抛出ValueError异常并给出正确的错误信息。

测试get_orders函数

• test_get_orders_success通过patch模拟了hive.connect和查询结果,测试了get_orders函数在正常情况下是否能正确返回查询结果和状态码200 。

• test_get_orders_missing_parameters测试了在缺少查询参数时,是否能正确返回400错误及相应的错误描述。

• test_get_orders_database_error和test_get_orders_general_error分别模拟了hive.DatabaseError和一般Exception的情况,测试了get_orders函数在出现不同类型错误时是否能正确返回500错误及相应的错误描述。

通过这些单元测试,可以较为全面地验证优化后的代码的正确性和可靠性,确保各个功能模块能够按照预期工作。


http://www.ppmy.cn/embedded/138330.html

相关文章

统信操作系统离线安装JDK、Nginx、elasticsearch、kibana、ik、pinyin

准备:挂载光盘 1、查看设备名称 查看光盘的名称,通常是以 /dev/sr0 或者类似格式显示lsblk 2、创建挂载点并挂载光盘 创建挂载目录sudo mkdir /mnt/cdrom 进行光盘目录挂载,/dev/sr0 要替换为实际查看到的光盘设备名称sudo mount /dev/s…

职场汇报技巧:选择合适的汇报形式与提供数据依据

在职场中,汇报工作是不可避免的。我们常常有一种感觉,即“向上哄好,向下唬住”。制作精美的PPT、制定各种不近人情的规范制度等,似乎成为了有效的证据,仿佛在揭露事实一样。 这种感觉本身并没有什么大问题&#xff0c…

基于STM32智能电流表

采用STM32F103C8T6微控制器为核心,设计了一款精密的电流表。该电流表通过精确采集采样电阻上的分压信号,并进行信号放大处理,随后利用ADC(模数转换器)高效地捕获放大后的电压信号,通过一系列算法运算&#…

【jvm】如何判断一个对象是否可以回收

目录 1.引用计数法1.1 原理1.2 缺点 2.可达性分析算法2.1 原理2.2 GC Roots2.3 标记-清除阶段 3.引用类型 1.引用计数法 1.1 原理 1.为每个对象创建一个引用计数,当有对象引用该对象时,计数器加1。 2.当引用失效时,计数器减1。 3.当计数器的…

mindtorch study

安装 pip install mindtorch mindtorch 用于帮助迁移torch模型到mindspore 大部分都可以直接把mindtorch的torch搞成torch,就和以前的代码一致,注意下面 只有静态图有点点差异 step也有差异 自定义优化器就麻烦了。 pyttorch还是牛啊 并行计算还是用的…

06.VSCODE:备战大项目,CMake专项配置

娇小灵活的简捷配置不过是年轻人谈情说爱的玩具,帝国大厦的构建,终归要交给CMake去母仪天下。一个没有使用 CMake 的 C 项目,就像未来世界里的一台相声表演,有了德纲却无谦,观众笑着遗憾。—— 语出《双城记》作者&…

时间序列数据结构、持久数据结构详细解读

一、时间序列数据结构 (Time Series Data Structures) 时间序列数据结构 专门设计用于存储、查询和分析 时间序列数据,即一组按时间顺序排列的数据点。这些数据结构在金融分析、物联网监控、传感器数据收集等场景中应用广泛。常见的时间序列数据结构包括 时间序列数…

RabbitMQ 在 Java 和 Spring Boot 中的应用详解

1. 引言 RabbitMQ 是一种开源消息代理软件,广泛用于实现消息传递、队列管理和负载均衡。它通过实现 AMQP(Advanced Message Queuing Protocol)来支持复杂的消息传递模式,是常见的消息中间件之一。本文将深入探讨如何在纯 Java 环…