python操作Elasticsearch

devtools/2024/11/26 7:23:29/

使用elasticsearch 6.x版本,操作es数据。

# -*- coding: utf-8 -*-
import time

from elasticsearch import Elasticsearch, helpers


class EstUtil(object):
    """Singleton convenience wrapper around an ``Elasticsearch`` client.

    Every ``EstUtil()`` call returns the same instance, and the underlying
    client (``self.es``) is created only once.  Typed requests use the
    ``_doc`` document type, which is what elasticsearch-py releases older
    than 7.x expect.
    """

    _instance = None

    # Document type sent with every typed request (single-document and bulk).
    _DOC_TYPE = "_doc"

    def __new__(cls, *args, **kwargs):
        # Extra arguments are intentionally not forwarded: object.__new__
        # raises TypeError when it receives any.
        if cls._instance is None:
            cls._instance = super(EstUtil, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every EstUtil() call even though __new__ hands back
        # the shared instance, so guard against rebuilding the client.
        if getattr(self, "_initialized", False):
            return
        # TODO: no hosts are passed, so the client's default is used;
        # pass hosts=[...] to target a specific cluster.
        self.es = Elasticsearch(timeout=300, max_retries=3)
        self._initialized = True

    def is_exists_index(self, index_name):
        """Return whether an index exists.

        :param index_name: index name
        :return: result of ``indices.exists`` for that index
        """
        return self.es.indices.exists(index=index_name)

    def get_all_indices(self, index_=None):
        """List indices.

        :param index_: optional index name/pattern; when given, detailed
            information (status etc.) is returned for the matching indices
        :return: the index names only (keys of ``indices.get_alias()``) when
            ``index_`` is empty, otherwise the ``cat.indices`` JSON output
        """
        if not index_:
            # Names only.
            return self.es.indices.get_alias().keys()
        # Per-index information such as status.
        return self.es.cat.indices(index=index_, format="json")

    def index(self, index_name, body):
        """Insert a single document.

        :param index_name: index name
        :param body: document as a dict, e.g. ``{"name": "aaa", "age": 18}``
        :return: True when Elasticsearch reports the document as ``created``,
            False for any other result
        """
        # doc_type='_doc' is needed when the installed elasticsearch package
        # is older than 7.x.
        response = self.es.index(index=index_name, body=body, doc_type=self._DOC_TYPE)
        return response['result'] == 'created'

    def batch_insert(self, index_name, body_list):
        """Bulk-insert documents.

        :param index_name: index name
        :param body_list: list of document dicts
        :return: whatever ``helpers.bulk`` returns
        """
        actions = [
            {"_index": index_name, "_type": self._DOC_TYPE, '_source': data}
            for data in body_list
        ]
        return helpers.bulk(self.es, actions)

    def update(self, index_name, doc_id, update_body):
        """Update a single document.

        :param index_name: index name
        :param doc_id: document id
        :param update_body: update request body dict, e.g. ``{"doc": {...}}``
        :return: True when Elasticsearch reports ``updated``, False for any
            other result
        """
        # doc_type='_doc' is needed when the installed elasticsearch package
        # is older than 7.x.
        response = self.es.update(
            index=index_name, id=doc_id, body=update_body, doc_type=self._DOC_TYPE
        )
        return response['result'] == 'updated'

    def batch_update(self, index_name, body_list):
        """Bulk-update documents.

        :param index_name: index name
        :param body_list: list of dicts; each must contain an ``"id"`` key
            (the document id), the remaining keys are the fields to update
        :return: whatever ``helpers.bulk`` returns
        :raises KeyError: if an entry has no ``"id"`` key
        """
        actions = []
        for data in body_list:
            # Work on a shallow copy so the caller's dict keeps its "id".
            fields = dict(data)
            _id = fields.pop("id")
            actions.append({
                "_op_type": "update",
                "_index": index_name,
                # Same document type as batch_insert, so the bulk helpers agree.
                "_type": self._DOC_TYPE,
                'doc': fields,
                "_id": _id,
            })
        return helpers.bulk(self.es, actions)

    def delete(self, index_name, doc_id):
        """Delete a single document.

        :param index_name: index name
        :param doc_id: document id
        :return: True when Elasticsearch reports ``deleted``, False otherwise
        """
        # doc_type is passed for consistency with index()/update(); older
        # elasticsearch-py 6.x clients take it as a required argument.
        response = self.es.delete(index=index_name, id=doc_id, doc_type=self._DOC_TYPE)
        return response['result'] == 'deleted'

    def batch_delete(self, index_name, body_list):
        """Bulk-delete documents.

        :param index_name: index name
        :param body_list: list of dicts; the ``"id"`` key of each entry is
            used as the document id
        :return: whatever ``helpers.bulk`` returns
        """
        actions = [
            {
                "_op_type": "delete",
                "_index": index_name,
                # Same document type as batch_insert, so the bulk helpers agree.
                "_type": self._DOC_TYPE,
                "_id": data.get("id"),
            }
            for data in body_list
        ]
        return helpers.bulk(self.es, actions)

    def delete_index(self, index_name):
        """Delete a single index.

        :param index_name: index name
        :return: the ``acknowledged`` flag of the delete response
        """
        return self.es.indices.delete(index=index_name)['acknowledged']

    def create_index(self, index_name, field_type_dict, number_of_shards, number_of_replicas):
        """Create an index from a simple ``field name -> field type`` mapping.

        :param index_name: index name
        :param field_type_dict: dict mapping each field name to its type
        :param number_of_shards: number of shards
        :param number_of_replicas: number of replicas
        :return: the ``indices.create`` response
        :raises ValueError: if the index already exists
        """
        if self.is_exists_index(index_name):
            raise ValueError('索引已存在:%s' % index_name)
        properties = {
            field: {'type': field_type}
            for field, field_type in field_type_dict.items()
        }
        body = {
            'settings': {
                'number_of_shards': number_of_shards,
                'number_of_replicas': number_of_replicas,
            },
            # The mapping is registered under the '_doc' type.
            'mappings': {self._DOC_TYPE: {'properties': properties}},
        }
        return self.es.indices.create(index=index_name, body=body)

    def create_index_by_body(self, index_name, body):
        """Create an index from a fully assembled request body.

        Example ``body``::

            {
                "settings": {"number_of_shards": 1, "number_of_replicas": 0},
                "aliases": {"test_log": {}},
                "mappings": {
                    "properties": {
                        "create_time": {"type": "date"},
                        "status": {"type": "integer"},
                        "dev_ip": {"type": "ip"},
                        "dev_uuid": {"type": "keyword"},
                        "user": {
                            "properties": {
                                "name": {"type": "text"},
                                "age": {"type": "integer"},
                                "email": {"type": "keyword"}
                            }
                        },
                        "infos": {
                            "type": "nested",
                            "dynamic": False,
                            "properties": {
                                "v_id": {"type": "keyword"},
                                "v_name": {"type": "keyword"},
                                "v_desc": {"type": "text"}
                            }
                        },
                        "remark": {"type": "text"},
                        "show_num": {"type": "long", "doc_values": False, "index": False}
                    }
                }
            }

        :param index_name: index name
        :param body: complete index-creation dict (settings/aliases/mappings)
        :return: the ``indices.create`` response
        :raises ValueError: if the index already exists
        """
        if self.is_exists_index(index_name):
            raise ValueError('索引已存在:%s' % index_name)
        return self.es.indices.create(index=index_name, body=body)

    def search(self, index_name, request_body):
        """Run a query.

        :param index_name: index name
        :param request_body: query DSL dict
        :return: the search response
        """
        return self.es.search(index=index_name, body=request_body, timeout="5m")

    def search_page(self, index_name, request_body, page, size):
        """Run a paged query using ``from``/``size``.

        Using from/size directly may cause performance problems once the
        result set exceeds 10000 documents.

        :param index_name: index name
        :param request_body: query DSL dict
        :param page: 1-based page number
        :param size: documents per page
        :return: the search response
        """
        offset = (page - 1) * size
        return self.es.search(index=index_name, body=request_body, from_=offset, size=size)

    def search_after(self, index_name, search_after_body):
        """Run one page of a ``search_after`` query.

        Each page continues from the sort values of the last document of the
        previous page.  ``search_after_body`` must contain ``size`` and
        ``sort``; from the second page on it must also contain
        ``search_after`` set to the previous page's last sort values.

        The sort values must be unique: with duplicates (for example sorting
        by a timestamp alone) search_after cannot locate the start of the next
        page and documents get repeated or skipped.  Sort on several fields or
        add a unique tiebreaker such as ``_id``, e.g.
        ``{"size": 1, "sort": [{"create_time": {"order": "desc"}}, {"_id": {"order": "asc"}}]}``.

        :param index_name: index name
        :param search_after_body: query DSL dict
        :return: tuple ``({"search_after": <last hit's sort values, or []
            when there are no hits>}, response)``
        """
        response = self.es.search(index=index_name, body=search_after_body)
        hits = response.get('hits').get('hits')
        next_cursor = {'search_after': hits[-1].get('sort') if hits else []}
        return next_cursor, response

    def search_scroll(self, index_name, scroll, body):
        """Start a scroll query (first page).

        Scroll is cursor-based paging; each search creates a snapshot.

        :param index_name: index name
        :param scroll: how long to keep the scroll alive, e.g. ``"1m"``
        :param body: query dict
        :return: tuple ``(scroll_id, response)``
        """
        response = self.es.search(index=index_name, scroll=scroll, body=body)
        return response['_scroll_id'], response

    def search_next(self, scroll_id, scroll):
        """Fetch the next page of a scroll started with ``search_scroll``.

        :param scroll_id: scroll id returned by the previous call
        :param scroll: how long to keep the scroll alive, e.g. ``"1m"``;
            the request fails if the snapshot has already expired
        :return: tuple ``(scroll_id, response)``
        """
        response = self.es.scroll(scroll_id=scroll_id, scroll=scroll)
        return response['_scroll_id'], response


def main():
    """Demo: create an index, write/update a document and page through it."""
    es = EstUtil()
    create_index_body = {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0
        },
        "aliases": {
            "test_log": {}
        },
        "mappings": {
            "properties": {
                "create_time": {"type": "date"},
                "status": {"type": "integer"},
                "dev_ip": {"type": "ip"},
                "dev_uuid": {"type": "keyword"},
                "user": {
                    "properties": {
                        "name": {"type": "text"},
                        "age": {"type": "integer"},
                        "email": {"type": "keyword"}
                    }
                },
                "infos": {
                    "type": "nested",
                    "dynamic": False,
                    "properties": {
                        "v_id": {"type": "keyword"},
                        "v_name": {"type": "keyword"},
                        "v_desc": {"type": "text"}
                    }
                },
                "remark": {"type": "text"},
                "show_num": {"type": "long", "doc_values": False, "index": False}
            }
        }
    }
    index_name = "test_data_index"

    # Create the index.
    create_response = es.create_index_by_body(index_name, create_index_body)
    print(create_response)
    # Example output:
    # {u'index': u'test_data_index', u'acknowledged': True, u'shards_acknowledged': True}

    # List all indices.
    indices = es.get_all_indices()
    print(indices)

    # Insert a single document.
    insert_body = {
        "create_time": 1720601022255,
        "status": 201,
        "dev_ip": "192.168.1.101",
        "dev_uuid": "123e4567e89b12d3a456426614174000",
        "user": {
            "name": "战三",
            "age": 30,
            "email": "zhansan@example.com"
        },
        "infos": [
            {
                "v_id": "123e4567e89b12d3a456426614174000",
                "v_name": "战三",
                "v_desc": "描述啦啦啦啦啦啦"
            }
        ],
        "remark": "描述!!!!!",
        "show_num": 6789
    }
    insert_resp = es.index(index_name, insert_body)
    print(insert_resp)

    # Update a document / dynamically add a field (send_time is not in the
    # mapping created above; the update adds it).
    # NOTE(review): the document id below is a hard-coded sample value;
    # replace it with the id of a document that exists in your index.
    update_body = {"doc": {"send_time": int(time.time())}}
    update_resp = es.update(index_name, "OWps6pEBay2ae5Uuwei-", update_body)
    print(update_resp)

    # Basic query.
    resp2 = es.search(index_name, {"size": 10, "from": 1})
    print(resp2)

    # Paging, variant 1: from/size.
    search_body = {"query": {"bool": {"must": [{"term": {"send_time": 1726226611}}]}}}
    resp1 = es.search_page(index_name, search_body, 1, 1)
    print(resp1)

    # Paging, variant 2: scroll.
    scroll = "1m"
    scroll_id, response = es.search_scroll(index_name, scroll, {"size": 10})
    print(scroll_id, response)
    while response["hits"]["hits"]:
        scroll_id, response = es.search_next(scroll_id, scroll)
        print(scroll_id, response)

    # Paging, variant 3-1: search_after, feeding the returned cursor back in.
    after_search = {"size": 1, "sort": [{"create_time": {"order": "desc"}}, {"_id": {"order": "asc"}}]}
    search_after_body, resp3 = es.search_after(index_name, after_search)
    print(search_after_body, resp3)
    while resp3 and resp3["hits"]["hits"]:
        after_search["search_after"] = search_after_body.get("search_after")
        search_after_body, resp3 = es.search_after(index_name, after_search)

    # Paging, variant 3-2: search_after with an explicit cursor variable.
    # Drop the cursor left over from variant 3-1; otherwise the first query
    # here starts after the last document and the loop ends immediately.
    after_search.pop("search_after", None)
    search_after = None
    while True:
        if search_after:
            after_search["search_after"] = search_after
        search_after_body, resp3 = es.search_after(index_name, after_search)
        if not resp3["hits"]["hits"]:
            break
        search_after = search_after_body["search_after"]


