We introduced many tools and tricks in my previous post, Struggling in importing wikipedia into Elasticsearch. In that post, we successfully imported Wikipedia into Elasticsearch with Logstash. However, things are not settled. The biggest problem with Logstash is that it is extremely user-unfriendly: even filtering out an HTML tag can cost half a day of googling how to modify the config file. This is especially annoying since I have totally forgotten all the Logstash tricks by now. Therefore, in this post we will explore how to use Python to import Wikipedia into Elasticsearch directly.
The first step is to convert the Wikipedia dump into a more structured format. I chose to use gensim's Wikipedia tool for this. It can be run from the command line.
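Assuming we use gensim's segment_wiki script and the 2019-03-20 multistream dump (adjust the file names to whatever dump you downloaded), the invocation looks something like this:

python -m gensim.scripts.segment_wiki -f enwiki-20190320-pages-articles-multistream.xml.bz2 -o enwiki-20190320-pages-articles-multistream.json.gz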
It converts the Wikipedia dump into a .json.gz file, in which each line contains a JSON dict representing one page. The details can be found on the official gensim site.
Afterwards, we get a well-formatted file, enwiki-20190320-pages-articles-multistream.json.gz. We will then import it line by line into Elasticsearch.
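To get a feel for the format, we can peek at the first line. The keys title, section_titles and section_texts are what gensim's segment_wiki emits; if you produced the file with another tool, treat the field names below as assumptions.

import gzip, json

with gzip.open('enwiki-20190320-pages-articles-multistream.json.gz', 'rt') as f:
    page = json.loads(next(f))

print(page['title'])               # article title
print(page['section_titles'])      # list of section headings
print(len(page['section_texts']))  # one text body per section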
Before we begin to write our code, we should first set some parameters for Elasticsearch so that it can better hold a large amount of data and handle queries. There are two modifications:
in config/elasticsearch.yml, add:
thread_pool:
    bulk:
        size: 32
        queue_size: 50000
This makes the thread pool much larger than the default one, so Elasticsearch can handle many concurrent bulk requests.
in config/jvm.options:
Change -Xmx1g to -Xmx32g to give Elasticsearch a larger JVM heap. If the heap is not large enough, Elasticsearch will throw errors when we import the data.
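The relevant lines in config/jvm.options then look like this (setting -Xms to the same value as -Xmx is the usual Elasticsearch recommendation; pick a size that fits your machine):

-Xms32g
-Xmx32g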
Finally, we can read the new Wikipedia data file and import it into Elasticsearch. The script is as follows:
from multiprocessing import Process, Queue, Pool
import bz2, sys
import xmltodict
from tqdm.auto import tqdm
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import io, os
import json
import time
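import gzip

# NOTE: the constants and helpers in this block (consumer_n, buck_size, wiki,
# pbar, import_es, and the head of producer) are assumed definitions that the
# rest of the script relies on; adjust them to your own setup.

consumer_n = 8        # number of consumer processes (assumed value)
buck_size = 500       # pages per bulk request (assumed value)

# The gensim output is gzipped JSON, one page per line.
wiki = gzip.open('enwiki-20190320-pages-articles-multistream.json.gz', 'rt')
pbar = tqdm()

def import_es(es, pages):
    # Bulk-index a batch of page dicts; the index name "wiki" is an assumption.
    if pages:
        actions = [{"_index": "wiki", "_source": page} for page in pages]
        helpers.bulk(es, actions)

def producer(q, wiki, pbar):
    # Read the dump line by line and push each parsed page onto the queue.
    cnt = 0
    for line in wiki:
        q.put(json.loads(line))
        cnt += 1
    # The sentinel / queue-draining logic continues below.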
    # Tell every consumer to stop, then wait for the queue to drain.
    for i in range(consumer_n):
        q.put(None)
    while q.qsize() > 0:
        time.sleep(1)
        pbar.update(max(cnt - q.qsize() - pbar.n, 0))
        pbar.set_description("Queue Remain: %s" % q.qsize())
    print("End producer")
def consumer(q):
    es = Elasticsearch([{'host': 'localhost', 'port': 9200}],
                       timeout=50, max_retries=10, retry_on_timeout=True)
    pages = []
    while True:
        res = q.get()
        # Flush the current batch when it is full or when the stop sentinel arrives.
        if res is None or len(pages) >= buck_size:
            import_es(es, pages)
            pages = []
        if res is None:
            break
        pages.append(res)
    print("End consumer")
if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q, wiki, pbar))
    p.start()

    cs = []
    for _ in range(consumer_n):
        c = Process(target=consumer, args=(q,))
        c.start()
        cs.append(c)

    p.join()
    for c in cs:
        c.join()
    pbar.close()

    print('Finish Processing.')
In this script, we use a producer-consumer model: a single producer reads the dump line by line, and several consumer processes bulk-import the pages into Elasticsearch.
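Once the script finishes, a quick sanity check confirms the data is queryable. The index name wiki below matches the assumed import_es helper above; change it if you indexed into a different index.

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
print(es.count(index='wiki'))   # total number of imported pages

res = es.search(index='wiki', body={'query': {'match': {'title': 'Python'}}})
for hit in res['hits']['hits']:
    print(hit['_source']['title'])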