pip3 install redis
1.创建 Redis 连接池
文件地址:db/redis_db.py
代码尝试连接到本地运行的 Redis 服务器,并设置数据库为 1,最大连接数为 20。
如果连接过程中出现任何异常,它会捕获异常并打印错误信息。
import redis
try:pool = redis.ConnectionPool(host='localhost',port=6379,db=1,max_connections=20)
except Exception as e:print(e)
2.查找用于缓存的记录
文件地址:db/news_dao.py
用 sql 语言在 Navicat 中寻找要作为缓存的记录,
目的是将审批通过的信息,存进 Redis 数据库中。
#查找用于缓存的记录def search_cache(self, id):# 创建异常try:# 获得连接项conn = pool.get_connection()# 获得游标cursor = conn.cursor()# 创建sqlsql = """SELECT n.title,u.username,t.type,n.content_id,n.is_top,n.create_timeFROM t_news n join t_type t on n.type_id=t.idjoin t_user u on n.editor_id=u.idwhere n.id=%s"""# 执行sqlcursor.execute(sql, [id])# 获得查询结果result = cursor.fetchone()return result# 返回获得结果except Exception as e:print(e)finally:if "conn" in dir():conn.close()
3.操作Redis数据库
3.1 创建操作类
文件地址:db/redis_news_dao.py
创建一个 Python 类 RedisNewsDao
,它包含两个方法:insert
和 delete
,
分别用于添加与删除 Redis 数据库中的新闻数据。
3.1.1 添加数据
from db.redis_db import pool
import redis
class RedisNewsDao:#添加数据def insert(self,id,title,username,type,content,is_top,create_time):conn = redis.Redis(connection_pool=pool,)try:#向redis中存放数据conn.hmset(id,{'title':title,'author':username,'type':type,'content':content,'is_top':is_top,'create_time':create_time})#如果是今日热文则保留一天后销毁if is_top == 0 :conn.expire(id,24*60*60)except Exception as e:print(e)finally:del conn#删除数据def delete(self,id):conn = redis.Redis(connection_pool=pool)try:conn.delete(id)except Exception as e:print(e)finally:del conn
3.1.2 删除数据
from db.redis_db import pool
import redis
class RedisNewsDao:#删除数据def delete(self,id):conn = redis.Redis(connection_pool=pool)try:conn.delete(id)except Exception as e:print(e)finally:del conn
3.2 创建业务类
文件地址:service/news_service.py
from db.news_dao import NewsDao
from db.redis_news_dao import RedisNewsDao
class NewsService:#实例化新闻Dao对象__news_dao = NewsDao()__redis_news_dao = RedisNewsDao()#查找用户管理中的记录def search_cache(self,id):result = self.__news_dao.search_cache(id)return result#向Redis中保存缓存的新闻def cache_news(self,id,title,username,type,content,is_top,create_time):self.__redis_news_dao.insert(id,title,username,type,content,is_top,create_time)#删除缓存新闻def delete_cache(self,id):self.__redis_news_dao.delete(id)
4.更改Navicat与Redis数据库
4.1 创建操作类
# 删除新闻
def delete_by_id(self, id):try:conn = pool.get_connection()conn.start_transaction()cursor = conn.cursor()sql = "delete from t_news where id=%s"cursor.execute(sql, [id])conn.commit()except Exception as e:print(e)if "conn" in dir():conn.rollback()finally:if "conn" in dir():conn.close()# 添加新闻
def insert_news(self, title, editor_id, type_id, content_id, is_top):try:conn = pool.get_connection()conn.start_transaction()cursor = conn.cursor()sql = """insert into t_news(title,editor_id,type_id,content_id,is_top,state) values (%s,%s,%s,%s,%s,%s)"""cursor.execute(sql, (title, editor_id, type_id, content_id, is_top, "待审批"))conn.commit()except Exception as e:print(e)if "conn" in dir():conn.rollback()finally:if "conn" in dir():conn.close()# 查找用于缓存的记录
def search_cache(self, id):# 创建异常try:# 获得连接项conn = pool.get_connection()# 获得游标cursor = conn.cursor()# 创建sqlsql = """SELECT n.title,u.username,t.type,n.content_id,n.is_top,n.create_timeFROM t_news n join t_type t on n.type_id=t.idjoin t_user u on n.editor_id=u.idwhere n.id=%s"""# 执行sqlcursor.execute(sql, [id])# 获得查询结果result = cursor.fetchone()return result# 返回获得结果except Exception as e:print(e)finally:if "conn" in dir():conn.close()# 修改id查找新闻
def search_by_id(self, id):# 创建异常try:# 获得连接项conn = pool.get_connection()# 获得游标cursor = conn.cursor()# 创建sqlsql = """SELECT n.title,t.type,n.is_topFROM t_news n join t_type t on n.type_id=t.idwhere n.id=%s"""# 执行sqlcursor.execute(sql, [id])# 获得查询结果result = cursor.fetchone()return result# 返回获得结果except Exception as e:print(e)finally:if "conn" in dir():conn.close()# 更新修改新闻
def update(self, id, title, type_id, content_id, is_top):try:conn = pool.get_connection()conn.start_transaction()cursor = conn.cursor()sql = "update t_news set title=%s,type_id=%s,content_id=%s,is_top=%s,state=%s,update_time=now() where id=%s"cursor.execute(sql, (title, type_id, content_id, is_top, "待审批", id))conn.commit()except Exception as e:print(e)if "conn" in dir():conn.rollback()finally:if "conn" in dir():conn.close()
4.2 创建操作类
#删除新闻
def delete_news(self,id):self.__news_dao.delete_by_id(id)
#添加新闻
def insert_news(self,title,editor_id,type_id,content_id,is_top):self.__news_dao.insert_news(title, editor_id, type_id, content_id, is_top)
#查找用户管理中的记录
def search_cache(self,id):result = self.__news_dao.search_cache(id)return result
#向Redis中保存缓存的新闻
def cache_news(self,id,title,username,type,content,is_top,create_time):self.__redis_news_dao.insert(id,title,username,type,content,is_top,create_time)
#删除缓存新闻
def delete_cache(self,id):self.__redis_news_dao.delete(id)
#根据id查询新闻信息
def search_by_id(self,id):result = self.__news_dao.search_by_id(id)return result
#更新修改新闻信息
def update(self,id,title,type_id,content_id,is_top):self.__news_dao.update(id,title,type_id,content_id,is_top)self.delete_cache(id)
5.实现Redis操作
文件地址:app_py.py
5.1 添加数据
opt = input("\n\t请输入操作编号") # 等待控制台输入
if opt == "prev" and page > 1:page += -1
elif opt == "next" and page < count_page:page += 1
elif opt == "back":break
elif int(opt) >= 1 and int(opt) <= 5: # 由于控制台获得的是字符串类型,需要转换成数字类型进行比较# 获得新闻id值news_id = result[int(opt) - 1][0] # 获得对应行数及列# 调用news_service的审批函数__news_service.update_unreview_news(news_id)# 新闻缓存 👇result = __news_service.search_cache(news_id)title = result[0]username = result[1]type = result[2]content_id = result[3]# 查询新闻正文content = "100"is_top = result[4]create_time = str(result[5])__news_service.cache_news(news_id, title, username, type, content, is_top, create_time)👆
5.2 删除数据
opt = input("\n\t请输入操作编号") # 等待控制台输入
if opt == "prev" and page > 1:page += -1
elif opt == "next" and page < count_page:page += 1
elif opt == "back":break👇
elif int(opt) >=1 and int(opt) <=5 :news_id = result[int(opt) - 1][0]result = __news_service.search_by_id(news_id)title = result[0]type = result[1]is_top = result[2]print("\n\t新闻原标题:%s"%(title))new_title = input("\n\t新标题")print("\n\t新闻原类型:%s"%(type))result = __type_service.search_list()for index in range(len(result)):t = result[index]print(Fore.LIGHTBLUE_EX,"\n\t%d.%s"%(index+1,t[1]))print(Style.RESET_ALL)opt = input("\n\t类型编号:")new_type = result[int(opt) - 1][0]content_id = 10print("原置顶级别%s"%(is_top))new_is_top = input("\n\t置顶级别(0-5):")is_commit = input("\n\t是否提交?(Y/N)")if is_commit == 'y' or is_commit == 'Y':__news_service.update(news_id,new_title, new_type, content_id, new_is_top)print("\n\t保存成功(3秒自动返回)")time.sleep(3)👆
5.3 更改数据
print(Fore.LIGHTGREEN_EX," \n\t1.发表新闻")
print(Fore.LIGHTGREEN_EX, "\n\t2.编辑新闻")
print(Fore.LIGHTGREEN_EX, "\n\t3.退出登录")
print(Fore.LIGHTGREEN_EX, "\n\t4.退出系统")
print(Style.RESET_ALL)
opt = input("\n\t输入操作编号")
if opt == "1":...
elif opt == "2":
page = 1
while True:os.system("cls")count_page = __news_service.search_count_page()result = __news_service.search_list(page)for index in range(len(result)):new = result[index]print(Fore.LIGHTBLUE_EX, "\n\t%d\t%s\t%s\t%s\t" % (index + 1, new[1], new[2], new[3]))# 输入在新闻列表中一共有多少页,当前多少页print(Fore.LIGHTBLUE_EX, "\n\t%d/%d" % (page, count_page))print(Fore.LIGHTBLUE_EX, "\n\t------------------")print(Fore.LIGHTBLUE_EX, "\n\tprev.上一页")print(Fore.LIGHTBLUE_EX, "\n\tnext.下一页")print(Fore.LIGHTBLUE_EX, "\n\tback.返回上一层")print(Style.RESET_ALL)opt = input("\n\t请输入操作编号") # 等待控制台输入if opt == "prev" and page > 1:page += -1elif opt == "next" and page < count_page:page += 1elif opt == "back":breakelif int(opt) >= 1 and int(opt) <= 5:news_id = result[int(opt) - 1][0]result = __news_service.search_by_id(news_id)title = result[0]type = result[1]is_top = result[2]print("\n\t新闻原标题:%s" % (title))new_title = input("\n\t新标题")print("\n\t新闻原类型:%s" % (type))result = __type_service.search_list()for index in range(len(result)):t = result[index]print(Fore.LIGHTBLUE_EX, "\n\t%d.%s" % (index + 1, t[1]))print(Style.RESET_ALL)opt = input("\n\t类型编号:")new_type = result[int(opt) - 1][0]content_id = 10print("原置顶级别%s" % (is_top))new_is_top = input("\n\t置顶级别(0-5):")is_commit = input("\n\t是否提交?(Y/N)")if is_commit == 'y' or is_commit == 'Y':__news_service.update(news_id, new_title, new_type, content_id, new_is_top)print("\n\t保存成功(3秒自动返回)")time.sleep(3)