Operating Elasticsearch with Python


This article uses Elasticsearch 6.x and works with ES data through the Python client.
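Before running the utility class below, it helps to confirm that the cluster is reachable and that the installed client matches the 6.x server. A minimal connectivity check follows; the host, port, and the pinned client version (e.g. pip install "elasticsearch>=6,<7") are assumptions for illustration, not part of the original article.

# Minimal connectivity check (assumes a local single-node cluster at localhost:9200
# and an elasticsearch-py client from the 6.x series).
from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=["http://localhost:9200"], timeout=30)
print(es.ping())                        # True if the node is reachable
info = es.info()                        # GET / -- basic cluster metadata
print(info["version"]["number"])        # should report a 6.x server to match the examples below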

python">#! -*- coding:utf-8 -*
import timefrom elasticsearch import Elasticsearch, helpersclass EstUtil:_instance = Nonedef __new__(cls, *args, **kwargs):if not cls._instance:cls._instance = super(EstUtil, cls).__new__(cls, *args, **kwargs)return cls._instancedef __init__(self):# todo hosts不传有默认的,可传hosts=[]self.es = Elasticsearch(timeout=300, max_retries=3)def is_exists_index(self, index_name):"""是否存在索引:param index_name: 索引名称:return:"""print(index_name)return self.es.indices.exists(index=index_name)def get_all_indices(self, index_=None):"""获取所有索引:param index_name: 根据名称获取所有的索引:return:"""if not index_:# 只返回所有索引名all_indices = self.es.indices.get_alias().keys()else:# 返回索引的信息,状态等all_indices = self.es.cat.indices(index=index_, format="json")return all_indicesdef index(self, index_name, body):"""插入单条数据:param index_name: 索引名称:param body: 请求体数据dict:return: boolean{"name": "aaa","age": 18}"""# 如果pip安装的elasticsearch版本低于7.x,加入 doc_type='_doc'response = self.es.index(index=index_name, body=body, doc_type='_doc')# 检查响应if response['result'] == 'created':return Trueelse:return Falsedef batch_insert(self, index_name, body_list):"""批量插入数据:param index_name: 索引名称:param body_list: 请求体数据list:return:"""data_list = list()for data in body_list:action = {"_index": index_name,"_type": '_doc','_source': data}data_list.append(action)return helpers.bulk(self.es, data_list)def update(self, index_name, doc_id, update_body):""":param index_name: 索引名:param doc_id: 记录id:param update_body: 更新的dict:return: boolean"""# 如果pip安装的elasticsearch版本低于7.x,加入 doc_type='_doc'response = self.es.update(index=index_name, id=doc_id, body=update_body, doc_type="_doc")# 检查编辑是否成功if response['result'] == 'updated':return Trueelse:return Falsedef batch_update(self, index_name, body_list):"""批量编辑数据:param index_name: 索引名称:param body_list: 请求体数据list:return:"""data_list = list()for data in body_list:_id = data.pop("id")action = {"_op_type": "update","_index": index_name,'doc': data,"_id": _id}data_list.append(action)return helpers.bulk(self.es, data_list)def delete(self, index_name, doc_id):"""删除单条数据:param index_name: 索引名称:param doc_id: 记录id:return:  boolean"""response = self.es.delete(index=index_name, id=doc_id)# 检查删除是否成功if response['result'] == 'deleted':return Trueelse:return Falsedef batch_delete(self, index_name, body_list):"""批量删除数据:param index_name: 索引名称:param body_list: 请求体数据list:return:"""data_list = list()for data in body_list:_id = data.get("id")action = {"_op_type": "delete","_index": index_name,"_id": _id}data_list.append(action)return helpers.bulk(self.es, data_list)def delete_index(self, index_name):"""删除单个索引:param index_name: 索引名称:return:"""return self.es.indices.delete(index=index_name)['acknowledged']def create_index(self, index_name, field_type_dict, number_of_shards, number_of_replicas):"""传入简单的键值对:param index_name 索引名称:param field_type_dict 字段名称,类型字典:param number_of_shards 分片数量:param number_of_replicas 副本数量:return: 创建成功"""if self.is_exists_index(index_name):raise ValueError('索引已存在:%s' % index_name)body = {}settings = {'number_of_shards': number_of_shards,'number_of_replicas': number_of_replicas}mappings = {}index_type = {}properties = {}for key, value in field_type_dict.items():properties[key] = {'type': value}index_type['properties'] = propertiesmappings['_doc'] = index_typebody['settings'] = settingsbody['mappings'] = mappingsresponse = self.es.indices.create(index=index_name, body=body)return responsedef create_index_by_body(self, index_name, body):"""自定义参数创建索引:param index_name 索引名称:param body 组装好的创建dict:return:""""""示例参数:{"settings": {"number_of_shards": 
1,"number_of_replicas": 0},"aliases": {"test_log": {}},"mappings": {"properties": {"create_time": {"type": "date"},"status": {"type": "integer"},"dev_ip": {"type": "ip"},"dev_uuid": {"type": "keyword"},"user": {"properties": {"name": {"type": "text"},"age": {"type": "integer"},"email": {"type": "keyword"}}},"infos": {"type": "nested","dynamic": False,"properties": {"v_id": {"type": "keyword"},"v_name": {"type": "keyword"},"v_desc": {"type": "text"}}},"remark": {"type": "text"},"show_num": {"type": "long", "doc_values": False, "index": False}}}}"""if self.is_exists_index(index_name):raise ValueError('索引已存在:%s' % index_name)response = self.es.indices.create(index=index_name, body=body)return responsedef search(self, index_name, request_body):"""查询数据:param index_name: 索引名称:param request_body: 查询dsl:return:"""return self.es.search(index=index_name, body=request_body, timeout="5m")def search_page(self, index_name, request_body, page, size):"""分页查询数据(数据量超过10000条时,直接使用from和size参数可能会导致性能问题):param index_name: 索引名称:param request_body: 查询dsl:param page: 页码:param size: 每页条数:return:"""from_value = (page - 1) * size  # 偏移量return self.es.search(index=index_name, body=request_body, from_=from_value, size=size)def search_after(self, index_name, search_after_body):"""分页查询(基于上一次查询的最后一个文档的排序值来进行下一次查询,查询中必须包含 sort 字段,并且这个字段的值需要唯一):param index_name: 索引名称:param search_after_body 查询dsl:return:"""# search_after_body参数,search_after字段,传入最后一次的排序值,第一次不用传# search_after_body参数中,需要包含size,sort# 排序值必须有唯一性,如果具有相同的排序值,search_after无法正确地定位到下一页的开始位置,导致数据重复或遗漏,如时间排序可能重复# 采用多个字段排序,或使用类似_id的唯一值# eg:{"size": 1, "sort": [{"create_time": {"order": "desc"}}, {"_id": {"order": "asc"}}]}response = self.es.search(index=index_name, body=search_after_body)hits = response.get('hits').get('hits')search_after_body = {'search_after': hits[-1].get('sort') if hits else []}return search_after_body, responsedef search_scroll(self, index_name, scroll, body):"""分页查询,第一次执行(scroll 是一种基于游标的分页方式,每次搜索创建一个快照):param index_name: 索引名称:param scroll: scroll保持时间,1m为1分钟:param body: 查询条件体dict:return:"""# 初始化scroll查询response = self.es.search(index=index_name, scroll=scroll, body=body)scroll_id = response['_scroll_id']return scroll_id, responsedef search_next(self, scroll_id, scroll):"""与上方的search_scroll结合使用:param scroll_id: 上一次的快照id:param scroll: scroll快照保持时间,1m为1分钟,如果查询数据超过1分钟,会报错:return:"""response = self.es.scroll(scroll_id=scroll_id, scroll=scroll)scroll_id = response['_scroll_id']return scroll_id, responseif __name__ == '__main__':es = EstUtil()create_index_body = {"settings": {"number_of_shards": 1,"number_of_replicas": 0},"aliases": {"test_log": {}},"mappings": {"properties": {"create_time": {"type": "date"},"status": {"type": "integer"},"dev_ip": {"type": "ip"},"dev_uuid": {"type": "keyword"},"user": {"properties": {"name": {"type": "text"},"age": {"type": "integer"},"email": {"type": "keyword"}}},"infos": {"type": "nested","dynamic": False,"properties": {"v_id": {"type": "keyword"},"v_name": {"type": "keyword"},"v_desc": {"type": "text"}}},"remark": {"type": "text"},"show_num": {"type": "long", "doc_values": False, "index": False}}}}index_name = "test_data_index"# 创建索引create_respone = es.create_index_by_body(index_name, create_index_body)print(create_respone)# 输出 {u'index': u'test_data_index', u'acknowledged': True, u'shards_acknowledged': True}# 查看所有的索引indices = es.get_all_indices()print(indices)# 插入单条数据insert_body = {"create_time": 1720601022255,"status": 201,"dev_ip": "192.168.1.101","dev_uuid": "123e4567e89b12d3a456426614174000","user": {"name": 
"战三","age": 30,"email": "zhansan@example.com"},"infos": [{"v_id": "123e4567e89b12d3a456426614174000","v_name": "战三","v_desc": "描述啦啦啦啦啦啦"}],"remark": "描述!!!!!","show_num": 6789}insert_resp = es.index(index_name, insert_body)print(insert_resp)# 更新数据/给某记录动态增加字段数据(send_time建索引时不存在,可更新增加)update_body = {"doc": {"send_time": int(time.time())}}update_resp = es.update(index_name, "OWps6pEBay2ae5Uuwei-", update_body)print(update_resp)# 基础查询resp2 = es.search(index_name, {"size": 10, "from": 1})print(resp2)# 分页查询1search_body = {"query": {"bool": {"must": [{"term": {"send_time": 1726226611}}]}}}resp1 = es.search_page(index_name, search_body, 1, 1)print(resp1)# 分页查询2scroll = "1m"scroll_id, response = es.search_scroll(index_name, scroll, {"size": 10})print(scroll_id, response)while response["hits"]["hits"]:scroll_id, response = es.search_next(scroll_id, scroll)print(scroll_id, response)# after分页两种写法,分页查询3-1after_search = {"size": 1, "sort": [{"create_time": {"order": "desc"}}, {"_id": {"order": "asc"}}]}search_after_body, resp3 = es.search_after(index_name, after_search)print(search_after_body, resp3)while resp3 and resp3["hits"]["hits"]:after_search["search_after"] = search_after_body.get("search_after")search_after_body, resp3 = es.search_after(index_name, after_search)# after分页两种写法,分页查询3-2search_after = Nonewhile True:if search_after:after_search["search_after"] = search_aftersearch_after_body, resp3 = es.search_after(index_name, after_search)if not resp3["hits"]["hits"]:breaksearch_after = search_after_body["search_after"]

