華為雲API接入MySQL數據(優化版)
目的:為了獲取華為雲每月賬單,對應API:
https://support.huaweicloud.com/api-oce/mbc_00008.html
1.讀取配置文件
config.py
import os
import yaml
def _load_yaml(path):
    """Parse the YAML file at *path* and return the resulting object."""
    # The context manager closes the handle; utf-8 is forced so non-ASCII
    # values parse the same way regardless of the platform's default encoding.
    with open(path, 'r', encoding='utf-8') as handle:
        # NOTE(review): FullLoader is kept to match the original eager load.
        # Switch to yaml.safe_load if this file can ever come from an
        # untrusted source.
        return yaml.load(handle, Loader=yaml.FullLoader)


class ConfigParser(object):
    """Class-level accessor for the ``config.yaml`` stored next to this module.

    The file is parsed once, when the class body executes (i.e. at import
    time), and cached in ``configs``.
    """

    # Absolute path of the YAML file, resolved relative to this source file.
    config_file = os.path.join(
        os.path.dirname(os.path.realpath(__file__)), 'config.yaml')
    # Parsed content of ``config_file``.
    configs = _load_yaml(config_file)

    @classmethod
    def get(cls, server='config', key=None):
        """Return the value stored under ``configs[server][key]``.

        Args:
            server: Name of the top-level section to read.
            key: Name of the entry inside that section.

        Returns:
            The configured value (any YAML type).

        Raises:
            NotImplementedError: If the section or the key is missing, or its
                value is null.
        """
        if not cls.configs:
            # The cache is empty (e.g. it was reset): read the file again.
            # The original call omitted the Loader argument here, which
            # PyYAML >= 6 rejects with a TypeError.
            cls.configs = _load_yaml(cls.config_file)

        # ``or {}`` keeps an empty YAML file on the NotImplementedError path
        # instead of failing with AttributeError on None.
        section = (cls.configs or {}).get(server)
        if section is None:
            raise NotImplementedError(
                'missing config section: {!r}'.format(server))

        value = section.get(key)
        if value is None:
            raise NotImplementedError(
                'missing config key: {!r}.{!r}'.format(server, key))
        return value
config.yaml
具體配置
config:
  conn:
    host: ip
    port: 3306
    db: 'database'
    user: user
    password: passwd
    charset: 'utf8mb4'
  key:
    ak: 華為雲ak
    sk: 華為雲sk
2.MySQL建表
model.py
from sqlalchemy import Column, String, JSON, Integer, DOUBLE
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import config
from urllib.parse import quote_plus

# Declarative base class shared by every ORM model of this module.
Base = declarative_base()

configs = config.ConfigParser()

# ConfigParser() always returns an instance, so a "configs is None" test can
# never fire. A missing section/key is reported by get() raising
# NotImplementedError, which is what gets handled here. The process exits
# with a non-zero status so callers can detect the failure.
try:
    cn = configs.get(key='conn')
except NotImplementedError:
    print('can not load config file')
    raise SystemExit(1)
print('success load config file')

# User and password are percent-encoded so characters such as '@', ':' or '/'
# do not corrupt the URL. str() covers YAML values parsed as numbers.
# NOTE(review): if credentials in config.yaml are already percent-encoded,
# remove the quoting here to avoid double-encoding.
sqlalchemy_database_url = 'mysql+pymysql://{}:{}@{}:{}/{}?charset={}'.format(
    quote_plus(str(cn['user'])),
    quote_plus(str(cn['password'])),
    cn['host'],
    cn['port'],
    cn['db'],
    cn['charset'],
)
# Engine bound to the MySQL URL assembled above.
engine = create_engine(sqlalchemy_database_url)


