Python 实现的风控系统(使用了kafka、Faust、模拟drools、redis、分布式数据库)

ops/2024/10/24 4:58:07/

以下是一个使用 Python 实现的风控系统示例,涵盖以下技术组件:

  1. Kafka 消息中间件:用于实时接收支付业务系统传递的交易数据。
  2. Faust(Kafka Streams 的 Python 等价):用于流式处理 Kafka 中的消息。
  3. 规则引擎:使用 Python 实现简单的规则评估逻辑,模拟 Drools 的功能。
  4. Redis 内存数据库:用于存储风险标签,快速获取账户的风险级别。
  5. 分布式数据库:使用 SQLite 模拟,从中获取风险标签数据(当 Redis 中没有时)。

我们将构建一个简单的风控系统,流程如下:

  • 从 Kafka 中消费实时交易数据。
  • 从 Redis 获取对应的风险标签,如果没有则从分布式数据库获取并更新到 Redis。
  • 使用规则引擎对交易数据和风险标签进行评估。
  • 将评估结果返回给支付业务系统或记录下来。
  • 实时交易模块:接收交易数据 ——> 获取风险标签(Redis) ——> 调用规则引擎 ——> 评估结果返回↓                                           ↓                          ↑
    规则引擎模块:交易数据 + 风险标签 ---> 规则执行 ----> 输出评估结果(通过/拒绝)
    

     

项目结构和依赖

1. 项目结构

risk_control_demo/
├── app.py                      # 主应用程序
├── models.py                   # 数据模型定义
├── rules.py                    # 规则引擎逻辑
├── database.py                 # 数据库服务类
├── redis_service.py            # Redis 服务类
├── requirements.txt            # 项目依赖
└── producer.py                 # Kafka 生产者,发送测试数据

2. 项目依赖(requirements.txt)

faust==1.10.4
redis==4.5.5
aiokafka==0.7.2
sqlite3==0.0.1

安装依赖

pip install -r requirements.txt

详细代码

1. models.py(数据模型定义)
# models.py
from dataclasses import dataclass@dataclass
class Transaction:transaction_id: straccount_id: stramount: floattimestamp: float@dataclass
class RiskTag:account_id: strrisk_level: int  # 1-低风险, 2-中风险, 3-高风险
2. database.py(数据库服务类)
# database.py
import sqlite3
from models import RiskTagclass DatabaseService:def __init__(self):# 连接 SQLite 数据库,内存模式self.conn = sqlite3.connect(':memory:')self.initialize_database()def initialize_database(self):cursor = self.conn.cursor()# 创建风险标签表cursor.execute('''CREATE TABLE IF NOT EXISTS risk_tags (account_id TEXT PRIMARY KEY,risk_level INTEGER)''')# 插入示例数据cursor.execute('''INSERT INTO risk_tags (account_id, risk_level) VALUES ('account123', 2)''')self.conn.commit()def get_risk_tag(self, account_id):cursor = self.conn.cursor()cursor.execute('SELECT risk_level FROM risk_tags WHERE account_id = ?', (account_id,))result = cursor.fetchone()if result:return RiskTag(account_id, result[0])else:return Nonedef close(self):self.conn.close()

3. redis_service.py(Redis 服务类)

# redis_service.py
import redis
from models import RiskTagclass RedisService:def __init__(self, host='localhost', port=6379):self.redis_client = redis.Redis(host=host, port=port, decode_responses=True)def get_risk_tag(self, account_id):risk_level = self.redis_client.get(f'risk:{account_id}')if risk_level:return RiskTag(account_id, int(risk_level))return Nonedef set_risk_tag(self, risk_tag):self.redis_client.set(f'risk:{risk_tag.account_id}', risk_tag.risk_level)def close(self):self.redis_client.close()

 4. rules.py(规则引擎逻辑)

# rules.py
from models import Transaction, RiskTagclass RiskEvaluator:def evaluate(self, transaction: Transaction, risk_tag: RiskTag) -> bool:"""返回 True 表示交易存在风险,需要阻止。返回 False 表示交易安全,可以通过。"""# 高风险交易规则if transaction.amount > 10000 and risk_tag.risk_level == 3:print(f"检测到高风险交易:{transaction}")return True  # 阻止交易# 中风险交易规则if 5000 < transaction.amount <= 10000 and risk_tag.risk_level >= 2:print(f"检测到中风险交易:{transaction}")return True  # 阻止交易# 低风险交易规则print(f"交易通过:{transaction}")return False  # 允许交易

