7.3. Data Monitoring and Building the Local Knowledge Base
Purpose: monitor newly generated Xiaohongshu drafts and record the style of every piece of copy that is produced. Later, given an input topic, we can retrieve the drafts related to that topic and generate new copy in a similar style.
Implementation approach:
1. File monitoring is built on the watchdog library. watchdog is a Python library for watching file-system changes; it exposes event types such as file creation, modification, and deletion. We start a thread that watches the xiaohongshu_drafts directory in real time and, whenever a new file appears, calls process_new_file(file_path) to update the knowledge base.
2. process_new_file(file_path) reads the new file's contents and builds the knowledge base: the text is split into chunks, the chunks are wrapped as Document objects, and they are embedded and stored in a FAISS vector store. (A short retrieval check follows the code listing below.)
Code implementation:
'''
Author: yeffky
Date: 2025-02-12 13:29:31
LastEditTime: 2025-02-17 14:28:11
'''
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from langchain_community.document_loaders import TextLoader
from langchain_community.vectorstores import FAISS
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from transformers import AutoTokenizer
from langchain.schema import Document
from text2vec import SentenceModel
import time
import os


class NewFileHandler(FileSystemEventHandler):
    def on_created(self, event):
        # Fires for every newly created entry; skip directories.
        if not event.is_directory:
            file_path = event.src_path
            process_new_file(file_path)
            print(f"新增文件已加载: {file_path}")


def process_new_file(file_path):
    # Load the new draft as a LangChain document.
    loader = TextLoader(file_path, encoding="utf-8")
    documents = loader.load()
    print(type(documents[0]))

    # Split the draft on '---' using a token-based length function (bert-base-chinese).
    tokenizer = AutoTokenizer.from_pretrained("bert-base-chinese")
    text_splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(
        tokenizer, chunk_size=256, chunk_overlap=0, separators=['---']
    )
    text_chunks = text_splitter.split_text(documents[0].page_content)
    chunk_docs = [Document(page_content=chunk) for chunk in text_chunks]

    # Embed the chunks and merge them into the local FAISS store.
    embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-zh")
    vector_store_dir = "./vector_store"
    if os.path.exists(vector_store_dir):
        vector_store = FAISS.load_local(
            vector_store_dir, embeddings, allow_dangerous_deserialization=True
        )
        vector_store.add_documents(chunk_docs)
    else:
        vector_store = FAISS.from_documents(chunk_docs, embeddings)
    vector_store.save_local(vector_store_dir)


def start_observer():
    # Watch the drafts directory and handle new files until interrupted.
    observer = Observer()
    observer.schedule(NewFileHandler(), path="./xiaohongshu_drafts", recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()


if __name__ == "__main__":
    start_observer()
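Once a few drafts have been indexed, it is worth checking that topic-based retrieval actually returns the expected style snippets. The following is a minimal verification sketch, assuming the same ./vector_store directory and BAAI/bge-base-zh embedding model used above; the query string is just an example topic.

# Quick retrieval check against the knowledge base built by process_new_file().
from langchain_community.vectorstores import FAISS
from langchain.embeddings.huggingface import HuggingFaceEmbeddings

embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-zh")
store = FAISS.load_local("./vector_store", embeddings, allow_dangerous_deserialization=True)

# Fetch the 3 chunks most similar to a sample topic and preview each one.
for doc in store.similarity_search("手机推荐", k=3):
    print(doc.page_content[:80], "...")

This is the same similarity_search call that build_prompt performs in the next script, so it is a cheap way to confirm the chunking and embedding steps before involving the DeepSeek API.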
Purpose: combine the previously crawled data with the local knowledge base and call the DeepSeek API to generate Xiaohongshu copy.
Implementation approach: generating copy with DeepSeek requires building a system preset and a user prompt. The templates for both are first loaded from files; the crawled product data and the drafts retrieved from the local knowledge base are then filled in to produce the final preset and prompt. (An end-to-end usage sketch follows the code listing below.)
Code implementation:
'''
Author: yeffky
Date: 2025-02-11 11:17:04
LastEditTime: 2025-02-17 15:35:13
'''
import json
import os
import requests
from datetime import datetime
import random
from langchain_community.vectorstores import FAISS
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from text2vec import SentenceModel

# Route Hugging Face downloads through a mirror (the endpoint needs the scheme).
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'

today_date = datetime.now().strftime('%Y-%m-%d')
topic = "手机推荐"
def read_json_file(filename):
    with open(f'data/{filename}', 'r', encoding='utf-8') as f:
        return json.load(f)
def build_prompt(item):
    # Load the user-prompt template, then retrieve earlier drafts on the same
    # topic from the local knowledge base to use as style references.
    with open('./docs/prompt.txt', 'r', encoding='utf-8') as f:
        prompt = f.read()
    embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-zh")
    vector_store = FAISS.load_local("./vector_store", embeddings, allow_dangerous_deserialization=True)
    retrieved_docs = vector_store.similarity_search(topic, k=5)
    random.shuffle(retrieved_docs)
    selected_docs = retrieved_docs[:3]
    # Pass only the draft text to the model, not the Document objects' repr.
    style_examples = "\n---\n".join(doc.page_content for doc in selected_docs)
    return f"""{prompt},
{json.dumps(item, ensure_ascii=False, indent=2)}
**根据以下文案风格,做出创新**:
{style_examples}
**注意**:
- 在结尾加入提示,数据截至当前日期:{today_date}
- 每一段内容使用 --- 进行分割
"""
""" def build_preset ( ) : with open ( './docs/preset.txt' , 'r' , encoding= 'utf-8' ) as f: preset = f. read( ) embeddings = HuggingFaceEmbeddings( model_name= "BAAI/bge-base-zh" ) print ( "embeddings加载完毕" ) vector_store = FAISS. load_local( "./vector_store" , embeddings, allow_dangerous_deserialization= True ) retrieved_docs = vector_store. similarity_search( topic, k= 5 ) random. shuffle( retrieved_docs) selected_docs = retrieved_docs[ : 3 ] preset += f"""\n **主题**: { topic} **创新要求**:- 使用 { random. choice( [ "轻松幽默" , "专业严谨" , "犀利吐槽" ] ) } 的语气- 加入 { [ "emoji表情" , "热门梗" , "互动提问" ] } 元素""" print ( preset) return preset
def get_deepseek_response(preset, prompt, api_key):
    url = "https://api.deepseek.com/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        'Content-Type': 'application/json',
        'Accept': 'application/json',
    }
    # The preset goes in as the system message, the prompt as the user message.
    payload = json.dumps({
        "messages": [
            {"content": preset, "role": "system"},
            {"content": prompt, "role": "user"}
        ],
        "model": "deepseek-reasoner",
        "frequency_penalty": 0,
        "max_tokens": 2048,
        "presence_penalty": 0,
        "response_format": {"type": "text"},
        "stop": None,
        "stream": False,
        "stream_options": None,
        "temperature": 1,
        "top_p": 1,
        "tools": None,
        "tool_choice": "none",
        "logprobs": False,
        "top_logprobs": None
    })
    # Retry until a usable response arrives.
    response = None
    while not response:
        try:
            response = requests.post(url, data=payload, headers=headers, timeout=100)
            response.raise_for_status()
            if not response.json():
                response = None
                print("没有收到响应,重试中...")
            else:
                print("收到响应,内容为:\n" + response.json()['choices'][0]['message']['content'])
        except requests.exceptions.RequestException as e:
            print(f"请求失败: {str(e)}")
            response = None
    return response.json()['choices'][0]['message']['content']
def save_copywriting(content):
    base_path = './xiaohongshu_drafts/'
    filename = f"小红书_推广文案_千战系列{today_date}.txt"
    print(content)
    with open(base_path + filename, 'w', encoding='utf-8') as f:
        f.write(content)
    print(f"文案已保存至: {filename}")
def analysis_data():
    API_KEY = os.getenv("DEEPSEEK_API_KEY")
    JSON_FILE = f'goods_{today_date}.json'
    items = read_json_file(JSON_FILE)
    print(f"正在处理: {JSON_FILE}")
    prompt = build_prompt(items)
    preset = build_preset()
    try:
        response = get_deepseek_response(preset, prompt, API_KEY)
        save_copywriting(response)
    except Exception as e:
        print(f"处理失败: {str(e)}")


if __name__ == "__main__":
    analysis_data()
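To close the loop described in this section — generate a draft, save it under xiaohongshu_drafts, and have the watcher fold it back into the knowledge base — the two scripts can be run together. The sketch below is only illustrative and assumes the monitoring script is saved as monitor.py and the generation script as generate_copy.py (hypothetical file names).

# End-to-end sketch (hypothetical module names monitor.py / generate_copy.py):
# run the directory watcher in a background thread, then produce one draft;
# saving the draft triggers the watcher, which updates ./vector_store so that
# later runs can retrieve this draft's style.
import time
import threading

import monitor        # the watchdog script above, assumed saved as monitor.py
import generate_copy  # the DeepSeek script above, assumed saved as generate_copy.py

watcher = threading.Thread(target=monitor.start_observer, daemon=True)
watcher.start()

generate_copy.analysis_data()  # writes a new draft into ./xiaohongshu_drafts
time.sleep(10)                 # give the watcher time to index the new draft before exiting

In practice the two scripts can also simply run as separate processes, which is how they are written above; the thread wrapper only illustrates the data flow between them.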