1
0
mirror of https://github.com/djohnlewis/stackdump synced 2025-04-07 18:23:27 +00:00

Merge import-perf-improvements branch to default.

This commit is contained in:
Samuel Lai 2013-11-29 13:01:41 +11:00
commit a597b2e588
5 changed files with 908 additions and 866 deletions

@ -246,6 +246,11 @@ class Solr(object):
Optionally accepts ``timeout`` for wait seconds until giving up on a Optionally accepts ``timeout`` for wait seconds until giving up on a
request. Default is ``60`` seconds. request. Default is ``60`` seconds.
Optionally accepts ``assume_clean`` to skip cleaning request of invalid XML
characters. This offers a slight performance improvement, but only set this
to ``True`` if you know your request is clean (e.g. coming from other XML
data). Bad things will happen otherwise. Default is ``False``.
Usage:: Usage::
solr = pysolr.Solr('http://localhost:8983/solr') solr = pysolr.Solr('http://localhost:8983/solr')
@ -253,10 +258,11 @@ class Solr(object):
solr = pysolr.Solr('http://localhost:8983/solr', timeout=10) solr = pysolr.Solr('http://localhost:8983/solr', timeout=10)
""" """
def __init__(self, url, decoder=None, timeout=60): def __init__(self, url, decoder=None, timeout=60, assume_clean=False):
self.decoder = decoder or json.JSONDecoder() self.decoder = decoder or json.JSONDecoder()
self.url = url self.url = url
self.timeout = timeout self.timeout = timeout
self.assume_clean = assume_clean
self.log = self._get_log() self.log = self._get_log()
self.session = requests.Session() self.session = requests.Session()
self.session.stream = False self.session.stream = False
@ -506,7 +512,10 @@ class Solr(object):
value = "{0}".format(value) value = "{0}".format(value)
return clean_xml_string(value) if self.assume_clean:
return value
else:
return clean_xml_string(value)
def _to_python(self, value): def _to_python(self, value):
""" """

@ -26,7 +26,7 @@ import html5lib
from html5lib.filters._base import Filter as HTML5LibFilterBase from html5lib.filters._base import Filter as HTML5LibFilterBase
import markdown import markdown
from stackdump.models import Site, Badge, Comment, User from stackdump.models import Site, Badge, User
from stackdump import settings from stackdump import settings
# STATIC VARIABLES # STATIC VARIABLES