class CustomerMonthlyBillsSum(Base):
    """One row per fetched page of the monthly bill summary response."""

    __tablename__ = "customer_monthly_bills"

    # Primary key, supplied by the caller.
    id = Column(String(300), index=True, primary_key=True)
    # Total number of records.
    total_count = Column(Integer)
    # Bill records, stored as JSON.
    bill_sums = Column(JSON)
    # Total amount (including unsubscriptions).
    consume_amount = Column(DOUBLE)
    # Total outstanding (arrears) amount.
    debt_amount = Column(DOUBLE)
    # Voucher amount.
    coupon_amount = Column(DOUBLE)
    # Cash-coupon amount (reserved).
    flexipurchase_coupon_amount = Column(DOUBLE)
    # Stored-value card amount (reserved).
    stored_value_card_amount = Column(DOUBLE)
    # Cash account amount.
    cash_amount = Column(DOUBLE)
    # Credit account amount.
    credit_amount = Column(DOUBLE)
    # Arrears write-off amount.
    writeoff_amount = Column(DOUBLE)
    # Unit of the amounts.
    # NOTE(review): stored as DOUBLE although it looks like an id - confirm.
    measure_id = Column(DOUBLE)
    # Currency of the amounts.
    currency = Column(String(10))


class BillSumRecordInfo(Base):
    """One row per entry of a monthly bill summary."""

    __tablename__ = "bill_sum_record_info"

    # Id of the monthly bill row this record belongs to (no FK is declared).
    monthly_bills_id = Column(String(50))
    # Primary key, supplied by the caller.
    id = Column(String(300), index=True, primary_key=True)
    # Billing cycle of the summarised data, UTC+8, format YYYY-MM.
    bill_cycle = Column(String(50))
    # Bill type. 1: consumption, 2: refund, 3: adjustment.
    bill_type = Column(Integer)
    # Account id of the consuming customer.
    customer_id = Column(String(50))
    # Resource type code.
    resource_type_code = Column(String(50))
    # Cloud service type code.
    service_type_code = Column(String(50))
    # Resource type name.
    resource_type_name = Column(String(50))
    # Cloud service type name.
    service_type_name = Column(String(50))
    # Charging mode. 1: yearly/monthly, 3: pay-per-use, 10: reserved instance.
    charging_mode = Column(Integer)
    # Official (list) price.
    official_amount = Column(DOUBLE)
    # Discount amount.
    official_discount_amount = Column(DOUBLE)
    # Truncated (rounded-off) amount.
    truncated_amount = Column(DOUBLE)
    # Amount payable.
    consume_amount = Column(DOUBLE)
    # Voucher amount.
    coupon_amount = Column(DOUBLE)
    # Cash-coupon amount (reserved).
    flexipurchase_coupon_amount = Column(DOUBLE)
    # Stored-value card amount (reserved).
    stored_value_card_amount = Column(DOUBLE)
    # Arrears: the shortfall when the customer account could not cover a
    # deduction made by the partner.
    debt_amount = Column(DOUBLE)
    # Arrears write-off amount.
    writeoff_amount = Column(DOUBLE)
    # Cash account amount.
    cash_amount = Column(DOUBLE)
    # Credit account amount.
    credit_amount = Column(DOUBLE)
    # Unit of the amounts.
    # NOTE(review): stored as DOUBLE although it looks like an id - confirm.
    measure_id = Column(DOUBLE)


# Create both tables on the bound engine if they do not exist yet.
Base.metadata.create_all(engine)

# Module-level session shared by the importing code.
Session = sessionmaker(bind=engine)
session = Session()
3.寫入MySQL
import_database.py
import model
from sqlalchemy.orm import sessionmaker

# Session created by the model module; shared by every write below.
session = model.session

# Keys copied from the summary payload into CustomerMonthlyBillsSum.
_MONTHLY_BILL_FIELDS = (
    'id',
    'total_count',
    'bill_sums',
    'consume_amount',
    'debt_amount',
    'coupon_amount',
    'flexipurchase_coupon_amount',
    'stored_value_card_amount',
    'cash_amount',
    'credit_amount',
    'writeoff_amount',
    'measure_id',
    'currency',
)

# Keys copied from one bill record into BillSumRecordInfo.
_BILL_RECORD_FIELDS = (
    'monthly_bills_id',
    'id',
    'bill_cycle',
    'bill_type',
    'customer_id',
    'resource_type_code',
    'service_type_code',
    'resource_type_name',
    'service_type_name',
    'charging_mode',
    'official_amount',
    'official_discount_amount',
    'truncated_amount',
    'consume_amount',
    'coupon_amount',
    'flexipurchase_coupon_amount',
    'stored_value_card_amount',
    'debt_amount',
    'writeoff_amount',
    'cash_amount',
    'credit_amount',
    'measure_id',
)


