文章目录
- 1 错误重现
- 2 解决
- 3 调用方法写入es
- 4 扩展
1 错误重现
在Mac上调用Python的CMRESHandler进行elasticsearch
的日志写入时,遇到如下错误。一开始还以为是自己的语法出现了错误,排查后发现问题出在库中的代码上。在网上找了一圈都没有发现解决方案,遂记录一下,方便以后他人查找。
解决问题后,成功写入es:
2 解决
出现问题的原因,是库中下面的语句获取ip时,出现了错误(报错地方socket.gethostbyname(socket.gethostname())
)。
self.es_additional_fields.update({'host': socket.gethostname(),'host_ip': socket.gethostbyname(socket.gethostname())})
由于我们不能去修改库中的代码,因此我们只能把库中的代码单独提取出来,然后修改出错的地方,再单独引入这个文件。
修改后的 handlers.py
文件如下(其他关联的文件也一并的放到了该文件中):
修改的地方为使用
get_local_ip()
函数,来代替报错的socket.gethostbyname(socket.gethostname())
。
⚠️:这里使用的elasticsearch版本为7.9.1,需要8以下的版本,不然会出现RequestsHttpConnection
库找不到的错误(pip3 install "elasticsearch==7.9.1" -i https://pypi.tuna.tsinghua.edu.cn/simple
)。
#!/usr/bin/env python3
# Connect to and initialise Elasticsearch logging.
#
# Standalone copy of cmreslogging's handlers.py with one behavioural change:
# the host IP is obtained via get_local_ip() instead of
# socket.gethostbyname(socket.gethostname()), which raises socket.gaierror
# on machines (e.g. macOS) whose hostname does not resolve.
import logging
import datetime
import socket
from threading import Timer, Lock
from enum import Enum

# NOTE(review): requires elasticsearch < 8 -- RequestsHttpConnection was
# removed in 8.x (the article pins 7.9.1).
from elasticsearch import helpers as eshelpers
from elasticsearch import Elasticsearch, RequestsHttpConnection
from elasticsearch.serializer import JSONSerializer


class CMRESSerializer(JSONSerializer):
    """JSON serializer that stringifies values the stock serializer rejects."""

    def default(self, data):
        try:
            return super(CMRESSerializer, self).default(data)
        except TypeError:
            # Fall back to the string form instead of failing the whole
            # bulk request because of one unserializable field.
            return str(data)


