Python 批量寫(xiě)入 Elasticsearch 腳本
來(lái)源:易賢網(wǎng) 閱讀:4756 次 日期:2015-05-04 14:38:07
溫馨提示:易賢網(wǎng)小編為您整理了“Python 批量寫(xiě)入 Elasticsearch 腳本”,方便廣大網(wǎng)友查閱!

Elasticsearch 官方和社區(qū)提供了各種各樣的客戶(hù)端庫(kù),在之前的博客中,我陸陸續(xù)續(xù)提到和演示過(guò) Perl 的,Javascript 的,Ruby 的。上周寫(xiě)了一版 Python 的,考慮到好像很難找到現(xiàn)成的示例,如何用 python 批量寫(xiě)數(shù)據(jù)進(jìn) Elasticsearch,今天一并貼上來(lái)。

#!/usr/bin/env pypy

#coding:utf-8

import re

import sys

import time

import datetime

import logging

from elasticsearch import Elasticsearch

from elasticsearch import helpers

from elasticsearch import ConnectionTimeout

es = Elasticsearch(['192.168.0.2', '192.168.0.3'], sniff_on_start=True, sniff_on_connection_fail=True, max_retries=3, retry_on_timeout=True)

logging.basicConfig()

logging.getLogger('elasticsearch').setLevel(logging.WARN)

logging.getLogger('urllib3').setLevel(logging.WARN)

def parse_www(logline):

try:

time_local, request, http_user_agent, staTus, remote_addr, http_referer, request_time, body_bytes_sent, http_x_forwarded_proto, http_x_forwarded_for, http_host, http_cookie, upstream_response_time = logline.split('`')

try:

upstream_response_time = float(upstream_response_time)

except:

upstream_response_time = None

method, uri, verb = request.split(' ')

arg = {}

try:

url_path, url_args = uri.split('?')

for args in url_args.split('&'):

k, v = args.split('=')

arg[k] = v

except:

url_path = uri

# Why %z do not implement?

date = datetime.datetime.strptime(time_local, '[%d/%b/%Y:%H:%M:%S +0800]')

ret = {

"@timestamp": date.strftime('%FT%T+0800'),

"host": "127.0.0.1",

"method": method.lstrip('"'),

"url_path": url_path,

"url_args": arg,

"verb": verb.rstrip('"'),

"http_user_agent": http_user_agent,

"status": int(staTus),

"remote_addr": remote_addr.strip('[]'),

"http_referer": http_referer,

"request_time": float(request_time),

"body_bytes_sent": int(body_bytes_sent),

"http_x_forwarded_proto": http_x_forwarded_proto,

"http_x_forwarded_for": http_x_forwarded_for,

"http_host": http_host,

"http_cookie": http_cookie,

"upstream_response_time": upstream_response_time

}

return {"_index":"logstash-mweibo-www-"+date.strftime('%Y.%m.%d'), "_type":"nginx","_source":ret}

except:

return {"_index":"logstash-mweibo-www-"+datetime.datetime.now().strftime('%Y.%m.%d'), "_type":"nginx","_source":{"message":logline}}

def get_log():

start_time = time.time()

log_buffer = []

while True:

try:

line = sys.stdin.readline()

except:

break

if not line:

helpers.bulk(es, log_buffer)

del log_buffer[0:len(log_buffer)]

break

if line:

ret = parse_www(line.rstrip())

log_buffer.append(ret)

while ( len(log_buffer) > 2000 and len(log_buffer) % 2000 == 0 ):

try:

helpers.bulk(es, log_buffer)

except ConnectionTimeout:

print("try again")

continue

del log_buffer[0:len(log_buffer)]

break

else:

if (time.time() - startime > timeout ):

helpers.bulk(es, log_buffer)

start_time = time.time()

del log_buffer[0:len(log_buffer)]

time.sleep(1)

if __name__ == '__main__':

get_log()

和 Perl、Ruby 的客戶(hù)端不同,Python 的客戶(hù)端只支持兩種 transport 方式,urllib3 或者 thrift。也就是說(shuō),木有像事件驅(qū)動(dòng)啊之類(lèi)的辦法。

