(RAG系列)FastGPT批量添加索引
- 引言
- 版本
- 使用说明
- 脚本代码
引言
索引制作:
-
通过模型对分块内容进行概括
-
根据文本内容划分特点,例如,文档有明显的大小标题,把标题作为索引
…
版本
fastgpt v4.8.10
使用说明
根据知识库文档的分块内容,提炼相对应的索引,制作索引文件(xlsx 文件,文件名与知识库中的文档同名,索引写在名为 index 的列中,同一分块的多条索引用换行分隔)。注意索引要与分块一一对应:不需要添加索引的分块,把对应的单元格设置成 no。另外,该操作会直接覆盖已存在的索引,因此只适用于还未添加索引的场景。
脚本代码
import requests
import json
import pandas as pd
import os


def _index_filename(collection_name):
    """Return the name of the .xlsx index file that belongs to a collection.

    Only the last extension is replaced, so ``"v1.2 report.pdf"`` maps to
    ``"v1.2 report.xlsx"`` and a name without any extension simply gets
    ``.xlsx`` appended.
    """
    return os.path.splitext(collection_name)[0] + '.xlsx'


def _build_indexes(cell):
    """Convert one cell of the ``index`` column into the ``indexes`` payload.

    The literal ``'no'`` means the chunk gets no indexes. Otherwise every
    non-empty line of the cell becomes one index entry; duplicates are dropped
    while keeping the order in which they first appear.

    Raises:
        AttributeError: if the cell is not a string (for example an empty
            spreadsheet cell, which pandas reads as NaN).
    """
    if cell == 'no':
        return []
    unique_lines = dict.fromkeys(cell.split('\n'))
    return [{'text': line} for line in unique_lines if line != '']


def _update_collection_chunks(headers, get_chunk_url, update_index_url, subset, df_index_list, xlsx_name):
    """Page through every chunk of one collection and overwrite its indexes."""
    num = 0
    chunk_pagenum = 1
    while True:
        get_chunkId = {
            "pageNum": chunk_pagenum,
            "pageSize": 24,
            "collectionId": subset['_id'],
            "searchText": "",
        }
        chunk_response = requests.post(url=get_chunk_url, headers=headers, json=get_chunkId).json()
        chunks = chunk_response['data']['data']
        # An empty page means the collection has no more chunks.
        if not chunks:
            break
        for chunk in chunks:
            try:
                print("num" + str(num))
                print("chunk" + str(chunk['chunkIndex']))
                # Make it visible when the server's chunk order differs from
                # the running counter.
                if chunk['chunkIndex'] != num:
                    print("----------------" + str(chunk['chunkIndex']) + "------------------------")
                update_data = {
                    "dataId": chunk['_id'],
                    "q": chunk['q'],
                    "a": chunk['a'],
                    "indexes": _build_indexes(df_index_list[chunk['chunkIndex']]),
                }
            except (KeyError, IndexError, TypeError, AttributeError):
                # The payload could not be built (missing row, non-text cell,
                # unexpected chunk shape). Report it and leave this chunk
                # untouched instead of posting a stale or undefined payload.
                print("********************" + xlsx_name + "有报错***************************")
            else:
                update_response = requests.post(url=update_index_url, headers=headers, json=update_data).json()
                if update_response['code'] != 200:
                    print(update_response)
                    print(f'集合名称:{subset["name"]}\n集合ID:{subset["_id"]}\nchunkID:{chunk["_id"]}\nchunk页码:{chunk_pagenum}')
            num += 1
        chunk_pagenum += 1


def batch_add_index(headers: dict, datasetid: str, get_collection_url: str, get_chunk_url: str, update_index_url: str,
                    parentid=None, index_path=None):
    """Overwrite the indexes of every chunk in a dataset from .xlsx files.

    The dataset is walked page by page; folders are entered recursively,
    links and the collection named '手动录入' are skipped. For each remaining
    collection, a workbook with the same base name and the ``.xlsx`` extension
    is looked up in ``index_path``. Its ``index`` column is addressed by each
    chunk's ``chunkIndex``. A cell holding ``no`` clears the chunk's indexes;
    any other cell is split on newlines into one index per non-empty line.
    Collections without a matching workbook are left untouched.

    Args:
        headers: HTTP headers sent with every request (authorization etc.).
        datasetid: ID of the dataset to process.
        get_collection_url: Endpoint that lists the collections of a dataset.
        get_chunk_url: Endpoint that lists the chunks of a collection.
        update_index_url: Endpoint that updates a single chunk.
        parentid: ID of the folder to list; passed through as ``parentId``.
        index_path: Directory that contains only ``.xlsx`` index workbooks.
            When ``None`` no collection has a workbook, so nothing is updated.

    Returns:
        None. Problems are reported with ``print``. An invalid ``index_path``
        or a workbook without an ``index`` column ends the current call early.
    """
    index_filename_list = []
    if index_path is not None:
        try:
            index_filename_list = os.listdir(index_path)
            only_xlsx = all(filename.endswith('.xlsx') for filename in index_filename_list)
        except (OSError, TypeError, ValueError):
            only_xlsx = False
        if not only_xlsx:
            print('index_path必须是文件夹路径,并且文件夹中需要是.xlsx后缀的文件')
            return

    # Walk every page of the dataset (or of the current folder).
    collection_pagenum = 1
    while True:
        get_collectionId = {
            "pageNum": collection_pagenum,
            "pageSize": 20,
            "datasetId": datasetid,
            "parentId": parentid,
            "searchText": "",
        }
        collection_response = requests.post(url=get_collection_url, headers=headers, json=get_collectionId).json()
        collections = collection_response['data']['data']
        # An empty page means this level of the dataset is exhausted.
        if not collections:
            break

        for subset in collections:
            # Skip the manual-entry collection.
            if subset['name'] == '手动录入':
                continue
            # Folders are processed recursively.
            if subset['type'] == 'folder':
                batch_add_index(headers, datasetid, get_collection_url, get_chunk_url, update_index_url,
                                parentid=subset['_id'], index_path=index_path)
                continue
            # Links carry no file-based index.
            if subset['type'] == 'link':
                continue

            # Everything else is a file: it is only processed when a matching
            # index workbook exists.
            xlsx_name = _index_filename(subset['name'])
            if xlsx_name not in index_filename_list:
                continue

            print(xlsx_name + " start")
            df_index = pd.read_excel(os.path.join(index_path, xlsx_name))
            try:
                df_index_list = df_index['index'].to_list()
            except KeyError:
                print('xlsx文件中第一列第一行第一个单元格应是单词index')
                return

            _update_collection_chunks(headers, get_chunk_url, update_index_url, subset, df_index_list, xlsx_name)
            print(xlsx_name + " over")
        collection_pagenum += 1


if __name__ == '__main__':
    # Account -> API key -> put it after "Bearer " in Authorization.
    headers = {
        'Authorization': 'Bearer ',
        'Content-Type': 'application/json',
    }
    # Dataset ID -> open the dataset and read it from the browser URL.
    datasetId = ''
    get_collection_url = 'http://xxxx:3000/api/core/dataset/collection/list'
    get_chunk_url = 'http://xxxx:3000/api/core/dataset/data/list'
    update_index_url = 'http://xxxx:3000/api/core/dataset/data/update'
    # Folder ID (comment this out if there is no folder) -> open the dataset
    # and read it from the browser URL.
    parentId = ''
    # Directory holding the index workbooks (each with an "index" column).
    index_path = r'D:\mnt\data\111'
    batch_add_index(headers, datasetId, get_collection_url, get_chunk_url, update_index_url,
                    parentid=parentId, index_path=index_path)