5. app.py(主应用程序)

# app.py
import faust
import asyncio
import json
from models import Transaction, RiskTag
from database.py import DatabaseService
from redis_service import RedisService
from rules import RiskEvaluator# 定义 Faust 应用
app = faust.App('risk_control_app',broker='kafka://localhost:9092',value_serializer='raw',
)# 定义 Kafka 主题
transaction_topic = app.topic('transaction_topic')# 初始化服务
redis_service = RedisService()
database_service = DatabaseService()
risk_evaluator = RiskEvaluator()@app.agent(transaction_topic)
async def process_transaction(stream):async for event in stream:try:# 解析交易数据data = json.loads(event)transaction = Transaction(transaction_id=data['transaction_id'],account_id=data['account_id'],amount=data['amount'],timestamp=data['timestamp'])# 从 Redis 获取风险标签risk_tag = redis_service.get_risk_tag(transaction.account_id)if not risk_tag:# 如果 Redis 中没有,从数据库获取并更新到 Redisrisk_tag = database_service.get_risk_tag(transaction.account_id)if risk_tag:redis_service.set_risk_tag(risk_tag)else:# 如果数据库中也没有,设定默认风险标签risk_tag = RiskTag(transaction.account_id, 1)# 使用规则引擎进行风险评估is_risky = risk_evaluator.evaluate(transaction, risk_tag)# 根据评估结果进行处理if is_risky:print(f"交易 {transaction.transaction_id} 存在风险,执行阻止操作")# TODO: 将结果返回给支付业务系统,阻止交易else:print(f"交易 {transaction.transaction_id} 安全,允许通过")# TODO: 将结果返回给支付业务系统,允许交易except Exception as e:print(f"处理交易时发生错误:{e}")if __name__ == '__main__':app.main()

注释:

  • 使用 Faust 定义 Kafka Streams 应用程序,处理 transaction_topic 中的消息。
  • process_transaction 函数中,逐条处理交易数据。
  • 从 Redis 获取风险标签,如果没有则从数据库获取并更新到 Redis。
  • 使用自定义的 RiskEvaluator 进行风险评估,根据评估结果执行相应的操作

6. producer.py(Kafka 生产者,发送测试数据)

# producer.py
from kafka import KafkaProducer
import json
import timeproducer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8')
)# 创建示例交易数据
transaction_data = {'transaction_id': 'tx1001','account_id': 'account123','amount': 12000.0,'timestamp': time.time()
}# 发送交易数据到 Kafka
producer.send('transaction_topic', transaction_data)
producer.flush()
print(f"已发送交易数据:{transaction_data}")
producer.close()

运行示例

1. 启动必要的服务

注意事项


总结

上述示例提供了一个基本的 Python 程序框架,演示了如何将 Kafka、Faust、Redis、规则引擎和分布式数据库集成在一起,完成实时风控的基本功能。您可以根据具体的业务需求和技术环境,对程序进行扩展和优化。

扩展建议:

  • Redis:确保 Redis 服务在本地的 6379 端口运行

  • redis-server
    

    Kafka:确保 Kafka 服务在本地的 9092 端口运行,并创建主题 transaction_topic

  • # 启动 Zookeeper
    zookeeper-server-start.sh config/zookeeper.properties
    # 启动 Kafka
    kafka-server-start.sh config/server.properties
    # 创建主题
    kafka-topics.sh --create --topic transaction_topic --bootstrap-server localhost:9092
    

    2. 运行应用程序

  • 启动风控系统(app.py):

  • python app.py worker -l info
    

    运行 Kafka 生产者,发送交易数据(producer.py):

  • python producer.py
    

    3. 预期输出

    风控系统将处理交易数据,使用规则引擎进行评估,并根据规则打印评估结果。例如:

  • 检测到高风险交易:Transaction(transaction_id='tx1001', account_id='account123', amount=12000.0, timestamp=...)
    交易 tx1001 存在风险,执行阻止操作
    

    说明

  • Faust:Python 的流式处理库,类似于 Kafka Streams,用于处理 Kafka 中的消息流。
  • 规则引擎:使用 Python 自定义规则评估逻辑,模拟 Drools 的功能。
  • Redis:作为缓存,存储风险标签,快速获取账户的风险级别。
  • 分布式数据库(SQLite 模拟):当 Redis 中没有风险标签时,从数据库获取,并更新到 Redis。
  • 风险标签:简单地使用风险级别(1-低风险,2-中风险,3-高风险)来表示。
  • 异常处理:在实际应用中,需要更完善的异常处理机制,防止因异常导致程序崩溃。
  • 引入异步 Redis 客户端:使用 aioredis 提升 Redis 操作的性能。
  • 使用真正的分布式数据库:替换 SQLite,使用例如 PostgreSQL、MySQL 等数据库,并配置集群模式。
  • 完善规则引擎:使用现有的 Python 规则引擎库(如 durable_rulesexperta)实现更复杂的规则逻辑。
  • 添加日志和监控:集成日志系统和监控工具,便于维护和故障排查。
    • 性能优化:对于高并发场景,需要考虑异步 I/O、连接池等技术优化性能。
    • 配置管理:将硬编码的配置(如主机地址、端口、主题名)提取到配置文件或环境变量中,便于管理和修改。
    • 安全性:在生产环境中,注意保护敏感信息,确保数据传输和存储的安全。