測(cè)試一下,這個(gè)腳本如果不發(fā)送數(shù)據(jù),一秒處理日志條數(shù)在15k,發(fā)送數(shù)據(jù),一秒只有2k。確實(shí)比較讓人失望,于是決定換成 pypy 試試——我司不少日志處理腳本都是用 pypy 運(yùn)行的。

服務(wù)器上使用 pypy ,是通過(guò) EPEL 安裝的,之前都只用核心模塊,這次需要安裝 elasticsearch 模塊。所以需要先給 pypy 加上 pip:

wget

pypy get-pip.py

網(wǎng)上大多說(shuō)之前還要下載一個(gè)叫 distribute_setup.py 的腳本來(lái)運(yùn)行,實(shí)測(cè)不需要,而且這個(gè)腳本的下載鏈接也失效了。

然后通過(guò) pip 安裝 elasticsearch 包即可:

/usr/lib64/pypy-2.0.2/bin/pip install elasticsearch

測(cè)試,pypy 比 python 處理日志速度快一倍,寫(xiě) ES 速度快一半。不過(guò) 3300eps 依然很慢就是了。

測(cè)試中碰到的其他問(wèn)題

可以看到腳本里已經(jīng)設(shè)置了多次重試和超時(shí)重連,不過(guò)依然會(huì)收到寫(xiě)入超時(shí)和失敗的返回,原來(lái) Elasticsearch 默認(rèn)對(duì)每個(gè) node 做 segment merge 的時(shí)候,有磁盤(pán)保護(hù)措施,速度上限限制在 20MB/s。這在壓測(cè)的時(shí)候就容易觸發(fā)。

[2015-01-10 09:41:51,273][INFO ][index.engine.internal ] [node1][logstash-2015.01.10][2] now throttling indexing: numMergesInFlight=6,maxNumMerges=5

修改配置重啟即可:

indices.store.throttle.type:merge

indices.store.throttle.max_bytes_per_sec:500mb

關(guān)于這個(gè)問(wèn)題,ES 也有討論:Should we lower the default merge IO throttle rate??;蛟S未來(lái)會(huì)有更靈活的策略。

更多 ES 性能測(cè)試和優(yōu)化建議,參

更多信息請(qǐng)查看IT技術(shù)專(zhuān)欄

更多信息請(qǐng)查看技術(shù)文章
易賢網(wǎng)手機(jī)網(wǎng)站地址:Python 批量寫(xiě)入 Elasticsearch 腳本
由于各方面情況的不斷調(diào)整與變化,易賢網(wǎng)提供的所有考試信息和咨詢(xún)回復(fù)僅供參考,敬請(qǐng)考生以權(quán)威部門(mén)公布的正式信息和咨詢(xún)?yōu)闇?zhǔn)!

2025國(guó)考·省考課程試聽(tīng)報(bào)名

  • 報(bào)班類(lèi)型
  • 姓名
  • 手機(jī)號(hào)
  • 驗(yàn)證碼
關(guān)于我們 | 聯(lián)系我們 | 人才招聘 | 網(wǎng)站聲明 | 網(wǎng)站幫助 | 非正式的簡(jiǎn)要咨詢(xún) | 簡(jiǎn)要咨詢(xún)須知 | 加入群交流 | 手機(jī)站點(diǎn) | 投訴建議
工業(yè)和信息化部備案號(hào):滇ICP備2023014141號(hào)-1 云南省教育廳備案號(hào):云教ICP備0901021 滇公網(wǎng)安備53010202001879號(hào) 人力資源服務(wù)許可證:(云)人服證字(2023)第0102001523號(hào)
云南網(wǎng)警備案專(zhuān)用圖標(biāo)
聯(lián)系電話:0871-65099533/13759567129 獲取招聘考試信息及咨詢(xún)關(guān)注公眾號(hào):hfpxwx
咨詢(xún)QQ:526150442(9:00—18:00)版權(quán)所有:易賢網(wǎng)
云南網(wǎng)警報(bào)警專(zhuān)用圖標(biāo)