由于python elasticsearch v8 engine的源码包中并未开放对于请求添加proxies的支持,导致在某些环境下无法连通外网的es服务。目前网上暂无相关的修改内容,我这边提供下自己修改的动态运行时替换elasticsearch包的源码方法demo
import gzip
import ssl
import time
import requests
from elastic_transport._node._http_requests import RequestsHttpNode
from typing import Any, Optional, Union
from elastic_transport._compat import warn_stacklevel
from elastic_transport._exceptions import ConnectionError, ConnectionTimeout, SecurityWarning, TlsError
from elastic_transport._models import ApiResponseMeta, HttpHeaders, NodeConfig
from elastic_transport.client_utils import DEFAULT, DefaultType, client_meta_version
from elastic_transport._node._base import (BUILTIN_EXCEPTIONS,RERAISE_EXCEPTIONS,BaseNode,NodeApiResponse,ssl_context_from_node_config,
)def custom_perform_request(self,method: str,target: str,body: Optional[bytes] = None,headers: Optional[HttpHeaders] = None,request_timeout: Union[DefaultType, Optional[float]] = DEFAULT,) -> NodeApiResponse:url = self.base_url + targetheaders = HttpHeaders(headers or ())request_headers = self._headers.copy()if headers:request_headers.update(headers)body_to_send: Optional[bytes]if body:if self._http_compress:body_to_send = gzip.compress(body)request_headers["content-encoding"] = "gzip"else:body_to_send = bodyelse:body_to_send = Nonestart = time.time()proxies_dict = {"http": "http://xx.xx.xx.xx:xx","http": "http://xx.xx.xx.xx:xx",}request = requests.Request(method=method, headers=request_headers, url=url, data=body_to_send)prepared_request = self.session.prepare_request(request)send_kwargs = {"timeout": (request_timeoutif request_timeout is not DEFAULTelse self.config.request_timeout)}send_kwargs.update(self.session.merge_environment_settings( # type: ignore[arg-type]prepared_request.url, {}, None, None, None))send_kwargs.pop('proxies')try:response = self.session.send(prepared_request, proxies=proxies_dict, **send_kwargs) # type: ignore[arg-type]data = response.contentduration = time.time() - startresponse_headers = HttpHeaders(response.headers)except RERAISE_EXCEPTIONS:raiseexcept Exception as e:err: Exceptionif isinstance(e, requests.Timeout):err = ConnectionTimeout("Connection timed out during request", errors=(e,))elif isinstance(e, (ssl.SSLError, requests.exceptions.SSLError)):err = TlsError(str(e), errors=(e,))elif isinstance(e, BUILTIN_EXCEPTIONS):raiseelse:err = ConnectionError(str(e), errors=(e,))self._log_request(method=method,target=target,headers=request_headers,body=body,exception=err,)raise err from Nonemeta = ApiResponseMeta(node=self.config,duration=duration,http_version="1.1",status=response.status_code,headers=response_headers,)self._log_request(method=method,target=target,headers=request_headers,body=body,meta=meta,response=data,)return NodeApiResponse(meta,data,)RequestsHttpNode.perform_request = custom_perform_requestfrom elasticsearch import Elasticsearches = Elasticsearch(hosts=["http://xx.xx.xxx.xxx:9200"], basic_auth=("elastic", "xxxxxxxxxxxx"), node_class=RequestsHttpNode)
query = {"query": {"match_all": {}}}
response = es.search(index="xxxxxx_prod", body= query)
print(response)