def _merge_and_commit(obj):
    """Upsert *obj* by primary key and commit; roll back on any failure."""
    try:
        session.merge(obj)
        session.commit()
    except Exception:
        # Without a rollback the shared session stays in a failed state and
        # every later write through it would fail as well.
        session.rollback()
        raise
    return obj


def customer_monthly_bills(info):
    """Insert or update one monthly bill summary row.

    Args:
        info: Mapping that contains every key of ``_MONTHLY_BILL_FIELDS``.

    Returns:
        The ``CustomerMonthlyBillsSum`` instance built from *info*.

    Raises:
        KeyError: If *info* lacks one of the expected keys.
    """
    values = {name: info[name] for name in _MONTHLY_BILL_FIELDS}
    return _merge_and_commit(model.CustomerMonthlyBillsSum(**values))


def bill_sum_record_info(info):
    """Insert or update one bill record row.

    Args:
        info: Mapping that contains every key of ``_BILL_RECORD_FIELDS``.

    Returns:
        The ``BillSumRecordInfo`` instance built from *info*.

    Raises:
        KeyError: If *info* lacks one of the expected keys.
    """
    values = {name: info[name] for name in _BILL_RECORD_FIELDS}
    return _merge_and_commit(model.BillSumRecordInfo(**values))
4. 獲取月份數據,格式yyyy-mm
target_month.py
from datetime import datetime
from dateutil.relativedelta import relativedelta
import time_util


def get_target_month(start_month='2023-01'):
    """List every month from *start_month* through the current month.

    The current month is taken from ``time_util.convert_cst_today()``; only
    the first two dash-separated fields (year and month) of its result are
    used, so both 'YYYY-MM' and 'YYYY-MM-DD' strings are accepted.

    Args:
        start_month: First month to include, formatted 'YYYY-MM'.

    Returns:
        A list of 'YYYY-MM' strings in ascending order, both ends included.
        The list is empty when *start_month* lies after the current month.

    Raises:
        ValueError: If *start_month* is not in 'YYYY-MM' format.
    """
    current = datetime.strptime(start_month, '%Y-%m')

    year, month = time_util.convert_cst_today().split('-')[:2]
    last = datetime.strptime(year + '-' + month, '%Y-%m')

    months = []
    while current <= last:
        months.append(current.strftime('%Y-%m'))
        current += relativedelta(months=1)
    return months
# get_target_month()
5.時間工具
time_util.py
import time
import pytz
from datetime import datetime, timedelta# 時間戳轉美國和中國時區
def convert_to_cst(timestamp, time_zone):
    """Return the epoch *timestamp* (seconds) as an aware datetime in *time_zone*.

    Args:
        timestamp: Seconds since the epoch.
        time_zone: Time zone name understood by ``pytz.timezone``.
    """
    utc_moment = datetime.fromtimestamp(timestamp, pytz.utc)
    return utc_moment.astimezone(pytz.timezone(time_zone))


def convert_to_pst(timestamp):
    """Format a millisecond epoch *timestamp* as 'YYYY-MM-DD HH:MM:SS'.

    NOTE(review): despite the name, this takes the machine-local time of the
    timestamp and adds seven hours to it - confirm this offset is intended.
    """
    shifted = datetime.fromtimestamp(timestamp / 1000) + timedelta(hours=7)
    return shifted.strftime('%Y-%m-%d %H:%M:%S')


def convert_cst_yesterday():
    """Return the previous day's date in Asia/Shanghai as 'YYYY-MM-DD'."""
    now_shanghai = convert_to_cst(time.time(), 'Asia/Shanghai')
    return (now_shanghai - timedelta(days=1)).strftime('%Y-%m-%d')


def convert_cst_today():
    """Return the current month in Asia/Shanghai as 'YYYY-MM'."""
    now_shanghai = convert_to_cst(time.time(), 'Asia/Shanghai')
    return now_shanghai.strftime('%Y-%m')
6. 每月賬單API接口
util.py
from huaweicloudsdkcore.auth.credentials import GlobalCredentials
from huaweicloudsdkbss.v2.region.bss_region import BssRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkbss.v2 import *import config
import json

configs = config.ConfigParser()

# ConfigParser() always returns an instance, so a "configs is None" test can
# never fire. A missing section/key is reported by get() raising
# NotImplementedError, which is what gets handled here. The process exits
# with a non-zero status so callers can detect the failure.
try:
    cn = configs.get(key='key')