if __name__ == '__main__':
    main()


http://www.ppmy.cn/devtools/137057.html

相关文章

【论文笔记】Number it: Temporal Grounding Videos like Flipping Manga

🍎个人主页:小嗷犬的个人主页 🍊个人网站:小嗷犬的技术小站 🥭个人信条:为天地立心,为生民立命,为往圣继绝学,为万世开太平。 基本信息 标题: Number it: Temporal Grou…

STM32H7开发笔记(2)——H7外设之多路定时器中断

STM32H7开发笔记(2)——H7外设之多路定时器中断 文章目录 STM32H7开发笔记(2)——H7外设之多路定时器中断0.引言1.CubeMX配置2.软件编写 0.引言 本文PC端采用Win11 + STM32CubeMX4.1.0.0 + Keil5.24.2的配置,硬件使用STM32H…

java charAt()返回数值型 详解

Java 中 charAt() 返回数值型详解 在 Java 中,charAt() 方法返回的是 char 类型,它代表的是字符,但字符在计算机中也有对应的数值表示(ASCII 或 Unicode 编码)。通过将 char 类型转换或直接参与计算,我们可…

emacs入门命令、android-studio和Android Gradle plugin(AGP)版本对照、zulu网页查找jdk11最新版下载脚本

emacs入门命令 ubuntu 22.04下emacs基本操作 sudo apt install -y emacs 图形化emacs | 文本化emacs --no-window-system, 快捷键Alt+x(M-x)执行命令: shell #打开交互式shell终端 #emacs控制shell更精确 term #打开交互式终端(默认/bin/bash) #emacs几乎不能控制…