def get_local_ip():
    """Return the local IP address as a string, or '' when it cannot be found.

    Connecting a UDP socket does not send any packet; it merely selects the
    outgoing interface, whose address is then read back with getsockname().
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.connect(('8.8.8.8', 80))
        return sock.getsockname()[0]
    except Exception as exc:
        print(exc)
        return ''
    finally:
        sock.close()  # fix: the original leaked the socket descriptor


class CMRESHandler(logging.Handler):
    """Elasticsearch log handler.

    Buffers log records and bulk-indexes them into Elasticsearch, either when
    the buffer reaches ``buffer_size`` or when the flush timer fires.
    """

    class AuthType(Enum):
        """Authentication types supported.

        The handler supports
        - No authentication
        - Basic authentication
        """
        NO_AUTH = 0
        BASIC_AUTH = 1
        DEVOPS_AUTH = 2

    class IndexNameFrequency(Enum):
        """Index rotation frequencies supported (daily/weekly/monthly/yearly)."""
        DAILY = 0
        WEEKLY = 1
        MONTHLY = 2
        YEARLY = 3

    # Defaults for the class.
    # NOTE(review): the default host and credentials below are deployment
    # specific; pass your own values to the constructor in real use.
    __DEFAULT_ELASTICSEARCH_HOST = [{'host': '10.97.138.194', 'port': 9200}]
    __DEFAULT_AUTH_USER = 'elastic'
    __DEFAULT_AUTH_PASSWD = 'ES@ynzy2020'
    __DEFAULT_USE_SSL = False
    __DEFAULT_VERIFY_SSL = True
    __DEFAULT_AUTH_TYPE = AuthType.NO_AUTH
    __DEFAULT_INDEX_FREQUENCY = IndexNameFrequency.DAILY
    __DEFAULT_BUFFER_SIZE = 1000
    __DEFAULT_FLUSH_FREQ_INSEC = 1
    __DEFAULT_ADDITIONAL_FIELDS = {}  # safe: __init__ copies it, never mutates it
    __DEFAULT_ES_INDEX_NAME = 'python_logger'
    __DEFAULT_ES_DOC_TYPE = '_doc'
    __DEFAULT_RAISE_ON_EXCEPTION = False
    __DEFAULT_TIMESTAMP_FIELD_NAME = "timestamp"
    __DEFAULT_ISO_TIMESTAMP_FIELD_NAME = "iso_timestamp"

    # LogRecord attributes that are never forwarded to Elasticsearch.
    __LOGGING_FILTER_FIELDS = ['msecs',
                               'relativeCreated',
                               'levelno',
                               'created']

    @staticmethod
    def _get_daily_index_name(es_index_name):
        """Return the index name used for daily rotation.

        Date suffixing is intentionally disabled: the prefix is returned as-is.
        """
        # return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y.%m.%d'))
        return es_index_name

    @staticmethod
    def _get_weekly_index_name(es_index_name):
        """Return the index name used for weekly rotation (suffix disabled)."""
        # current_date = datetime.datetime.now()
        # start_of_the_week = current_date - datetime.timedelta(days=current_date.weekday())
        # return "{0!s}-{1!s}".format(es_index_name, start_of_the_week.strftime('%Y.%m.%d'))
        return es_index_name

    @staticmethod
    def _get_monthly_index_name(es_index_name):
        """Return the index name used for monthly rotation (suffix disabled)."""
        # return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y.%m'))
        return es_index_name

    @staticmethod
    def _get_yearly_index_name(es_index_name):
        """Return the index name used for yearly rotation (suffix disabled)."""
        # return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y'))
        return es_index_name

    # Maps rotation frequency to the naming helper.
    # (Attribute name kept as-is, typo included, for backward compatibility.)
    _INDEX_FREQUENCY_FUNCION_DICT = {
        IndexNameFrequency.DAILY: _get_daily_index_name,
        IndexNameFrequency.WEEKLY: _get_weekly_index_name,
        IndexNameFrequency.MONTHLY: _get_monthly_index_name,
        IndexNameFrequency.YEARLY: _get_yearly_index_name,
    }

    def __init__(self,
                 hosts=__DEFAULT_ELASTICSEARCH_HOST,
                 auth_details=(__DEFAULT_AUTH_USER, __DEFAULT_AUTH_PASSWD),
                 auth_type=__DEFAULT_AUTH_TYPE,
                 use_ssl=__DEFAULT_USE_SSL,
                 verify_ssl=__DEFAULT_VERIFY_SSL,
                 buffer_size=__DEFAULT_BUFFER_SIZE,
                 flush_frequency_in_sec=__DEFAULT_FLUSH_FREQ_INSEC,
                 es_index_name=__DEFAULT_ES_INDEX_NAME,
                 index_name_frequency=__DEFAULT_INDEX_FREQUENCY,
                 es_doc_type=__DEFAULT_ES_DOC_TYPE,
                 es_additional_fields=__DEFAULT_ADDITIONAL_FIELDS,
                 raise_on_indexing_exceptions=__DEFAULT_RAISE_ON_EXCEPTION,
                 default_iso_timestamp_field_name=__DEFAULT_ISO_TIMESTAMP_FIELD_NAME,
                 default_timestamp_field_name=__DEFAULT_TIMESTAMP_FIELD_NAME):
        """Handler constructor.

        :param hosts: list of hosts the elasticsearch client connects to, in
            the format ``[{'host': 'host1', 'port': 9200}, ...]`` so the
            client supports failover between insertion nodes
        :param auth_details: ``('User', 'Password')`` tuple, used only when
            ``auth_type`` is ``CMRESHandler.AuthType.BASIC_AUTH``
        :param auth_type: ``CMRESHandler.AuthType`` member; NO_AUTH,
            BASIC_AUTH and DEVOPS_AUTH are declared, the first two implemented
        :param use_ssl: whether communications use SSL
        :param verify_ssl: whether SSL certificates are validated
        :param buffer_size: records buffered before a flush into ES
        :param flush_frequency_in_sec: seconds between timer-driven flushes
        :param es_index_name: prefix of the elasticsearch index
            (``python_logger`` by default)
        :param index_name_frequency: an ``IndexNameFrequency`` member
            (DAILY by default)
        :param es_doc_type: document type used for the records
        :param es_additional_fields: dict of extra fields (application,
            environment, ...) added to every log record
        :param raise_on_indexing_exceptions: True only for debugging, to
            re-raise exceptions caused while indexing
        :return: a ready to be used CMRESHandler
        """
        logging.Handler.__init__(self)

        self.hosts = hosts
        self.auth_details = auth_details
        self.auth_type = auth_type
        self.use_ssl = use_ssl
        self.verify_certs = verify_ssl
        self.buffer_size = buffer_size
        self.flush_frequency_in_sec = flush_frequency_in_sec
        self.es_index_name = es_index_name
        self.index_name_frequency = index_name_frequency
        self.es_doc_type = es_doc_type
        self.es_additional_fields = es_additional_fields.copy()
        # The stock library raised here on macOS:
        #   socket.gaierror: [Errno 8] nodename nor servname provided, or not known
        #   (from socket.gethostbyname(socket.gethostname()))
        # get_local_ip() avoids the hostname lookup entirely.
        self.es_additional_fields.update({'host': socket.gethostname(),
                                          'host_ip': get_local_ip()})
        self.raise_on_indexing_exceptions = raise_on_indexing_exceptions
        self.default_iso_timestamp_field_name = default_iso_timestamp_field_name
        self.default_timestamp_field_name = default_timestamp_field_name

        self._client = None
        self._buffer = []
        self._buffer_lock = Lock()
        self._timer = None
        self._index_name_func = CMRESHandler._INDEX_FREQUENCY_FUNCION_DICT[self.index_name_frequency]
        self.serializer = CMRESSerializer()

    def __schedule_flush(self):
        """Arm the flush timer if it is not already running."""
        if self._timer is None:
            self._timer = Timer(self.flush_frequency_in_sec, self.flush)
            self._timer.daemon = True  # fix: Timer.setDaemon() is deprecated
            self._timer.start()

    def __get_es_client(self):
        """Return a cached Elasticsearch client, creating it on first use.

        :raises ValueError: if ``auth_type`` is not supported
        """
        if self.auth_type == CMRESHandler.AuthType.NO_AUTH:
            if self._client is None:
                self._client = Elasticsearch(hosts=self.hosts,
                                             use_ssl=self.use_ssl,
                                             verify_certs=self.verify_certs,
                                             connection_class=RequestsHttpConnection,
                                             serializer=self.serializer)
            return self._client

        if self.auth_type == CMRESHandler.AuthType.BASIC_AUTH:
            if self._client is None:
                # fix: the original returned a brand-new client here without
                # caching it, so every call built a fresh connection pool.
                self._client = Elasticsearch(hosts=self.hosts,
                                             http_auth=self.auth_details,
                                             use_ssl=self.use_ssl,
                                             verify_certs=self.verify_certs,
                                             connection_class=RequestsHttpConnection,
                                             serializer=self.serializer)
            return self._client

        raise ValueError("Authentication method not supported")

    def test_es_source(self):
        """Return True if the Elasticsearch servers answer a ping."""
        return self.__get_es_client().ping()

    @staticmethod
    def __get_es_datetime_str(timestamp):
        """Return the timestamp formatted for the ES time field.

        :param timestamp: epoch seconds, including the fractional part
        :return: ``YYYY-mm-ddTHH:MM:SS.micro`` shifted by +8 hours
            (presumably to render UTC+8 local time -- TODO confirm)
        """
        current_date = datetime.datetime.utcfromtimestamp(timestamp)
        # NOTE(review): the fractional part is the raw microsecond count,
        # not zero-padded -- kept as-is to preserve the existing field format.
        return "{0!s}.{1}".format(
            datetime.datetime.strftime(current_date + datetime.timedelta(hours=8),
                                       '%Y-%m-%dT%H:%M:%S'),
            int(current_date.microsecond))

    def flush(self):
        """Flush the buffer into ES.

        :return: None
        """
        if self._timer is not None and self._timer.is_alive():
            self._timer.cancel()
        self._timer = None

        if self._buffer:
            try:
                # Swap the buffer out under the lock, then index outside it.
                with self._buffer_lock:
                    logs_buffer = self._buffer
                    self._buffer = []
                actions = ({'_index': self._index_name_func.__func__(self.es_index_name),
                            '_type': self.es_doc_type,
                            '_source': log_record}
                           for log_record in logs_buffer)
                eshelpers.bulk(client=self.__get_es_client(),
                               actions=actions,
                               stats_only=True)
            except Exception as exception:
                if self.raise_on_indexing_exceptions:
                    raise exception

    def close(self):
        """Flush the buffer and release any outstanding resource.

        :return: None
        """
        if self._timer is not None:
            self.flush()
        self._timer = None

    def emit(self, record):
        """Format *record* and queue it for indexing.

        Overrides the abstract ``logging.Handler.emit``.

        :param record: a ``logging.LogRecord``
        :return: None
        """
        self.format(record)

        rec = self.es_additional_fields.copy()
        for key, value in record.__dict__.items():
            if key not in CMRESHandler.__LOGGING_FILTER_FIELDS:
                rec[key] = "" if value is None else value
        rec[self.default_timestamp_field_name] = self.__get_es_datetime_str(record.created)

        # Append under the lock, but flush outside it: flush() re-acquires the
        # same non-reentrant lock, so flushing while holding it would deadlock.
        with self._buffer_lock:
            self._buffer.append(rec)
        if len(self._buffer) >= self.buffer_size:
            self.flush()
        else:
            self.__schedule_flush()
由于下面的库作用不大,就没有放在上面的文件中:
# Optional authentication back-ends: record whether each is importable.
# Nothing above depends on them; callers can check the flags before use.
try:
    from requests_kerberos import HTTPKerberosAuth, DISABLED
    CMR_KERBEROS_SUPPORTED = True
except ImportError:
    CMR_KERBEROS_SUPPORTED = False

try:
    from requests_aws4auth import AWS4Auth
    AWS4AUTH_SUPPORTED = True
except ImportError:
    AWS4AUTH_SUPPORTED = False
3 调用方法写入es
调用的方法如下:
import logging

from handlers import CMRESHandler

LOG_LEVEL = 'DEBUG'  # logging level
# Format applied to every record.
LOG_FORMAT = '%(levelname)s - %(asctime)s - process: %(process)d - %(filename)s - %(name)s - %(lineno)d - %(module)s - %(message)s'
ELASTIC_SEARCH_HOST = 'localhost'  # Elasticsearch Host
ELASTIC_SEARCH_PORT = 9200  # Elasticsearch Port
ELASTIC_SEARCH_INDEX = 'test_log3'  # Elasticsearch Index Name  # system_log_24172335668239631
APP_ENVIRONMENT = 'dev'  # runtime environment, e.g. dev vs production
ELASTICSEARCH_USER = 'admin'
ELASTICSEARCH_PASSWORD = 'admin'

es_handler = CMRESHandler(
    hosts=[{'host': ELASTIC_SEARCH_HOST, 'port': ELASTIC_SEARCH_PORT}],
    # user name and password
    auth_details=(ELASTICSEARCH_USER, ELASTICSEARCH_PASSWORD),
    # authentication scheme to use
    auth_type=CMRESHandler.AuthType.BASIC_AUTH,
    # target index name
    es_index_name=ELASTIC_SEARCH_INDEX,
    # extra environment tag added to every record
    es_additional_fields={'environment': APP_ENVIRONMENT})

# The formatter lines below had no observable effect, so they stay disabled.
# es_handler.setLevel(level=LOG_LEVEL)
# formatter = logging.Formatter(LOG_FORMAT)
# es_handler.setFormatter(formatter)

logger = logging.getLogger('test')
logger.setLevel(LOG_LEVEL)
logger.addHandler(es_handler)
logger.debug('test write es2')

if __name__ == '__main__':
    pass
4 扩展
使用logging或loguru写入日志到es(三种方法)。