mirror of
https://github.com/djohnlewis/stackdump
synced 2024-12-04 23:17:37 +00:00
Improved import speed by ~9-fold by actually committing every 1000 questions.
There was an error made where although questions were only checked for completion every 1000 rows, each completed question was committed separately, resulting in far too many solr calls. Also modified process to only commit entries in solr at the end, after the database transaction is committed. This means if the process is aborted mid-way through, there won't be orphaned data in solr any more.
This commit is contained in:
parent
fdf31a3ef6
commit
26b803e119
@ -338,13 +338,14 @@ class PostContentHandler(xml.sax.ContentHandler):
|
||||
if self.row_count % 1000 == 0:
|
||||
print('%-10s Processed %d rows.' % ('[post]', self.row_count))
|
||||
|
||||
# only check for finished questions every 200 rows to speed things up
|
||||
if self.row_count % 200 == 0:
|
||||
# only check for finished questions every 1000 rows to speed things up
|
||||
if self.row_count % 1000 == 0:
|
||||
self.commit_finished_questions()
|
||||
|
||||
def commit_finished_questions(self):
|
||||
# check if any questions are now complete (answerCount=len(answers))
|
||||
finished_question_ids = [ ]
|
||||
questions_to_commit = [ ]
|
||||
for id, q in self.unfinished_questions.items():
|
||||
if len(q['answers']) >= q['answerCount']:
|
||||
if len(q['answers']) > q['answerCount']:
|
||||
@ -352,7 +353,7 @@ class PostContentHandler(xml.sax.ContentHandler):
|
||||
|
||||
try:
|
||||
# question is complete, store it.
|
||||
self.commit_question(q)
|
||||
questions_to_commit.append(self.finalise_question(q))
|
||||
|
||||
except Exception, e:
|
||||
# could not serialise and insert this question, so ignore it
|
||||
@ -364,11 +365,14 @@ class PostContentHandler(xml.sax.ContentHandler):
|
||||
finally:
|
||||
finished_question_ids.append(id)
|
||||
|
||||
# commit at the end so if cancelled, there won't be orphans in solr.
|
||||
self.commit_questions(questions_to_commit, commit=False)
|
||||
|
||||
# remove any finished questions from the unfinished list
|
||||
for id in finished_question_ids:
|
||||
self.unfinished_questions.pop(id)
|
||||
|
||||
def commit_question(self, q):
|
||||
def finalise_question(self, q):
|
||||
"""
|
||||
Massages and serialises the question object so it can be inserted into
|
||||
the search index in the form that we want.
|
||||
@ -493,7 +497,13 @@ class PostContentHandler(xml.sax.ContentHandler):
|
||||
|
||||
doc['question-json'] = json.dumps(question_obj, default=self.json_default_handler)
|
||||
|
||||
solr.add([ doc ])
|
||||
return doc
|
||||
|
||||
def commit_questions(self, questions, commit=True):
|
||||
"""
|
||||
Commits the given list of questions to solr.
|
||||
"""
|
||||
solr.add(questions, commit=commit)
|
||||
|
||||
def commit_all_questions(self):
|
||||
"""
|
||||
@ -503,12 +513,13 @@ class PostContentHandler(xml.sax.ContentHandler):
|
||||
"""
|
||||
self.commit_finished_questions()
|
||||
|
||||
questions_to_commit = [ ]
|
||||
for id,q in self.unfinished_questions.items():
|
||||
print('Question [ID# %d] was expected to have %d answers, but got %d instead. Ignoring inconsistency.' % (q['id'], q['answerCount'], len(q['answers'])))
|
||||
|
||||
try:
|
||||
# question is complete, store it.
|
||||
self.commit_question(q)
|
||||
questions_to_commit.append(self.finalise_question(q))
|
||||
|
||||
except Exception, e:
|
||||
# could not serialise and insert this question, so ignore it
|
||||
@ -517,6 +528,9 @@ class PostContentHandler(xml.sax.ContentHandler):
|
||||
traceback.print_exc()
|
||||
print('Could not process the question ' + repr(q))
|
||||
|
||||
# commit at the end so if cancelled, there won't be orphans in solr.
|
||||
self.commit_questions(questions_to_commit, commit=False)
|
||||
|
||||
# we're committing all questions, so nothing is now unfinished
|
||||
self.unfinished_questions.clear()
|
||||
|
||||
@ -659,7 +673,8 @@ if len(site) > 0:
|
||||
print('Deleted.\n')
|
||||
|
||||
print('Deleting site "%s" from the solr... ' % site.name)
|
||||
solr.delete(q='siteKey:"%s"' % site.key)
|
||||
solr.delete(q='siteKey:"%s"' % site.key, commit=False)
|
||||
solr.commit(expungeDeletes=True)
|
||||
print('Deleted.\n')
|
||||
|
||||
timing_start = time.time()
|
||||
@ -721,7 +736,10 @@ conn.query(conn.sqlrepr(Delete(Comment.sqlmeta.table, where=(Comment.q.site == s
|
||||
print('[comment] FINISHED DELETING COMMENTS.\n')
|
||||
|
||||
# commit transaction
|
||||
print('COMMITTING IMPORTED DATA TO DISK...')
|
||||
sqlhub.threadConnection.commit(close=True)
|
||||
solr.commit()
|
||||
print('FINISHED COMMITTING IMPORTED DATA TO DISK.\n')
|
||||
|
||||
timing_end = time.time()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user