智控水利:道品科技农业灌区自动化闸门引领农业灌溉新变革

一、引言 农业灌溉作为农业生产的关键环节,直接影响着农作物的生长、产量与质量。在传统农业灌区中,闸门的操作主要依赖人工,这种方式在当今科技飞速发展的时代背景下,暴露出诸多弊端。道品科技农业灌区自动化闸门的出现,…

GoZero对接GPT接口的设计与实现:问题分析与解决

在本篇文章中,我们将探讨如何在GoZero框架下对接GPT接口,并详细讨论在实现过程中遇到的一些常见问题及其解决方案。特别是遇到的错误信息,如 parse parameter fail,recover: interface conversion: interface {} is nil, not string 和 获取历…

网络基础 - 地址篇

一、IP 地址 IP 协议有两个版本,IPv4 和 IPv6。IP 地址(IPv4 地址)是一个 4 字节,32 位的正整数,通常使用 “点分十进制” 的字符串进行表示,例如 192.168.0.1,用点分割的每一个数字表示一个字节,范围是 0 ~…

Altium Designer学习笔记 21.PCB板框的评估及叠层设置

基于Altium Designer 23学习版,四层板智能小车PCB 更多AD学习笔记:Altium Designer学习笔记 1-5 工程创建_元件库创建Altium Designer学习笔记 6-10 异性元件库创建_原理图绘制Altium Designer学习笔记 11-15 原理图的封装 编译 检查 _PCB封装库的创建Al…