#!/usr/bin/env python # This script takes extracted site files and inserts them into the database. from __future__ import with_statement import sys import os import xml.sax from datetime import datetime import platform import re from sqlobject import * from pysolr import Solr try: # For Python < 2.6 or people using a newer version of simplejson import simplejson as json except ImportError: # For Python >= 2.6 import json is_jython = 'Java' in platform.system() script_dir = os.path.dirname(sys.argv[0]) # MODELS # use UnicodeCol instead of StringCol; StringCol defaults to ascii when encoding # is unspecified, as it is with Jython zxJDBC. class Site(SQLObject): name = UnicodeCol() desc = UnicodeCol() class Badge(SQLObject): sourceId = IntCol() site = ForeignKey('Site', cascade=True) userId = IntCol() name = UnicodeCol() date = DateTimeCol() class Comment(SQLObject): sourceId = IntCol() site = ForeignKey('Site', cascade=True) postId = IntCol() score = IntCol() text = UnicodeCol() creationDate = DateTimeCol() userId = IntCol() json_fields = [ 'id', 'score', 'text', 'creationDate', 'userId' ] class User(SQLObject): sourceId = IntCol() site = ForeignKey('Site', cascade=True) reputation = IntCol() creationDate = DateTimeCol() displayName = UnicodeCol() emailHash = UnicodeCol() lastAccessDate = DateTimeCol() websiteUrl = UnicodeCol() location = UnicodeCol() age = IntCol() aboutMe = UnicodeCol() views = IntCol() upVotes = IntCol() downVotes = IntCol() # SAX HANDLERS # Jython can't handle the %f format specifier if is_jython: ISO_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S' else: ISO_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f' class BaseContentHandler(xml.sax.ContentHandler): """ Base content handler. """ def __init__(self, site, obj_class): self.site = site self.obj_class = obj_class self.cur_props = None self.row_count = 0 def endElement(self, name): if name != 'row': return if not self.cur_props: return # we want to count failed rows as well as successful ones as this is # a count of rows processed. self.row_count += 1 # the cur_props is now complete. Save it. try: # the object is automatically saved to the database on creation self.obj_class(**self.cur_props) except Exception, e: # could not insert this, so ignore the row print('Exception: ' + str(e)) import traceback traceback.print_exc() print('Could not insert the row ' + repr(self.cur_props)) self.cur_props = None if self.row_count % 1000 == 0: print('[badge]\t\tProcessed %d rows.' % (self.row_count)) class BadgeContentHandler(BaseContentHandler): """ Parses the string - """ def __init__(self, site): BaseContentHandler.__init__(self, site, Badge) def startElement(self, name, attrs): if name != 'row': return try: d = self.cur_props = { 'site' : self.site } # this hack to get the Id attr is needed due to Jython bug #1768 d['sourceId'] = is_jython and int(attrs._attrs.getValue('Id')) or int(attrs['Id']) d['userId'] = int(attrs.get('UserId', 0)) d['name'] = attrs.get('Name', '') d['date'] = datetime.strptime(attrs.get('Date'), ISO_DATE_FORMAT) except Exception, e: # could not parse this, so ignore the row completely self.cur_props = None print('Exception: ' + str(e)) import traceback traceback.print_exc() print('Could not parse the row ' + repr(attrs)) class CommentContentHandler(BaseContentHandler): """ Parses the string - """ def __init__(self, site): BaseContentHandler.__init__(self, site, Comment) def startElement(self, name, attrs): if name != 'row': return try: d = self.cur_props = { 'site' : self.site } # this hack to get the Id attr is needed due to Jython bug #1768 d['sourceId'] = is_jython and int(attrs._attrs.getValue('Id')) or int(attrs['Id']) d['postId'] = int(attrs.get('PostId', 0)) d['score'] = int(attrs.get('Score', 0)) d['text'] = attrs.get('Text', '') d['creationDate'] = datetime.strptime(attrs.get('CreationDate'), ISO_DATE_FORMAT) d['userId'] = int(attrs.get('UserId', 0)) except Exception, e: # could not parse this, so ignore the row completely self.cur_props = None print('Exception: ' + str(e)) import traceback traceback.print_exc() print('Could not parse the row ' + repr(attrs)) class UserContentHandler(BaseContentHandler): """ Parses the string - """ def __init__(self, site): BaseContentHandler.__init__(self, site, User) def startElement(self, name, attrs): if name != 'row': return try: d = self.cur_props = { 'site' : site } # this hack to get the Id attr is needed due to Jython bug #1768 d['sourceId'] = is_jython and int(attrs._attrs.getValue('Id')) or int(attrs['Id']) d['reputation'] = int(attrs.get('Reputation', 0)) d['creationDate'] = datetime.strptime(attrs.get('CreationDate'), ISO_DATE_FORMAT) d['displayName'] = attrs.get('DisplayName', '') d['emailHash'] = attrs.get('EmailHash', '') d['lastAccessDate'] = datetime.strptime(attrs.get('LastAccessDate'), ISO_DATE_FORMAT) d['websiteUrl'] = attrs.get('WebsiteUrl', '') d['location'] = attrs.get('Location', '') d['age'] = int(attrs.get('Age', 0)) d['aboutMe'] = attrs.get('AboutMe', '') d['views'] = int(attrs.get('Views', 0)) d['upVotes'] = int(attrs.get('UpVotes', 0)) d['downVotes'] = int(attrs.get('DownVotes', 0)) except Exception, e: # could not parse this, so ignore the row completely self.cur_props = None print('Exception: ' + str(e)) import traceback traceback.print_exc() print('Could not parse the row ' + repr(attrs)) class PostContentHandler(xml.sax.ContentHandler): """ Parses the string - """ TAGS_RE = re.compile(u'<([\w\d\-]+)>') def __init__(self, site): self.site = site self.unfinished_questions = { } self.cur_props = None self.row_count = 0 def json_default_handler(self, obj): # for date object handling if hasattr(obj, 'isoformat'): return obj.isoformat() else: raise TypeError, 'Object of type %s with value of %s is not JSON serializable' % (type(obj), repr(obj)) def startElement(self, name, attrs): if name != 'row': return try: d = self.cur_props = { } # this hack to get the Id attr is needed due to Jython bug #1768 d['id'] = is_jython and int(attrs._attrs.getValue('Id')) or int(attrs['Id']) if attrs['PostTypeId'] == '2': # I am an answer. d['parentId'] = int(attrs['ParentId']) elif attrs['PostTypeId'] == '1': # I am a question. d['answers'] = [ ] d['answerCount'] = int(attrs.get('AnswerCount', 0)) d['viewCount'] = int(attrs.get('ViewCount', 0)) else: raise ValueError('Unknown PostTypeId [%s] for row ID [%s]' % (attrs.get('PostTypeId', -1), attrs.get('Id', -1))) if 'AcceptedAnswerId' in attrs: d['acceptedAnswerId'] = int(attrs.get('AcceptedAnswerId', 0)) d['creationDate'] = datetime.strptime(attrs.get('CreationDate'), ISO_DATE_FORMAT) d['score'] = int(attrs.get('Score', 0)) d['body'] = attrs.get('Body', '') d['ownerUserId'] = int(attrs.get('OwnerUserId', 0)) if 'LastEditorUserId' in attrs: d['lastEditorUserId'] = int(attrs.get('LastEditorUserId', '')) if 'LastEditDate' in attrs: d['lastEditDate'] = datetime.strptime(attrs.get('LastEditDate'), ISO_DATE_FORMAT) d['lastActivityDate'] = datetime.strptime(attrs.get('LastActivityDate'), ISO_DATE_FORMAT) if 'CommunityOwnedDate' in attrs: d['communityOwnedDate'] = datetime.strptime(attrs.get('CommunityOwnedDate'), ISO_DATE_FORMAT) if 'ClosedDate' in attrs: d['closedDate'] = datetime.strptime(attrs.get('ClosedDate'), ISO_DATE_FORMAT) d['title'] = attrs.get('Title', '') if 'Tags' in attrs: d['tags'] = attrs.get('Tags', '') d['commentCount'] = int(attrs.get('CommentCount', 0)) d['favoriteCount'] = int(attrs.get('FavoriteCount', 0)) d['comments'] = [ ] except Exception, e: # could not parse this, so ignore the row completely self.cur_props = None print('Exception: ' + str(e)) # TODO: enable these in verbose/debug output mode #import traceback #traceback.print_exc() #print('Could not parse the row ' + repr(dict([(k,attrs[k]) for k in attrs.getNames()]))) def endElement(self, name): if name != 'row': return if not self.cur_props: return # we want to count failed rows as well as successful ones as this is # a count of rows processed. self.row_count += 1 try: d = self.cur_props # find, convert to JSON and attach any comments for this question comments = Comment.select(AND(Comment.q.site == self.site, Comment.q.postId == int(d['id']))) for comment in comments: c = { } for f in Comment.json_fields: c[f] = getattr(comment, f) d['comments'].append(c) if len(d['comments']) != d['commentCount']: print('Post ID [%s] expected to have %d comments, but got %d instead. Ignoring inconsistency.' % (d['id'], d['commentCount'], len(d['comments']))) # the cur_props is now complete. Stash it away until question is complete. if d.has_key('parentId'): # this is an answer. if not self.unfinished_questions.has_key(d['parentId']): print('lookup keys: ' + repr(self.unfinished_questions.keys())) raise ValueError("This answer's [ID# %s] question [ID# %s] has not been processed yet. Incorrect order in XML? Ignoring answer." % (d['id'], d['parentId'])) else: self.unfinished_questions[d['parentId']]['answers'].append(d) else: # this is a question. if self.unfinished_questions.has_key(d['id']): # this should not occur; duplicate question id. raise ValueError('Question ID [%s] already exists.\nThis title: %s\nDuplicate title:%s\nIgnoring duplicate.' % (d['id'], d['title'], self.unfinished_questions[d['id']]['title'])) else: self.unfinished_questions[d['id']] = d except Exception, e: # could not insert this, so ignore the row print('Exception: ' + str(e)) import traceback traceback.print_exc() print('Could not process the row ' + repr(self.cur_props)) self.cur_props = None if self.row_count % 1000 == 0: print('\tProcessed %d rows.' % (self.row_count)) self.commit_finished_questions() def commit_finished_questions(self): # check if any questions are now complete (answerCount=len(answers)) finished_question_ids = [ ] for id, q in self.unfinished_questions.items(): if len(q['answers']) >= q['answerCount']: if len(q['answers']) > q['answerCount']: print('Question ID [%s] expected to have %d answers, but got %d instead. Ignoring inconsistency.' % (q['id'], q['answerCount'], len(q['answers']))) try: # question is complete, store it. self.commit_question(q) except Exception, e: # could not serialise and insert this question, so ignore it print('Exception: ' + str(e)) import traceback traceback.print_exc() print('Could not process the completed question ' + repr(q)) finally: finished_question_ids.append(id) # remove any finished questions from the unfinished list for id in finished_question_ids: self.unfinished_questions.pop(id) def commit_question(self, q): """ Massages and serialises the question object so it can be inserted into the search index in the form that we want. Things this does - * creates the 'text' field for the search index that contains all the text of the question (title, question, answers and all comments). * serialises answers to JSON * creates dict that maps to the search index document schema * remove unwanted attributes from the q object and serialise question to JSON * add question JSON to document * commit document to search index. """ doc = { } # create the text field contents search_text = [ ] # question bits search_text.append(q['title']) search_text.append(q['body']) for c in q['comments']: search_text.append(c['text']) # answer bits for a in q['answers']: search_text.append(a['body']) for c in a['comments']: search_text.append(c['text']) search_text = ' '.join(search_text) doc['text'] = search_text # serialise answers to JSON doc['answer-json'] = [ json.dumps(a, default=self.json_default_handler) for a in q['answers'] ] # map other fields to search index doc doc['id'] = str(q['id']) doc['siteName'] = self.site.name doc['creationDate'] = q['creationDate'] doc['score'] = q['score'] doc['viewCount'] = q['viewCount'] doc['title'] = q['title'] doc['ownerUserId'] = q['ownerUserId'] if 'lastEditorUserId' in q: doc['lastEditorUserId'] = q['lastEditorUserId'] doc['lastActivityDate'] = q['lastActivityDate'] if 'communityOwnedDate' in q: doc['communityOwnedDate'] = q['communityOwnedDate'] if 'closedDate' in q: doc['closedDate'] = q['closedDate'] if 'tags' in q: # parse tags into a list doc['tags'] = PostContentHandler.TAGS_RE.findall(q['tags']) # serialise question to JSON (the q object has cruft we don't want) question_obj = { } question_obj['id'] = q['id'] if 'acceptedAnswerId' in q: question_obj['acceptedAnswerId'] = q['acceptedAnswerId'] question_obj['creationDate'] = q['creationDate'] question_obj['score'] = q['score'] question_obj['viewCount'] = q['viewCount'] question_obj['body'] = q['body'] question_obj['ownerUserId'] = q['ownerUserId'] if 'lastEditorUserId' in q: question_obj['lastEditorUserId'] = q['lastEditorUserId'] if 'LastEditDate' in q: question_obj['lastEditDate'] = q['lastEditDate'] question_obj['lastActivityDate'] = q['lastActivityDate'] if 'communityOwnedDate' in q: question_obj['communityOwnedDate'] = q['communityOwnedDate'] if 'closedDate' in q: question_obj['closedDate'] = q['closedDate'] question_obj['title'] = q['title'] if 'tags' in q: question_obj['tags'] = q['tags'] question_obj['favoriteCount'] = q['favoriteCount'] question_obj['comments'] = q['comments'] doc['question-json'] = json.dumps(question_obj, default=self.json_default_handler) solr.add([ doc ]) def commit_all_questions(self): """ Commits all questions, regardless of whether they're completed or not. Should be called after all XML has been parsed. """ for id,q in self.unfinished_questions.items(): print('Question [ID# %d] was expected to have %d answers, but got %d instead. Ignoring inconsistency.' % (q['id'], q['answerCount'], len(q['answers']))) try: # question is complete, store it. self.commit_question(q) except Exception, e: # could not serialise and insert this question, so ignore it print('Exception: ' + str(e)) import traceback traceback.print_exc() print('Could not process the question ' + repr(q)) # we're committing all questions, so nothing is now unfinished self.unfinished_questions.clear() # MAIN METHOD if len(sys.argv) != 2: print('One argument is expected - the path to the extracted XML files.') sys.exit(1) xml_root = sys.argv[1] print('Using the XML root path: ' + xml_root + '\n') if not os.path.exists(xml_root): print('The given XML root path does not exist.') sys.exit(1) db_path = os.path.abspath(os.path.join(script_dir, '../../data/stackdump.sqlite')) # connect to the database print('Connecting to the database...') if is_jython: conn_str = 'jython_sqlite://' + db_path else: # assume cPython conn_str = 'sqlite://' + db_path sqlhub.processConnection = connectionForURI(conn_str) #sqlhub.processConnection = connectionForURI('jython_sqlite://:memory:') print('Connected.\n') # connect to solr print('Connecting to solr...') solr = Solr("http://localhost:8983/solr/") print('Connected.\n') # ensure required tables exist print("Creating tables if they don't exist...") Site.createTable(ifNotExists=True) Badge.createTable(ifNotExists=True) Comment.createTable(ifNotExists=True) User.createTable(ifNotExists=True) print('Created.\n') # SITE NAME # get the site name from the first line of readme.txt. This could be fragile. with open(os.path.join(xml_root, 'readme.txt')) as f: site_desc = f.readline().strip() # assume if there's a colon in the name, the name part is before, and the date # part is after. if ':' in site_desc: site_name, site_date = site_desc.split(':') else: site_name = site_desc site_date = '' print('Site name is %s\n' % site_name) # check if site is already in database; if so, purge the data. sites = Site.select(Site.q.name==site_name) # the site really shouldn't exist more than once, but just in case for site in sites: print('Deleting site "%s" from the database... ' % site.desc) sys.stdout.flush() Site.delete(site.id) # the relationship cascades, so other rows will be deleted print('Deleted.\n') print('Deleting site "%s" from the solr... ' % site_desc) solr.delete(q='siteName:"%s"' % site_name) print('Deleted.\n') # create a new Site site = Site(name=site_name, desc=site_desc) # BADGES print('[badge] PARSING BADGES...') sqlhub.threadConnection = sqlhub.processConnection.transaction() xml_path = os.path.join(xml_root, 'badges.xml') print('[badge] start parsing badges.xml...') handler = BadgeContentHandler(site) xml.sax.parse(xml_path, handler) sqlhub.threadConnection.commit(close=True) print('[badge]\t\tProcessed %d rows.' % (handler.row_count)) print('[badge] FINISHED PARSING BADGES.\n') # COMMENTS print('[comment] PARSING COMMENTS...') sqlhub.threadConnection = sqlhub.processConnection.transaction() xml_path = os.path.join(xml_root, 'comments.xml') print('[comment] start parsing comments.xml...') handler = CommentContentHandler(site) xml.sax.parse(xml_path, handler) sqlhub.threadConnection.commit(close=True) print('[comment]\tProcessed %d rows.' % (handler.row_count)) print('[comment] FINISHED PARSING COMMENTS.\n') # USERS print('[user] PARSING USERS...') sqlhub.threadConnection = sqlhub.processConnection.transaction() xml_path = os.path.join(xml_root, 'users.xml') print('[user] start parsing users.xml...') handler = UserContentHandler(site) xml.sax.parse(xml_path, handler) sqlhub.threadConnection.commit(close=True) print('[user]\t\tProcessed %d rows.' % (handler.row_count)) print('[user] FINISHED PARSING USERS.\n') # POSTS print('[post] PARSING POSTS...') sqlhub.threadConnection = sqlhub.processConnection.transaction() xml_path = os.path.join(xml_root, 'posts.xml') print('[post] start parsing posts.xml...') handler = PostContentHandler(site) xml.sax.parse(xml_path, handler) handler.commit_all_questions() sqlhub.threadConnection.commit(close=True) print('[post]\tProcessed %d rows.' % (handler.row_count)) print('[post] FINISHED PARSING POSTS.\n') # TODO: delete comments?