except NotImplementedError:
    print('can not load config file')
    raise SystemExit(1)

# Huawei Cloud access key / secret key read from the config file.
ak = cn['ak']
sk = cn['sk']


def get_customer_monthly_sum(target_month, limit, offset, region="cn-north-1"):
    """Fetch one page of the monthly bill summary from the BSS API.

    Args:
        target_month: Billing cycle to query, sent as ``bill_cycle``.
        limit: Page size sent with the request.
        offset: Page offset sent with the request.
        region: BSS region name; defaults to the previously hard-coded value.

    Returns:
        The response converted with ``to_json_object()``, or the string
        ``"error"`` when the SDK raises ``ClientRequestException``.
    """
    credentials = GlobalCredentials(ak, sk)
    client = (
        BssClient.new_builder()
        .with_credentials(credentials)
        .with_region(BssRegion.value_of(region))
        .build()
    )

    request = ShowCustomerMonthlySumRequest()
    request.bill_cycle = target_month
    request.limit = limit
    request.offset = offset

    try:
        return client.show_customer_monthly_sum(request).to_json_object()
    except exceptions.ClientRequestException as e:
        # Print the full error context, not just the HTTP status, so a failed
        # month can be diagnosed from the output.
        print(e.status_code, e.request_id, e.error_code, e.error_msg)
        # Callers compare the result against this sentinel string.
        return "error"
7.主程序
service.py
# coding: utf-8
import util
import import_database
import target_monthlimit = 1000
offset = 0

# Fields that together form the primary key of one bill record.
RECORD_ID_FIELDS = (
    'bill_cycle',
    'bill_type',
    'customer_id',
    'resource_type_code',
    'service_type_code',
    'charging_mode',
)


def build_record_id(item):
    """Return the primary key of one bill record.

    The key joins the ``RECORD_ID_FIELDS`` values with '-'. Every page uses
    this same formula; previously pages after the first used a shorter key
    (cycle, resource type name, charging mode) that could collide.
    """
    return '-'.join(str(item[name]) for name in RECORD_ID_FIELDS)


def _store_page(month, page_offset, data, save_summary, save_record):
    """Persist one response page and return how many records it held."""
    # Summary primary key: month plus the offset of the page.
    data['id'] = month + '-' + str(page_offset)
    save_summary(data)

    for item in data['bill_sums']:
        item['monthly_bills_id'] = data['id']
        item['id'] = build_record_id(item)
        # NOTE(review): the first-page code always overwrote measure_id with
        # 0; that is kept and now applied to every page - confirm intent.
        item['measure_id'] = 0
        print(item)
        save_record(item)
    return len(data['bill_sums'])


def sync_month(month, page_size, fetch_page, save_summary, save_record,
               start_offset=0):
    """Fetch and store every page of one month's bill summary.

    Args:
        month: Billing cycle, 'YYYY-MM'.
        page_size: Number of records requested per page.
        fetch_page: Callable ``(month, limit, offset)`` returning the response
            dict, or the string ``'error'`` on failure.
        save_summary: Callable that stores the page-level summary dict.
        save_record: Callable that stores one bill record dict.
        start_offset: Offset of the first page.

    Returns:
        The number of bill records stored for *month*.
    """
    # The offset is local, so every month starts again at ``start_offset``
    # instead of inheriting the last offset of the previous month.
    page_offset = start_offset
    stored = 0
    while True:
        data = fetch_page(month, page_size, page_offset)
        if data == 'error':
            # Stop this month; indexing the sentinel string would raise.
            break

        page_count = _store_page(
            month, page_offset, data, save_summary, save_record)
        stored += page_count

        # An empty page means no progress is possible; stop instead of
        # looping forever when total_count is never reached.
        if page_count == 0 or stored >= data['total_count']:
            break
        page_offset += page_size
    return stored


def main():
    """Import the bill summary of every target month into MySQL."""
    # ``month`` avoids shadowing the imported ``target_month`` module.
    for month in target_month.get_target_month():
        print(month)
        sync_month(
            month,
            limit,
            util.get_customer_monthly_sum,
            import_database.customer_monthly_bills,
            import_database.bill_sum_record_info,
            start_offset=offset,
        )


if __name__ == "__main__":
    main()