@ -12,15 +12,17 @@ from datetime import datetime
import re import re
import urllib2 import urllib2
import socket import socket
import tempfile
from optparse import OptionParser from optparse import OptionParser
from xml.etree import ElementTree from xml.etree import ElementTree
from sqlobject import sqlhub, connectionForURI, AND, OR, IN, SQLObject from sqlobject import sqlhub, connectionForURI, AND, IN, SQLObject, \
UnicodeCol, DateTimeCol, IntCol, DatabaseIndex, dbconnection
from sqlobject.sqlbuilder import Delete, Insert from sqlobject.sqlbuilder import Delete, Insert
from sqlobject.styles import DefaultStyle from sqlobject.styles import DefaultStyle
from pysolr import Solr from pysolr import Solr
from stackdump.models import Site, Badge, Comment, User from stackdump.models import Site, Badge, User
from stackdump import settings from stackdump import settings
try: try:
@ -108,7 +110,7 @@ class BadgeContentHandler(BaseContentHandler):
d['sourceId'] = int(attrs['Id']) d['sourceId'] = int(attrs['Id'])
d['userId'] = int(attrs.get('UserId', 0)) d['userId'] = int(attrs.get('UserId', 0))
d['name'] = attrs.get('Name', '') d['name'] = attrs.get('Name', '')
d['date'] = datetime.strptime(attrs.get('Date'), ISO_DATE_FORMAT) d['date'] = attrs.get('Date')
except Exception, e: except Exception, e:
# could not parse this, so ignore the row completely # could not parse this, so ignore the row completely
self.cur_props = None self.cur_props = None
@ -135,12 +137,12 @@ class CommentContentHandler(BaseContentHandler):
return return
try: try:
d = self.cur_props = { 'site' : self.site } d = self.cur_props = { 'siteId' : self.site.id }
d['sourceId'] = int(attrs['Id']) d['sourceId'] = int(attrs['Id'])
d['postId'] = int(attrs.get('PostId', 0)) d['postId'] = int(attrs.get('PostId', 0))
d['score'] = int(attrs.get('Score', 0)) d['score'] = int(attrs.get('Score', 0))
d['text'] = attrs.get('Text', '') d['text'] = attrs.get('Text', '')
d['creationDate'] = datetime.strptime(attrs.get('CreationDate'), ISO_DATE_FORMAT) d['creationDate'] = attrs.get('CreationDate')
d['userId'] = int(attrs.get('UserId', 0)) d['userId'] = int(attrs.get('UserId', 0))
except Exception, e: except Exception, e:
@ -181,10 +183,10 @@ class UserContentHandler(BaseContentHandler):
d = self.cur_props = { 'site' : self.site } d = self.cur_props = { 'site' : self.site }
d['sourceId'] = int(attrs['Id']) d['sourceId'] = int(attrs['Id'])
d['reputation'] = int(attrs.get('Reputation', 0)) d['reputation'] = int(attrs.get('Reputation', 0))
d['creationDate'] = datetime.strptime(attrs.get('CreationDate'), ISO_DATE_FORMAT) d['creationDate'] = attrs.get('CreationDate')
d['displayName'] = attrs.get('DisplayName', '') d['displayName'] = attrs.get('DisplayName', '')
d['emailHash'] = attrs.get('EmailHash', '') d['emailHash'] = attrs.get('EmailHash', '')
d['lastAccessDate'] = datetime.strptime(attrs.get('LastAccessDate'), ISO_DATE_FORMAT) d['lastAccessDate'] = attrs.get('LastAccessDate')
d['websiteUrl'] = attrs.get('WebsiteUrl', '') d['websiteUrl'] = attrs.get('WebsiteUrl', '')
d['location'] = attrs.get('Location', '') d['location'] = attrs.get('Location', '')
d['age'] = int(attrs.get('Age', 0)) d['age'] = int(attrs.get('Age', 0))
@ -342,8 +344,9 @@ class PostContentHandler(xml.sax.ContentHandler):
if self.row_count % 1000 == 0: if self.row_count % 1000 == 0:
print('%-10s Processed %d rows.' % ('[post]', self.row_count)) print('%-10s Processed %d rows.' % ('[post]', self.row_count))
# only check for finished questions every 1000 rows to speed things up # only check for finished questions every 10000 rows to speed things up
if self.row_count % 1000 == 0: if self.row_count % 10000 == 0:
print('Committing completed questions...')
self.commit_finished_questions() self.commit_finished_questions()
def commit_finished_questions(self): def commit_finished_questions(self):
@ -400,7 +403,7 @@ class PostContentHandler(xml.sax.ContentHandler):
post_ids.add(a['id']) post_ids.add(a['id'])
# get the comments # get the comments
comment_objs = Comment.select(AND(Comment.q.site == self.site, comment_objs = Comment.select(AND(Comment.q.siteId == self.site.id,
IN(Comment.q.postId, list(post_ids)))) IN(Comment.q.postId, list(post_ids))))
# sort the comments out into a dict keyed on the post id # sort the comments out into a dict keyed on the post id
@ -514,7 +517,10 @@ class PostContentHandler(xml.sax.ContentHandler):
def commit_questions(self, questions, commit=True): def commit_questions(self, questions, commit=True):
""" """
Commits the given list of questions to solr. Adds the given list of questions to solr.
By default, they are committed immediately. Set the ``commit`` argument
to False to disable this behaviour.
""" """
self.solr.add(questions, commit=commit) self.solr.add(questions, commit=commit)
@ -551,6 +557,25 @@ class PostContentHandler(xml.sax.ContentHandler):
for question_id, answers in self.orphan_answers.items(): for question_id, answers in self.orphan_answers.items():
print('There are %d answers for missing question [ID# %d]. Ignoring orphan answers.' % (len(answers), question_id)) print('There are %d answers for missing question [ID# %d]. Ignoring orphan answers.' % (len(answers), question_id))
# TEMP COMMENT DATABASE DEFINITION
comment_db_sqlhub = dbconnection.ConnectionHub()
class Comment(SQLObject):
sourceId = IntCol()
siteId = IntCol()
postId = IntCol()
score = IntCol()
text = UnicodeCol()
creationDate = DateTimeCol(datetimeFormat=ISO_DATE_FORMAT)
userId = IntCol()
siteId_postId_index = DatabaseIndex(siteId, postId)
_connection = comment_db_sqlhub
json_fields = [ 'id', 'score', 'text', 'creationDate', 'userId' ]
# METHODS # METHODS
def get_file_path(dir_path, filename): def get_file_path(dir_path, filename):
""" """
@ -593,14 +618,14 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
sys.exit(1) sys.exit(1)
# connect to the database # connect to the database
print('Connecting to the database...') print('Connecting to the Stackdump database...')
conn_str = settings.DATABASE_CONN_STR conn_str = settings.DATABASE_CONN_STR
sqlhub.processConnection = connectionForURI(conn_str) sqlhub.processConnection = connectionForURI(conn_str)
print('Connected.\n') print('Connected.\n')
# connect to solr # connect to solr
print('Connecting to solr...') print('Connecting to solr...')
solr = Solr(settings.SOLR_URL) solr = Solr(settings.SOLR_URL, assume_clean=True)
# pysolr doesn't try to connect until a request is made, so we'll make a ping request # pysolr doesn't try to connect until a request is made, so we'll make a ping request
try: try:
solr._send_request('GET', 'admin/ping') solr._send_request('GET', 'admin/ping')
@ -614,7 +639,6 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
print("Creating tables if they don't exist...") print("Creating tables if they don't exist...")
Site.createTable(ifNotExists=True) Site.createTable(ifNotExists=True)
Badge.createTable(ifNotExists=True) Badge.createTable(ifNotExists=True)
Comment.createTable(ifNotExists=True)
User.createTable(ifNotExists=True) User.createTable(ifNotExists=True)
print('Created.\n') print('Created.\n')
@ -742,8 +766,6 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
sqlhub.threadConnection = sqlhub.processConnection.transaction() sqlhub.threadConnection = sqlhub.processConnection.transaction()
conn = sqlhub.threadConnection conn = sqlhub.threadConnection
# these deletions are done in this order to avoid FK constraint issues # these deletions are done in this order to avoid FK constraint issues
print('\tDeleting comments...')
conn.query(conn.sqlrepr(Delete(Comment.sqlmeta.table, where=(Comment.q.site==site))))
print('\tDeleting badges...') print('\tDeleting badges...')
conn.query(conn.sqlrepr(Delete(Badge.sqlmeta.table, where=(Badge.q.site==site)))) conn.query(conn.sqlrepr(Delete(Badge.sqlmeta.table, where=(Badge.q.site==site))))
print('\tDeleting users...') print('\tDeleting users...')
@ -758,11 +780,26 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
solr.commit(expungeDeletes=True) solr.commit(expungeDeletes=True)
print('Deleted.\n') print('Deleted.\n')
# create the temporary comments database
print('Connecting to the temporary comments database...')
temp_db_file, temp_db_path = tempfile.mkstemp('.sqlite', 'temp_comment_db-' + re.sub(r'[^\w]', '_', site_key) + '-', settings.TEMP_COMMENTS_DATABASE_DIR)
os.close(temp_db_file)
conn_str = 'sqlite:///' + temp_db_path
comment_db_sqlhub.processConnection = connectionForURI(conn_str)
print('Connected.')
Comment.createTable()
print('Schema created.')
comment_db_sqlhub.processConnection.getConnection().execute('PRAGMA synchronous = OFF')
comment_db_sqlhub.processConnection.getConnection().execute('PRAGMA journal_mode = MEMORY')
print('Pragma configured.\n')
timing_start = time.time() timing_start = time.time()
# start a new transaction # start a new transaction
sqlhub.threadConnection = sqlhub.processConnection.transaction() sqlhub.threadConnection = sqlhub.processConnection.transaction()
conn = sqlhub.threadConnection conn = sqlhub.threadConnection
comment_db_sqlhub.threadConnection = comment_db_sqlhub.processConnection.transaction()
temp_db_conn = comment_db_sqlhub.threadConnection
# create a new Site # create a new Site
site = Site(name=site_name, desc=site_desc, key=site_key, dump_date=dump_date, site = Site(name=site_name, desc=site_desc, key=site_key, dump_date=dump_date,
@ -785,7 +822,7 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
print('[comment] PARSING COMMENTS...') print('[comment] PARSING COMMENTS...')
xml_path = get_file_path(xml_root, 'comments.xml') xml_path = get_file_path(xml_root, 'comments.xml')
print('[comment] start parsing comments.xml...') print('[comment] start parsing comments.xml...')
handler = CommentContentHandler(conn, site) handler = CommentContentHandler(temp_db_conn, site)
xml.sax.parse(xml_path, handler) xml.sax.parse(xml_path, handler)
print('%-10s Processed %d rows.' % ('[comment]', handler.row_count)) print('%-10s Processed %d rows.' % ('[comment]', handler.row_count))
print('[comment] FINISHED PARSING COMMENTS.\n') print('[comment] FINISHED PARSING COMMENTS.\n')
@ -812,8 +849,10 @@ def import_site(xml_root, site_name, dump_date, site_desc, site_key,
print('[post] FINISHED PARSING POSTS.\n') print('[post] FINISHED PARSING POSTS.\n')
# DELETE COMMENTS # DELETE COMMENTS
print('[comment] DELETING COMMENTS FROM DATABASE (they are no longer needed)...') print('[comment] DELETING TEMPORARY COMMENTS DATABASE (they are no longer needed)...')
conn.query(conn.sqlrepr(Delete(Comment.sqlmeta.table, where=(Comment.q.site == site)))) temp_db_conn.commit(close=True)
comment_db_sqlhub.processConnection.close()
os.remove(temp_db_path)
print('[comment] FINISHED DELETING COMMENTS.\n') print('[comment] FINISHED DELETING COMMENTS.\n')
# commit transaction # commit transaction

@ -19,6 +19,7 @@ SOLR_URL = 'http://localhost:8983/solr/stackdump/'
import os import os
DATABASE_CONN_STR = 'sqlite:///' + os.path.join(os.path.dirname(__file__), '..', '..', '..', 'data', 'stackdump.sqlite') DATABASE_CONN_STR = 'sqlite:///' + os.path.join(os.path.dirname(__file__), '..', '..', '..', 'data', 'stackdump.sqlite')
TEMP_COMMENTS_DATABASE_DIR = os.path.join(os.path.dirname(__file__), '..', '..', '..', 'data')
# if the website is hosted under a subpath, specify it here. It must end with a # if the website is hosted under a subpath, specify it here. It must end with a
# slash. # slash.

@ -5,6 +5,10 @@
from sqlobject import SQLObject, UnicodeCol, DateTimeCol, IntCol, ForeignKey, \ from sqlobject import SQLObject, UnicodeCol, DateTimeCol, IntCol, ForeignKey, \
DatabaseIndex DatabaseIndex
ISO_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
class Site(SQLObject): class Site(SQLObject):
name = UnicodeCol() name = UnicodeCol()
desc = UnicodeCol() desc = UnicodeCol()
@ -15,34 +19,23 @@ class Site(SQLObject):
siteKey_index = DatabaseIndex(key, unique=True) siteKey_index = DatabaseIndex(key, unique=True)
class Badge(SQLObject): class Badge(SQLObject):
sourceId = IntCol() sourceId = IntCol()
site = ForeignKey('Site', cascade=True) site = ForeignKey('Site', cascade=True)
userId = IntCol() userId = IntCol()
name = UnicodeCol() name = UnicodeCol()
date = DateTimeCol() date = DateTimeCol(datetimeFormat=ISO_DATE_FORMAT)
class Comment(SQLObject):
sourceId = IntCol()
site = ForeignKey('Site', cascade=True)
postId = IntCol()
score = IntCol()
text = UnicodeCol()
creationDate = DateTimeCol()
userId = IntCol()
siteId_postId_index = DatabaseIndex(site, postId)
json_fields = [ 'id', 'score', 'text', 'creationDate', 'userId' ]
class User(SQLObject): class User(SQLObject):
sourceId = IntCol() sourceId = IntCol()
site = ForeignKey('Site', cascade=True) site = ForeignKey('Site', cascade=True)
reputation = IntCol() reputation = IntCol()
creationDate = DateTimeCol() creationDate = DateTimeCol(datetimeFormat=ISO_DATE_FORMAT)
displayName = UnicodeCol() displayName = UnicodeCol()
emailHash = UnicodeCol() emailHash = UnicodeCol()
lastAccessDate = DateTimeCol() lastAccessDate = DateTimeCol(datetimeFormat=ISO_DATE_FORMAT)
websiteUrl = UnicodeCol() websiteUrl = UnicodeCol()
location = UnicodeCol() location = UnicodeCol()
age = IntCol() age = IntCol()