Using Python with Elasticsearch, helpers.bulk times out after inserting a little over four thousand records. How can this be fixed?

何法 posted on 2018/09/09 15:41

I read data from an interface, process it, and bulk-insert it into ES, but the insert reports a timeout even though I have already set the timeout to 120 seconds. The environment is Linux.

The bulk insert call is as follows:

elasticsearch.helpers.bulk(es, actions, request_timeout=120)
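
For reference, helpers.bulk also accepts a chunk_size argument, and the Elasticsearch client itself can be created with transport-level retry options. The sketch below is only an untested variant I am considering, not what I am currently running (the host is a placeholder and actions is the same action list used above):

from elasticsearch import Elasticsearch
import elasticsearch.helpers

# Placeholder host; the real address is masked.
es = Elasticsearch([{"host": "*****", "port": 9200}],
                   timeout=120,            # per-request timeout in seconds
                   max_retries=3,          # retries at the transport level
                   retry_on_timeout=True)  # also retry when a request times out

# Smaller chunks keep each bulk request small, so a single request is less
# likely to hit the 120-second read timeout.
elasticsearch.helpers.bulk(es, actions, chunk_size=200, request_timeout=120)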

The full code is as follows:

if __name__ == "__main__":
    # Basic parameters
    es = Elasticsearch([{"host": "*****", "port": 9200}])  # test address; the production address is ******:9200
    key_value = "*****"
    # starttime_value, endtime_value = get_data_time_section()
    starttime_value = get_max_datetime()
    endtime_value = get_now_datetime()
    timestamp_value = endtime_value
    print(starttime_value)

    str_value = get_str(starttime=starttime_value, endtime=endtime_value, timestamp=timestamp_value, pageno="1",
                        pagesize="10")
    sign_value = get_sign(key=key_value, str1=str_value)

    # Get the total number of records
    url_value = "http://*****:**/***/**/**/**/**"  # test address; the actual data location is *****
    data_dict_01 = {"starttime": starttime_value,
                    "endtime": endtime_value,
                    "timestamp": timestamp_value,
                    "sign": sign_value,
                    "pageno": 1,
                    "pagesize": 10}
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    total_num_01 = get_data(url=url_value, data=data_dict_01, headers=headers)[0]
    print("初始总数据条数%d。" % total_num_01)

    # Loop over page numbers to read and import the data
    PAR_PAGE_SIZE_02 = 1000
    #es = Elasticsearch([{"host": "*****", "port": 9200}])  # test address; the production address is ******:9200
    sno_lists = get_sno()
    redundance_data_lists = get_basicinfo_seven()

    max_page = math.ceil(total_num_01 / PAR_PAGE_SIZE_02)
    for page in range(1, max_page + 1):
        s_time = datetime.now()
        str_value = get_str(starttime=starttime_value, endtime=endtime_value, timestamp=timestamp_value,
                            pageno=str(page),
                            pagesize=str(PAR_PAGE_SIZE_02))
        sign_value = get_sign(key=key_value, str1=str_value)
        data_dict_02 = {"starttime": starttime_value,
                        "endtime": endtime_value,
                        "timestamp": timestamp_value,
                        "sign": sign_value,
                        "pageno": page,
                        "pagesize": PAR_PAGE_SIZE_02}
        data_list = get_data(url=url_value, data=data_dict_02, headers=headers)[1]
        total_num_02 = get_data(url=url_value, data=data_dict_01, headers=headers)[0]
        print("第%d次循环时的数据总数:%d。" % (page, total_num_02))
        print("当前网页:%d,当前页面数据长度:%d。" % (page, data_list.__len__()))

        data_lists = filter_data(data=data_list, sno_list=sno_lists)
        data_lists_hanld = hanld_data(data_lists=data_lists)
        create_index(es=es, index_name="consumption")
        data_insert(data_list=data_lists_hanld, es=es, seven_lists=redundance_data_lists)
        e_time = datetime.now()
        print("当前页面花费时间为%d" % (e_time-s_time).seconds)

The error traceback is as follows:

Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/urllib3/connectionpool.py", line 384, in _make_request
    six.raise_from(e, None)
  File "<string>", line 2, in raise_from
  File "/usr/local/lib/python3.5/dist-packages/urllib3/connectionpool.py", line 380, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/lib/python3.5/http/client.py", line 1198, in getresponse
    response.begin()
  File "/usr/lib/python3.5/http/client.py", line 297, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python3.5/http/client.py", line 258, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/lib/python3.5/socket.py", line 576, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/elasticsearch/connection/http_urllib3.py", line 172, in perform_request
    response = self.pool.urlopen(method, url, body, retries=Retry(False), headers=request_headers, **kw)
  File "/usr/local/lib/python3.5/dist-packages/urllib3/connectionpool.py", line 638, in urlopen
    _stacktrace=sys.exc_info()[2])
  File "/usr/local/lib/python3.5/dist-packages/urllib3/util/retry.py", line 343, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/usr/local/lib/python3.5/dist-packages/urllib3/packages/six.py", line 686, in reraise
    raise value
  File "/usr/local/lib/python3.5/dist-packages/urllib3/connectionpool.py", line 600, in urlopen
    chunked=chunked)
  File "/usr/local/lib/python3.5/dist-packages/urllib3/connectionpool.py", line 386, in _make_request
    self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
  File "/usr/local/lib/python3.5/dist-packages/urllib3/connectionpool.py", line 306, in _raise_timeout
    raise ReadTimeoutError(self, url, "Read timed out. (read timeout=%s)" % timeout_value)
urllib3.exceptions.ReadTimeoutError: HTTPConnectionPool(host='****', port=9200): Read timed out. (read timeout=120)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/yeeyun/uestc_data_trans/cosumption_to_es_update_20180907.py", line 391, in <module>
    data_insert(data_list=data_lists_hanld, es=es, seven_lists=redundance_data_lists)
  File "/home/yeeyun/uestc_data_trans/cosumption_to_es_update_20180907.py", line 307, in data_insert
    elasticsearch.helpers.bulk(es, actions, request_timeout=120)
  File "/usr/local/lib/python3.5/dist-packages/elasticsearch/helpers/__init__.py", line 257, in bulk
    for ok, item in streaming_bulk(client, actions, *args, **kwargs):
  File "/usr/local/lib/python3.5/dist-packages/elasticsearch/helpers/__init__.py", line 192, in streaming_bulk
    raise_on_error, *args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/elasticsearch/helpers/__init__.py", line 99, in _process_bulk_chunk
    raise e
  File "/usr/local/lib/python3.5/dist-packages/elasticsearch/helpers/__init__.py", line 95, in _process_bulk_chunk
    resp = client.bulk('\n'.join(bulk_actions) + '\n', *args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/elasticsearch/client/utils.py", line 76, in _wrapped
    return func(*args, params=params, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/elasticsearch/client/__init__.py", line 1155, in bulk
    headers={'content-type': 'application/x-ndjson'})
  File "/usr/local/lib/python3.5/dist-packages/elasticsearch/transport.py", line 318, in perform_request
    status, headers_response, data = connection.perform_request(method, url, params, body, headers=headers, ignore=ignore, timeout=timeout)
  File "/usr/local/lib/python3.5/dist-packages/elasticsearch/connection/http_urllib3.py", line 180, in perform_request
    raise ConnectionTimeout('TIMEOUT', str(e), e)
elasticsearch.exceptions.ConnectionTimeout: ConnectionTimeout caused by - ReadTimeoutError(HTTPConnectionPool(host='****', port=9200): Read timed out. (read timeout=120))