http://www.ppmy.cn/ops/128011.html

相关文章

axios直接上传binary

axios直接上传二进制文件 、 axios直接上传apk、axios直接上传binary postman中的参数选项中有个binary&#xff0c;平常我们很少使用&#xff0c;可能有的同学遇到这种情况不太会了&#xff0c;认为后端应该有个字段名来接收&#xff0c;或者使用 Formdata&#xff0c;但其实…

报表工具怎么选?山海鲸VS帆软,哪个更适合你?

概述 在国产报表软件市场中&#xff0c;山海鲸报表和帆软这两款工具都占有一席之地&#xff0c;许多企业在选择报表工具时常常在它们之间徘徊。然而&#xff0c;随着企业对数据分析需求的不断增长和复杂化&#xff0c;如何选取一款高效、易用且性价比高的报表工具&#xff0c;…

中小型医院网站:Spring Boot开发策略

2 相关技术简介 2.1 Java技术 Java是一种非常常用的编程语言&#xff0c;在全球编程语言排行版上总是前三。在方兴未艾的计算机技术发展历程中&#xff0c;Java的身影无处不在&#xff0c;并且拥有旺盛的生命力。Java的跨平台能力十分强大&#xff0c;只需一次编译&#xff0c;…

15分钟学Go 第1天:Go语言简介与特点

Go语言简介与特点 1. Go语言概述 Go语言&#xff08;又称Golang&#xff09;是由谷歌于2007年开发并在2009年正式发布的一种开源编程语言。它旨在简单、高效地进行软件开发&#xff0c;尤其适合于网络编程和分布式系统。 1.1 发展背景 多核处理器&#xff1a;随着计算机硬件…

SOLID 原则:编写可扩展且可维护的代码

有人告诉过你&#xff0c;你写的是“糟糕的代码”吗&#xff1f; 如果你有&#xff0c;那真的没什么可羞愧的。我们在学习的过程中都会写出有缺陷的代码。好消息是&#xff0c;改进起来相当简单——但前提是你愿意。 改进代码的最佳方法之一是学习一些编程设计原则。你可以将…

毕业设计项目系统:基于Springboot框架的心理咨询评估管理系统,完整源代码+数据库+毕设文档+部署说明

本文关键字&#xff1a;Java编程&#xff1b;Springboot框架&#xff1b;毕业设计&#xff1b;毕设项目&#xff1b;编程实战&#xff1b;医护人员管理系统&#xff1b;项目源代码&#xff1b;程序数据库&#xff1b;毕设文档&#xff1b;开题报告和任务书&#xff1b;项目部署…

Android 12.0进程保活白名单功能实现

在Android 12.0系统中&#xff0c;实现进程保活白名单功能是为了确保某些重要的应用程序即使进入后台也能长时间保持运行状态&#xff0c;不被系统自动杀死。这一功能的实现涉及多个核心类和文件&#xff0c;以下是具体的实现步骤和核心功能分析&#xff1a; 一、实现步骤 …

【python Arrow库】一个处理日期和时间的Python库

Arrow库 引言&#xff1a;箭&#xff0c;不仅仅是武器1、安装&#xff1a;搭弓上箭2、基础&#xff1a;箭头的构造3、实战&#xff1a;箭无虚发3.1 案例一&#xff1a;时间比较3.2 案例二&#xff1a;时间格式化3.3 案例三&#xff1a;时区转换 4、结语&#xff1a;箭已离弦 引…