diff --git a/python/src/stackdump/app.py b/python/src/stackdump/app.py
index 8675d84..1163d84 100644
--- a/python/src/stackdump/app.py
+++ b/python/src/stackdump/app.py
@@ -26,7 +26,7 @@ import html5lib
 from html5lib.filters._base import Filter as HTML5LibFilterBase
 import markdown
 
-from stackdump.models import Site, Badge, Comment, User
+from stackdump.models import Site, Badge, User
 from stackdump import settings
 
 # STATIC VARIABLES
diff --git a/python/src/stackdump/commands/import_site.py b/python/src/stackdump/commands/import_site.py
index 43e0bae..7f942e8 100644
--- a/python/src/stackdump/commands/import_site.py
+++ b/python/src/stackdump/commands/import_site.py
@@ -119,7 +119,7 @@ class BadgeContentHandler(BaseContentHandler):
             traceback.print_exc()
             print('Could not parse the row ' + repr(attrs))
 
-class CommentContentHandler(BaseContentHandler):
+class CommentContentHandler(xml.sax.ContentHandler):
     """
     Parses the string -
 
@@ -130,7 +130,22 @@ class CommentContentHandler(BaseContentHandler):
     """
     def __init__(self, conn, site):
-        BaseContentHandler.__init__(self, conn, site, Comment)
+        self.conn = conn
+        self.site = site
+        self.cur_props = None
+        self.row_count = 0
+        self.db_style = DefaultStyle()
+        self.cur_batch = [ ]
+        self.db_field_names = dict((c.dbName, i) for i, c in enumerate(Comment.sqlmeta.columns.values()))
+
+        db_field_names_ordered = self.db_field_names.items()
+        db_field_names_ordered.sort(key=lambda x: x[1])
+        db_field_names_ordered = [ x[0] for x in db_field_names_ordered ]
+        self.sql = 'INSERT INTO %s (%s) VALUES (%s)' % (
+            Comment.sqlmeta.table,
+            ', '.join(db_field_names_ordered),
+            ', '.join([ '?' ] * len(db_field_names_ordered))
+        )
 
     def startElement(self, name, attrs):
         if name != 'row':
@@ -142,7 +157,7 @@ class CommentContentHandler(BaseContentHandler):
             d['postId'] = int(attrs.get('PostId', 0))
             d['score'] = int(attrs.get('Score', 0))
             d['text'] = attrs.get('Text', '')
-            d['creationDate'] = datetime.strptime(attrs.get('CreationDate'), ISO_DATE_FORMAT)
+            d['creationDate'] = attrs.get('CreationDate')
             d['userId'] = int(attrs.get('UserId', 0))
 
         except Exception, e:
@@ -153,6 +168,64 @@ class CommentContentHandler(BaseContentHandler):
             traceback.print_exc()
             print('Could not parse the row ' + repr(attrs))
 
+    def endElement(self, name):
+        if name != 'row':
+            return
+
+        if not self.cur_props:
+            return
+
+        # we want to count failed rows as well as successful ones as this is
+        # a count of rows processed.
+        self.row_count += 1
+
+        # the cur_props is now complete. Prepare it for saving. 'None' is the
+        # default value for the database.
+        props_for_db = [ None ] * len(self.db_field_names)
+        props = self.cur_props.copy()
+        for k,v in self.cur_props.items():
+            # if this is a reference to a FK, massage the values to fit
+            if isinstance(v, SQLObject):
+                k += 'Id'
+                v = v.id
+            # need to convert the attr names to DB column names
+            col_name = self.db_style.pythonAttrToDBColumn(k)
+            if col_name in self.db_field_names.keys():
+                props_for_db[self.db_field_names[col_name]] = v
+                del props[k]
+
+        # this shouldn't occur as we created it; just a sanity check
+        if len(props):
+            print('The fields "%s" were unrecognized and ignored from this comment %s' % (', '.join(props.keys()), repr(self.cur_props)))
+
+        self.cur_batch.append(props_for_db)
+
+        # reset ourself for the next comment
+        self.cur_props = None
+
+        if self.row_count % 1000 == 0:
+            print('%-10s Processed %d rows.' \
+                    % ('[comment]', self.row_count))
+
+        if self.row_count % 10000 == 0:
+            self.commit_comments_batch()
+
+    def commit_comments_batch(self):
+        print('Saving and committing current batch of comments...')
+        trans = self.conn.transaction()
+
+        try:
+            trans._connection.executemany(self.sql, self.cur_batch)
+            trans.commit()
+        except Exception, e:
+            # could not save this batch, abort.
+            trans.rollback()
+            print('Exception: ' + str(e))
+            import traceback
+            traceback.print_exc()
+            print('Could not commit transaction for comment batch. Aborting.')
+            raise e
+
+
 class UserContentHandler(BaseContentHandler):
     """
     Parses the string -
@@ -563,7 +636,7 @@ class Comment(SQLObject):
     postId = IntCol()
     score = IntCol()
     text = UnicodeCol()
-    creationDate = DateTimeCol()
+    creationDate = DateTimeCol(datetimeFormat=ISO_DATE_FORMAT)
     userId = IntCol()
 
     siteId_postId_index = DatabaseIndex(siteId, postId)
@@ -779,9 +852,9 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
 
     # create the temporary comments database
    print('Connecting to the temporary comments database...')
-    temp_db_file, temp_db_path = tempfile.mkstemp('.sqlite', 'temp_comment_db-' + re.sub(r'[^\w]', '_', site_key), settings.TEMP_COMMENTS_DATABASE_DIR)
+    temp_db_file, temp_db_path = tempfile.mkstemp('.sqlite', 'temp_comment_db-' + re.sub(r'[^\w]', '_', site_key) + '-', settings.TEMP_COMMENTS_DATABASE_DIR)
     os.close(temp_db_file)
-    conn_str = 'sqlite://' + temp_db_path
+    conn_str = 'sqlite:///' + temp_db_path
     comment_db_sqlhub.processConnection = connectionForURI(conn_str)
     print('Connected.')
     Comment.createTable()
@@ -789,11 +862,9 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
 
     timing_start = time.time()
 
-    # start a new transaction
+    # start a new transaction (comments db transactions are managed elsewhere)
     sqlhub.threadConnection = sqlhub.processConnection.transaction()
     conn = sqlhub.threadConnection
-    comment_db_sqlhub.threadConnection = comment_db_sqlhub.processConnection.transaction()
-    temp_db_conn = comment_db_sqlhub.threadConnection
 
     # create a new Site
     site = Site(name=site_name, desc=site_desc, key=site_key, dump_date=dump_date,
@@ -816,8 +887,9 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
     print('[comment] PARSING COMMENTS...')
     xml_path = get_file_path(xml_root, 'comments.xml')
     print('[comment] start parsing comments.xml...')
-    handler = CommentContentHandler(temp_db_conn, site)
+    handler = CommentContentHandler(comment_db_sqlhub.processConnection, site)
     xml.sax.parse(xml_path, handler)
+    handler.commit_comments_batch()
     print('%-10s Processed %d rows.' \
             % ('[comment]', handler.row_count))
     print('[comment] FINISHED PARSING COMMENTS.\n')
@@ -835,6 +907,10 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
     print('[post] PARSING POSTS...')
     xml_path = get_file_path(xml_root, 'posts.xml')
     print('[post] start parsing posts.xml...')
+    # set up the thread connection transaction here so all queries by the handler
+    # are inside a transaction; not done earlier as the comment handler manages
+    # transactions itself
+    comment_db_sqlhub.threadConnection = comment_db_sqlhub.processConnection.transaction()
     handler = PostContentHandler(solr, site)
     xml.sax.parse(xml_path, handler)
     handler.commit_all_questions()
@@ -844,7 +920,7 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
 
     # DELETE COMMENTS
     print('[comment] DELETING TEMPORARY COMMENTS DATABASE (they are no longer needed)...')
-    temp_db_conn.commit(close=True)
+    comment_db_sqlhub.threadConnection.commit(close=True)
     comment_db_sqlhub.processConnection.close()
     os.remove(temp_db_path)
     print('[comment] FINISHED DELETING COMMENTS.\n')
diff --git a/python/src/stackdump/default_settings.py b/python/src/stackdump/default_settings.py
index 8484e73..cfa600e 100644
--- a/python/src/stackdump/default_settings.py
+++ b/python/src/stackdump/default_settings.py
@@ -19,7 +19,7 @@ SOLR_URL = 'http://localhost:8983/solr/stackdump/'
 import os
 DATABASE_CONN_STR = 'sqlite:///' + os.path.join(os.path.dirname(__file__), '..', '..', '..', 'data', 'stackdump.sqlite')
 
-TEMP_COMMENTS_DATABASE_DIR = '%s/../../../data' % os.path.dirname(__file__)
+TEMP_COMMENTS_DATABASE_DIR = os.path.join(os.path.dirname(__file__), '..', '..', '..', 'data')
 
 # if the website is hosted under a subpath, specify it here. It must end with a
 # slash.
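
Note: the CommentContentHandler changes above swap per-row SQLObject inserts for batched INSERTs flushed every 10,000 rows via executemany. The snippet below is a standalone sketch of that batching pattern using only the standard-library sqlite3 module; the table name, column names and BATCH_SIZE constant are invented for illustration and are not the schema or thresholds used by this patch.

    import sqlite3

    # illustrative schema only -- the real columns come from the Comment
    # SQLObject class defined in import_site.py
    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE comment (site_id INTEGER, post_id INTEGER, score INTEGER, text TEXT)')

    BATCH_SIZE = 10000   # assumed flush threshold, mirroring row_count % 10000
    SQL = 'INSERT INTO comment (site_id, post_id, score, text) VALUES (?, ?, ?, ?)'
    batch = []

    def flush(batch):
        # one executemany + commit per batch instead of one INSERT per row
        conn.executemany(SQL, batch)
        conn.commit()
        del batch[:]

    for i in range(25000):
        batch.append((1, i, 0, 'comment text %d' % i))
        if len(batch) >= BATCH_SIZE:
            flush(batch)

    # flush the remainder once parsing is done, like the explicit
    # handler.commit_comments_batch() call after xml.sax.parse()
    flush(batch)
    print(conn.execute('SELECT COUNT(*) FROM comment').fetchone()[0])   # 25000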