1
0
mirror of https://github.com/djohnlewis/stackdump synced 2024-12-04 15:07:36 +00:00

Comments are now committed in batches and using a 'prepared' statement via executemany.

Also fixed a Windows compatibility bug with the new temp comments db and a bug with the webapp now that the Comment model has moved. Dates are also no longer parsed from their ISO form for comments; instead left as strings and parsed by SQLObject internally as needed.
This commit is contained in:
Samuel Lai 2013-11-28 23:51:53 +11:00
parent 8e3d21f817
commit cdb8d96508
3 changed files with 89 additions and 13 deletions

View File

@ -26,7 +26,7 @@ import html5lib
from html5lib.filters._base import Filter as HTML5LibFilterBase
import markdown
from stackdump.models import Site, Badge, Comment, User
from stackdump.models import Site, Badge, User
from stackdump import settings
# STATIC VARIABLES

View File

@ -119,7 +119,7 @@ class BadgeContentHandler(BaseContentHandler):
traceback.print_exc()
print('Could not parse the row ' + repr(attrs))
class CommentContentHandler(BaseContentHandler):
class CommentContentHandler(xml.sax.ContentHandler):
"""
Parses the string -
@ -130,7 +130,22 @@ class CommentContentHandler(BaseContentHandler):
"""
def __init__(self, conn, site):
BaseContentHandler.__init__(self, conn, site, Comment)
self.conn = conn
self.site = site
self.cur_props = None
self.row_count = 0
self.db_style = DefaultStyle()
self.cur_batch = [ ]
self.db_field_names = dict((c.dbName, i) for i, c in enumerate(Comment.sqlmeta.columns.values()))
db_field_names_ordered = self.db_field_names.items()
db_field_names_ordered.sort(key=lambda x: x[1])
db_field_names_ordered = [ x[0] for x in db_field_names_ordered ]
self.sql = 'INSERT INTO %s (%s) VALUES (%s)' % (
Comment.sqlmeta.table,
', '.join(db_field_names_ordered),
', '.join([ '?' ] * len(db_field_names_ordered))
)
def startElement(self, name, attrs):
if name != 'row':
@ -142,7 +157,7 @@ class CommentContentHandler(BaseContentHandler):
d['postId'] = int(attrs.get('PostId', 0))
d['score'] = int(attrs.get('Score', 0))
d['text'] = attrs.get('Text', '')
d['creationDate'] = datetime.strptime(attrs.get('CreationDate'), ISO_DATE_FORMAT)
d['creationDate'] = attrs.get('CreationDate')
d['userId'] = int(attrs.get('UserId', 0))
except Exception, e:
@ -153,6 +168,64 @@ class CommentContentHandler(BaseContentHandler):
traceback.print_exc()
print('Could not parse the row ' + repr(attrs))
def endElement(self, name):
if name != 'row':
return
if not self.cur_props:
return
# we want to count failed rows as well as successful ones as this is
# a count of rows processed.
self.row_count += 1
# the cur_props is now complete. Prepare it for saving. 'None' is the
# default value for the database.
props_for_db = [ None ] * len(self.db_field_names)
props = self.cur_props.copy()
for k,v in self.cur_props.items():
# if this is a reference to a FK, massage the values to fit
if isinstance(v, SQLObject):
k += 'Id'
v = v.id
# need to convert the attr names to DB column names
col_name = self.db_style.pythonAttrToDBColumn(k)
if col_name in self.db_field_names.keys():
props_for_db[self.db_field_names[col_name]] = v
del props[k]
# this shouldn't occur as we created it; just a sanity check
if len(props):
print('The fields "%s" were unrecognized and ignored from this comment %s' % (', '.join(props.keys()), repr(self.cur_props)))
self.cur_batch.append(props_for_db)
# reset ourself for the next comment
self.cur_props = None
if self.row_count % 1000 == 0:
print('%-10s Processed %d rows.' % ('[comment]', self.row_count))
if self.row_count % 10000 == 0:
self.commit_comments_batch()
def commit_comments_batch(self):
print('Saving and committing current batch of comments...')
trans = self.conn.transaction()
try:
trans._connection.executemany(self.sql, self.cur_batch)
trans.commit()
except Exception, e:
# could not save this batch, abort.
trans.rollback()
print('Exception: ' + str(e))
import traceback
traceback.print_exc()
print('Could not commit transaction for comment batch. Aborting.')
raise e
class UserContentHandler(BaseContentHandler):
"""
Parses the string -
@ -563,7 +636,7 @@ class Comment(SQLObject):
postId = IntCol()
score = IntCol()
text = UnicodeCol()
creationDate = DateTimeCol()
creationDate = DateTimeCol(datetimeFormat=ISO_DATE_FORMAT)
userId = IntCol()
siteId_postId_index = DatabaseIndex(siteId, postId)
@ -779,9 +852,9 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
# create the temporary comments database
print('Connecting to the temporary comments database...')
temp_db_file, temp_db_path = tempfile.mkstemp('.sqlite', 'temp_comment_db-' + re.sub(r'[^\w]', '_', site_key), settings.TEMP_COMMENTS_DATABASE_DIR)
temp_db_file, temp_db_path = tempfile.mkstemp('.sqlite', 'temp_comment_db-' + re.sub(r'[^\w]', '_', site_key) + '-', settings.TEMP_COMMENTS_DATABASE_DIR)
os.close(temp_db_file)
conn_str = 'sqlite://' + temp_db_path
conn_str = 'sqlite:///' + temp_db_path
comment_db_sqlhub.processConnection = connectionForURI(conn_str)
print('Connected.')
Comment.createTable()
@ -789,11 +862,9 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
timing_start = time.time()
# start a new transaction
# start a new transaction (comments db transaction are managed elsewhere)
sqlhub.threadConnection = sqlhub.processConnection.transaction()
conn = sqlhub.threadConnection
comment_db_sqlhub.threadConnection = comment_db_sqlhub.processConnection.transaction()
temp_db_conn = comment_db_sqlhub.threadConnection
# create a new Site
site = Site(name=site_name, desc=site_desc, key=site_key, dump_date=dump_date,
@ -816,8 +887,9 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
print('[comment] PARSING COMMENTS...')
xml_path = get_file_path(xml_root, 'comments.xml')
print('[comment] start parsing comments.xml...')
handler = CommentContentHandler(temp_db_conn, site)
handler = CommentContentHandler(comment_db_sqlhub.processConnection, site)
xml.sax.parse(xml_path, handler)
handler.commit_comments_batch()
print('%-10s Processed %d rows.' % ('[comment]', handler.row_count))
print('[comment] FINISHED PARSING COMMENTS.\n')
@ -835,6 +907,10 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
print('[post] PARSING POSTS...')
xml_path = get_file_path(xml_root, 'posts.xml')
print('[post] start parsing posts.xml...')
# set up the thread connection transaction here so all queries by the handler
# are inside a transaction; not done earlier as the comment handler manages
# transactions itself
comment_db_sqlhub.threadConnection = comment_db_sqlhub.processConnection.transaction()
handler = PostContentHandler(solr, site)
xml.sax.parse(xml_path, handler)
handler.commit_all_questions()
@ -844,7 +920,7 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
# DELETE COMMENTS
print('[comment] DELETING TEMPORARY COMMENTS DATABASE (they are no longer needed)...')
temp_db_conn.commit(close=True)
comment_db_sqlhub.threadConnection.commit(close=True)
comment_db_sqlhub.processConnection.close()
os.remove(temp_db_path)
print('[comment] FINISHED DELETING COMMENTS.\n')

View File

@ -19,7 +19,7 @@ SOLR_URL = 'http://localhost:8983/solr/stackdump/'
import os
DATABASE_CONN_STR = 'sqlite:///' + os.path.join(os.path.dirname(__file__), '..', '..', '..', 'data', 'stackdump.sqlite')
TEMP_COMMENTS_DATABASE_DIR = '%s/../../../data' % os.path.dirname(__file__)
TEMP_COMMENTS_DATABASE_DIR = os.path.join(os.path.dirname(__file__), '..', '..', '..', 'data')
# if the website is hosted under a subpath, specify it here. It must end with a
# slash.