Mirror of https://github.com/djohnlewis/stackdump, synced 2024-12-04 23:17:37 +00:00
Backed out the comments-batching change.
It was causing weird perf issues and errors. Didn't really seem like it made things faster; if anything, things became slower.
parent bf09e36928
commit 75a216f5a4
@@ -119,7 +119,7 @@ class BadgeContentHandler(BaseContentHandler):
             traceback.print_exc()
             print('Could not parse the row ' + repr(attrs))
 
-class CommentContentHandler(xml.sax.ContentHandler):
+class CommentContentHandler(BaseContentHandler):
     """
     Parses the string -
 
@@ -130,22 +130,7 @@ class CommentContentHandler(xml.sax.ContentHandler):
 
     """
     def __init__(self, conn, site):
-        self.conn = conn
-        self.site = site
-        self.cur_props = None
-        self.row_count = 0
-        self.db_style = DefaultStyle()
-        self.cur_batch = [ ]
-        self.db_field_names = dict((c.dbName, i) for i, c in enumerate(Comment.sqlmeta.columns.values()))
-
-        db_field_names_ordered = self.db_field_names.items()
-        db_field_names_ordered.sort(key=lambda x: x[1])
-        db_field_names_ordered = [ x[0] for x in db_field_names_ordered ]
-        self.sql = 'INSERT INTO %s (%s) VALUES (%s)' % (
-            Comment.sqlmeta.table,
-            ', '.join(db_field_names_ordered),
-            ', '.join([ '?' ] * len(db_field_names_ordered))
-        )
+        BaseContentHandler.__init__(self, conn, site, Comment)
 
     def startElement(self, name, attrs):
         if name != 'row':
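For context on the removed __init__ above: it derives a parameterized INSERT statement from SQLObject's column metadata so rows can later be written with executemany. A minimal standalone sketch of the same construction, with a hard-coded column map standing in for Comment.sqlmeta (the table and column names here are illustrative, not stackdump's actual schema):

# Build "INSERT INTO comment (...) VALUES (?, ...)" from a name -> position map,
# the way the removed CommentContentHandler.__init__ did from Comment.sqlmeta.
db_field_names = {'id': 0, 'site_id': 1, 'post_id': 2, 'text': 3}  # illustrative columns

# order the column names by their positional index, then join them into SQL
ordered = [name for name, index in sorted(db_field_names.items(), key=lambda x: x[1])]
sql = 'INSERT INTO %s (%s) VALUES (%s)' % (
    'comment',
    ', '.join(ordered),
    ', '.join(['?'] * len(ordered)),
)
print(sql)  # INSERT INTO comment (id, site_id, post_id, text) VALUES (?, ?, ?, ?)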
@@ -168,64 +153,6 @@ class CommentContentHandler(xml.sax.ContentHandler):
             traceback.print_exc()
             print('Could not parse the row ' + repr(attrs))
 
-    def endElement(self, name):
-        if name != 'row':
-            return
-
-        if not self.cur_props:
-            return
-
-        # we want to count failed rows as well as successful ones as this is
-        # a count of rows processed.
-        self.row_count += 1
-
-        # the cur_props is now complete. Prepare it for saving. 'None' is the
-        # default value for the database.
-        props_for_db = [ None ] * len(self.db_field_names)
-        props = self.cur_props.copy()
-        for k,v in self.cur_props.items():
-            # if this is a reference to a FK, massage the values to fit
-            if isinstance(v, SQLObject):
-                k += 'Id'
-                v = v.id
-            # need to convert the attr names to DB column names
-            col_name = self.db_style.pythonAttrToDBColumn(k)
-            if col_name in self.db_field_names.keys():
-                props_for_db[self.db_field_names[col_name]] = v
-                del props[k]
-
-        # this shouldn't occur as we created it; just a sanity check
-        if len(props):
-            print('The fields "%s" were unrecognized and ignored from this comment %s' % (', '.join(props.keys()), repr(self.cur_props)))
-
-        self.cur_batch.append(props_for_db)
-
-        # reset ourself for the next comment
-        self.cur_props = None
-
-        if self.row_count % 1000 == 0:
-            print('%-10s Processed %d rows.' % ('[comment]', self.row_count))
-
-        if self.row_count % 10000 == 0:
-            self.commit_comments_batch()
-
-    def commit_comments_batch(self):
-        print('Saving and committing current batch of comments...')
-        trans = self.conn.transaction()
-
-        try:
-            trans._connection.executemany(self.sql, self.cur_batch)
-            trans.commit()
-        except Exception, e:
-            # could not save this batch, abort.
-            trans.rollback()
-            print('Exception: ' + str(e))
-            import traceback
-            traceback.print_exc()
-            print('Could not commit transaction for comment batch. Aborting.')
-            raise e
-
-
 class UserContentHandler(BaseContentHandler):
     """
     Parses the string -
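The endElement/commit_comments_batch pair removed above is a flush-every-N-rows batching pattern built on DB-API executemany: parsed rows accumulate in cur_batch and are written inside a transaction every 10000 rows, with a rollback if the write fails. A self-contained sketch of that pattern against an in-memory sqlite3 database (the table, columns and flush interval are illustrative, and plain sqlite3 stands in for the SQLObject transaction used here):

import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE comment (id INTEGER, post_id INTEGER, text TEXT)')

sql = 'INSERT INTO comment (id, post_id, text) VALUES (?, ?, ?)'
batch = []
BATCH_SIZE = 1000  # illustrative; the removed code flushed every 10000 rows

def flush(batch):
    # write the accumulated rows in one executemany call, rolling back on failure
    try:
        conn.executemany(sql, batch)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    del batch[:]

for i in range(2500):
    batch.append((i, i % 50, 'comment %d' % i))
    if len(batch) >= BATCH_SIZE:
        flush(batch)

flush(batch)  # flush the final partial batch, as import_site did after parsing
print(conn.execute('SELECT COUNT(*) FROM comment').fetchone()[0])  # 2500

Per the commit message, this batching did not measurably beat the row-at-a-time saves that BaseContentHandler presumably performs, which is why the whole mechanism is backed out here.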
@@ -676,9 +603,6 @@ def get_file_path(dir_path, filename):
 
     return os.path.abspath(os.path.join(dir_path, matches[0]))
 
-def create_comment_indices(conn):
-    # (site_id, post_id) index
-    conn.execute('CREATE INDEX IF NOT EXISTS comment_siteId_postId_index ON comment (site_id, post_id)')
 
 def import_site(xml_root, site_name, dump_date, site_desc, site_key,
                 site_base_url, answer_yes=False):
@@ -866,9 +790,11 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
 
     timing_start = time.time()
 
-    # start a new transaction (comments db transaction are managed elsewhere)
+    # start a new transaction
     sqlhub.threadConnection = sqlhub.processConnection.transaction()
     conn = sqlhub.threadConnection
+    comment_db_sqlhub.threadConnection = comment_db_sqlhub.processConnection.transaction()
+    temp_db_conn = comment_db_sqlhub.threadConnection
 
     # create a new Site
     site = Site(name=site_name, desc=site_desc, key=site_key, dump_date=dump_date,
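The added lines give the temporary comments database its own SQLObject transaction (temp_db_conn), opened here and committed with commit(close=True) in the last hunk of this diff. A minimal sketch of that connection/transaction lifecycle with SQLObject, using a throwaway SQLite file and an illustrative model class (none of these names are stackdump's, and the URI format assumes an absolute path):

import os
import tempfile

from sqlobject import SQLObject, StringCol, connectionForURI, sqlhub

# a throwaway SQLite file standing in for the importer's temporary comments DB
db_path = os.path.join(tempfile.mkdtemp(), 'comments_temp.db')
sqlhub.processConnection = connectionForURI('sqlite://' + db_path)

class Note(SQLObject):  # illustrative model, not one of stackdump's
    text = StringCol()

Note.createTable()

# route ORM work through a transaction, mirroring
# comment_db_sqlhub.threadConnection = comment_db_sqlhub.processConnection.transaction()
sqlhub.threadConnection = sqlhub.processConnection.transaction()

Note(text='hello')  # the INSERT happens inside the open transaction

# commit and close the transaction, then close and delete the database,
# the same sequence the end of import_site runs (commit(close=True), close(), os.remove)
sqlhub.threadConnection.commit(close=True)
sqlhub.processConnection.close()
os.remove(db_path)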
@@ -891,12 +817,9 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
     print('[comment] PARSING COMMENTS...')
     xml_path = get_file_path(xml_root, 'comments.xml')
     print('[comment] start parsing comments.xml...')
-    handler = CommentContentHandler(comment_db_sqlhub.processConnection, site)
+    handler = CommentContentHandler(temp_db_conn, site)
     xml.sax.parse(xml_path, handler)
-    handler.commit_comments_batch()
     print('%-10s Processed %d rows.' % ('[comment]', handler.row_count))
-    print('[comment] creating database indices...')
-    create_comment_indices(comment_db_sqlhub.processConnection.getConnection())
     print('[comment] FINISHED PARSING COMMENTS.\n')
 
     # USERS
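comments.xml in a Stack Exchange data dump is a flat sequence of <row .../> elements whose attributes carry the comment fields, which is why the handlers in this file only react to elements named 'row'. A minimal SAX sketch of that parsing pattern (the sample XML and the fields read from it are illustrative):

import xml.sax
from io import BytesIO

SAMPLE = b"""<?xml version="1.0"?>
<comments>
  <row Id="1" PostId="10" Text="First comment" />
  <row Id="2" PostId="10" Text="Second comment" />
</comments>
"""

class RowCounter(xml.sax.ContentHandler):
    def __init__(self):
        xml.sax.ContentHandler.__init__(self)
        self.row_count = 0

    def startElement(self, name, attrs):
        if name != 'row':  # same guard the stackdump handlers use
            return
        self.row_count += 1
        print('%s: %s' % (attrs.get('Id'), attrs.get('Text')))

handler = RowCounter()
xml.sax.parse(BytesIO(SAMPLE), handler)
print('Processed %d rows.' % handler.row_count)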
@@ -913,10 +836,6 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
     print('[post] PARSING POSTS...')
     xml_path = get_file_path(xml_root, 'posts.xml')
     print('[post] start parsing posts.xml...')
-    # set up the thread connection transaction here so all queries by the handler
-    # are inside a transaction; not done earlier as the comment handler manages
-    # transactions itself
-    comment_db_sqlhub.threadConnection = comment_db_sqlhub.processConnection.transaction()
     handler = PostContentHandler(solr, site)
     xml.sax.parse(xml_path, handler)
     handler.commit_all_questions()
@@ -926,7 +845,7 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
 
     # DELETE COMMENTS
     print('[comment] DELETING TEMPORARY COMMENTS DATABASE (they are no longer needed)...')
-    comment_db_sqlhub.threadConnection.commit(close=True)
+    temp_db_conn.commit(close=True)
     comment_db_sqlhub.processConnection.close()
     os.remove(temp_db_path)
     print('[comment] FINISHED DELETING COMMENTS.\n')