mirror of
https://github.com/djohnlewis/stackdump
synced 2025-01-22 14:41:39 +00:00
Renamed dataproc management commands to better names.
This commit is contained in:
parent
f075580a2e
commit
adccd41724
@ -1,47 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# This script deletes the site specified by the ID in the first parameter.
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from sqlobject import sqlhub, connectionForURI, AND, OR
|
||||
from pysolr import Solr
|
||||
|
||||
from stackdump.models import Site
|
||||
|
||||
script_dir = os.path.dirname(sys.argv[0])
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
print 'The site ID needs to be specified as the first parameter.'
|
||||
sys.exit(1)
|
||||
|
||||
# connect to the data sources
|
||||
db_path = os.path.abspath(os.path.join(script_dir, '../../../../data/stackdump.sqlite'))
|
||||
|
||||
# connect to the database
|
||||
print('Connecting to the database...')
|
||||
conn_str = 'sqlite://' + db_path
|
||||
sqlhub.processConnection = connectionForURI(conn_str)
|
||||
print('Connected.\n')
|
||||
|
||||
# connect to solr
|
||||
print('Connecting to solr...')
|
||||
solr = Solr("http://localhost:8983/solr/")
|
||||
print('Connected.\n')
|
||||
|
||||
site_id = int(sys.argv[1])
|
||||
site = Site.select(Site.q.id==site_id).getOne(None)
|
||||
if not site:
|
||||
print 'Site ID %d does not exist.' % site_id
|
||||
sys.exit(1)
|
||||
|
||||
site_name = site.name
|
||||
print('Deleting site "%s" from the database... ' % site.name)
|
||||
sys.stdout.flush()
|
||||
Site.delete(site.id) # the relationship cascades, so other rows will be deleted
|
||||
print('Deleted.\n')
|
||||
|
||||
print('Deleting site "%s" from solr... ' % site_name)
|
||||
solr.delete(q='siteName:"%s"' % site_name)
|
||||
print('Deleted.\n')
|
@ -6,13 +6,16 @@ from __future__ import with_statement
|
||||
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import xml.sax
|
||||
from datetime import datetime
|
||||
import re
|
||||
from optparse import OptionParser
|
||||
from xml.etree import ElementTree
|
||||
|
||||
from sqlobject import sqlhub, connectionForURI, AND, OR
|
||||
from sqlobject import sqlhub, connectionForURI, AND, OR, IN, SQLObject
|
||||
from sqlobject.sqlbuilder import Delete, Insert
|
||||
from sqlobject.styles import DefaultStyle
|
||||
from pysolr import Solr
|
||||
|
||||
from stackdump.models import Site, Badge, Comment, User
|
||||
@ -38,6 +41,7 @@ class BaseContentHandler(xml.sax.ContentHandler):
|
||||
self.obj_class = obj_class
|
||||
self.cur_props = None
|
||||
self.row_count = 0
|
||||
self.db_style = DefaultStyle()
|
||||
|
||||
def endElement(self, name):
|
||||
if name != 'row':
|
||||
@ -53,7 +57,21 @@ class BaseContentHandler(xml.sax.ContentHandler):
|
||||
# the cur_props is now complete. Save it.
|
||||
try:
|
||||
# the object is automatically saved to the database on creation
|
||||
self.obj_class(**self.cur_props)
|
||||
# adding records using the SQLObject object takes too long
|
||||
#self.obj_class(**self.cur_props)
|
||||
|
||||
# so we're going to go closer to the metal
|
||||
props_for_db = { }
|
||||
for k,v in self.cur_props.items():
|
||||
# if this is a reference to a FK, massage the values to fit
|
||||
if isinstance(v, SQLObject):
|
||||
k += 'Id'
|
||||
v = v.id
|
||||
# need to convert the attr names to DB column names
|
||||
props_for_db[self.db_style.pythonAttrToDBColumn(k)] = v
|
||||
|
||||
conn.query(conn.sqlrepr(Insert(self.obj_class.sqlmeta.table, values=props_for_db)))
|
||||
|
||||
except Exception, e:
|
||||
# could not insert this, so ignore the row
|
||||
print('Exception: ' + str(e))
|
||||
@ -64,7 +82,9 @@ class BaseContentHandler(xml.sax.ContentHandler):
|
||||
self.cur_props = None
|
||||
|
||||
if self.row_count % 1000 == 0:
|
||||
print('[badge]\t\tProcessed %d rows.' % (self.row_count))
|
||||
print('%-10s Processed %d rows.' % ('[%s]' % self.obj_class.sqlmeta.table,
|
||||
self.row_count)
|
||||
)
|
||||
|
||||
class BadgeContentHandler(BaseContentHandler):
|
||||
"""
|
||||
@ -206,6 +226,7 @@ class PostContentHandler(xml.sax.ContentHandler):
|
||||
def __init__(self, site):
|
||||
self.site = site
|
||||
self.unfinished_questions = { }
|
||||
self.orphan_answers = { }
|
||||
self.cur_props = None
|
||||
self.row_count = 0
|
||||
|
||||
@ -233,7 +254,7 @@ class PostContentHandler(xml.sax.ContentHandler):
|
||||
d['answerCount'] = int(attrs.get('AnswerCount', 0))
|
||||
d['viewCount'] = int(attrs.get('ViewCount', 0))
|
||||
else:
|
||||
raise ValueError('Unknown PostTypeId [%s] for row ID [%s]' % (attrs.get('PostTypeId', -1), attrs.get('Id', -1)))
|
||||
raise ValueError('Unknown PostTypeId [%s] for row ID [%s]. Probably a tag wiki page.' % (attrs.get('PostTypeId', -1), attrs.get('Id', -1)))
|
||||
|
||||
if 'AcceptedAnswerId' in attrs:
|
||||
d['acceptedAnswerId'] = int(attrs.get('AcceptedAnswerId', 0))
|
||||
@ -281,24 +302,13 @@ class PostContentHandler(xml.sax.ContentHandler):
|
||||
try:
|
||||
d = self.cur_props
|
||||
|
||||
# find, convert to JSON and attach any comments for this question
|
||||
comments = Comment.select(AND(Comment.q.site == self.site,
|
||||
Comment.q.postId == int(d['id'])))
|
||||
for comment in comments:
|
||||
c = { }
|
||||
for f in Comment.json_fields:
|
||||
c[f] = getattr(comment, f)
|
||||
d['comments'].append(c)
|
||||
|
||||
if len(d['comments']) != d['commentCount']:
|
||||
print('Post ID [%s] expected to have %d comments, but got %d instead. Ignoring inconsistency.' % (d['id'], d['commentCount'], len(d['comments'])))
|
||||
|
||||
# the cur_props is now complete. Stash it away until question is complete.
|
||||
if d.has_key('parentId'):
|
||||
# this is an answer.
|
||||
if not self.unfinished_questions.has_key(d['parentId']):
|
||||
print('lookup keys: ' + repr(self.unfinished_questions.keys()))
|
||||
raise ValueError("This answer's [ID# %s] question [ID# %s] has not been processed yet. Incorrect order in XML? Ignoring answer." % (d['id'], d['parentId']))
|
||||
if not self.orphan_answers.has_key(d['parentId']):
|
||||
self.orphan_answers[d['parentId']] = [ ]
|
||||
self.orphan_answers[d['parentId']].append(d)
|
||||
else:
|
||||
self.unfinished_questions[d['parentId']]['answers'].append(d)
|
||||
else:
|
||||
@ -309,6 +319,11 @@ class PostContentHandler(xml.sax.ContentHandler):
|
||||
(d['id'], d['title'], self.unfinished_questions[d['id']]['title']))
|
||||
else:
|
||||
self.unfinished_questions[d['id']] = d
|
||||
# check if any of the orphan answers are for this question
|
||||
if self.orphan_answers.has_key(d['id']):
|
||||
d['answers'].extend(self.orphan_answers[d['id']])
|
||||
# remove orphan answers from the orphan list
|
||||
del self.orphan_answers[d['id']]
|
||||
|
||||
except Exception, e:
|
||||
# could not insert this, so ignore the row
|
||||
@ -320,9 +335,11 @@ class PostContentHandler(xml.sax.ContentHandler):
|
||||
self.cur_props = None
|
||||
|
||||
if self.row_count % 1000 == 0:
|
||||
print('\tProcessed %d rows.' % (self.row_count))
|
||||
print('%-10s Processed %d rows.' % ('[post]', self.row_count))
|
||||
|
||||
self.commit_finished_questions()
|
||||
# only check for finished questions every 200 rows to speed things up
|
||||
if self.row_count % 200 == 0:
|
||||
self.commit_finished_questions()
|
||||
|
||||
def commit_finished_questions(self):
|
||||
# check if any questions are now complete (answerCount=len(answers))
|
||||
@ -356,6 +373,7 @@ class PostContentHandler(xml.sax.ContentHandler):
|
||||
the search index in the form that we want.
|
||||
|
||||
Things this does -
|
||||
* fetch comments for question and answers and attach them to the objects
|
||||
* creates the 'text' field for the search index that contains all the
|
||||
text of the question (title, question, answers and all comments).
|
||||
* serialises answers to JSON
|
||||
@ -365,6 +383,46 @@ class PostContentHandler(xml.sax.ContentHandler):
|
||||
* add question JSON to document
|
||||
* commit document to search index.
|
||||
"""
|
||||
# find and attach any comments for this question and its answers
|
||||
# get the set of post ids
|
||||
post_ids = set()
|
||||
post_ids.add(q['id'])
|
||||
for a in q['answers']:
|
||||
post_ids.add(a['id'])
|
||||
|
||||
# get the comments
|
||||
comment_objs = Comment.select(AND(Comment.q.site == self.site,
|
||||
IN(Comment.q.postId, list(post_ids))))
|
||||
|
||||
# sort the comments out into a dict keyed on the post id
|
||||
comments = { }
|
||||
for c in comment_objs:
|
||||
# convert comment object to a JSON-serialisable object
|
||||
comment_json = { }
|
||||
for f in Comment.json_fields:
|
||||
comment_json[f] = getattr(c, f)
|
||||
|
||||
# we already know that this comment comes from the current site, so
|
||||
# we only need to filter on post ID
|
||||
if not comments.has_key(c.postId):
|
||||
comments[c.postId] = [ ]
|
||||
comments[c.postId].append(comment_json)
|
||||
|
||||
# add comments to the question
|
||||
if comments.has_key(q['id']):
|
||||
q['comments'].extend(comments[q['id']])
|
||||
|
||||
if len(q['comments']) != q['commentCount']:
|
||||
print('Post ID [%s] expected to have %d comments, but got %d instead. Ignoring inconsistency.' % (q['id'], q['commentCount'], len(q['comments'])))
|
||||
|
||||
# add comments to the answers
|
||||
for a in q['answers']:
|
||||
if comments.has_key(a['id']):
|
||||
a['comments'].extend(comments[a['id']])
|
||||
|
||||
if len(a['comments']) != a['commentCount']:
|
||||
print('Post ID [%s] expected to have %d comments, but got %d instead. Ignoring inconsistency.' % (a['id'], a['commentCount'], len(a['comments'])))
|
||||
|
||||
doc = { }
|
||||
|
||||
# create the text field contents
|
||||
@ -442,6 +500,8 @@ class PostContentHandler(xml.sax.ContentHandler):
|
||||
|
||||
Should be called after all XML has been parsed.
|
||||
"""
|
||||
self.commit_finished_questions()
|
||||
|
||||
for id,q in self.unfinished_questions.items():
|
||||
print('Question [ID# %d] was expected to have %d answers, but got %d instead. Ignoring inconsistency.' % (q['id'], q['answerCount'], len(q['answers'])))
|
||||
|
||||
@ -458,6 +518,10 @@ class PostContentHandler(xml.sax.ContentHandler):
|
||||
|
||||
# we're committing all questions, so nothing is now unfinished
|
||||
self.unfinished_questions.clear()
|
||||
|
||||
# check if there are any orphan answers
|
||||
for question_id, answers in self.orphan_answers.items():
|
||||
print('There are %d answers for missing question [ID# %d]. Ignoring orphan answers.' % (len(answers), question_id))
|
||||
|
||||
# MAIN METHOD
|
||||
parser = OptionParser(usage='usage: %prog [options] xml_root_dir')
|
||||
@ -469,7 +533,7 @@ parser.add_option('-c', '--dump-date', help='Dump date of the site.')
|
||||
(cmd_options, cmd_args) = parser.parse_args()
|
||||
|
||||
if len(cmd_args) < 1:
|
||||
print('The path to the extracted XML files is required.')
|
||||
print('The path to the directory containing the extracted XML files is required.')
|
||||
sys.exit(1)
|
||||
|
||||
xml_root = cmd_args[0]
|
||||
@ -557,65 +621,92 @@ if not (site_name and site_key and site_desc and dump_date):
|
||||
sys.exit(1)
|
||||
|
||||
# check if site is already in database; if so, purge the data.
|
||||
sites = Site.select(Site.q.name==site_name)
|
||||
# the site really shouldn't exist more than once, but just in case
|
||||
for site in sites:
|
||||
site = list(Site.select(Site.q.key==site_key))
|
||||
if len(site) > 0:
|
||||
site = site[0]
|
||||
print('Deleting site "%s" from the database... ' % site.name)
|
||||
sys.stdout.flush()
|
||||
Site.delete(site.id) # the relationship cascades, so other rows will be deleted
|
||||
# Using SQLObject to delete rows takes too long, so we're going to do it directly
|
||||
#Site.delete(site.id) # the relationship cascades, so other rows will be deleted
|
||||
sqlhub.threadConnection = sqlhub.processConnection.transaction()
|
||||
conn = sqlhub.threadConnection
|
||||
# these deletions are done in this order to avoid FK constraint issues
|
||||
print('\tDeleting comments...')
|
||||
conn.query(conn.sqlrepr(Delete(Comment.sqlmeta.table, where=(Comment.q.site==site))))
|
||||
print('\tDeleting badges...')
|
||||
conn.query(conn.sqlrepr(Delete(Badge.sqlmeta.table, where=(Badge.q.site==site))))
|
||||
print('\tDeleting users...')
|
||||
conn.query(conn.sqlrepr(Delete(User.sqlmeta.table, where=(User.q.site==site))))
|
||||
print('\tDeleting site...')
|
||||
conn.query(conn.sqlrepr(Delete(Site.sqlmeta.table, where=(Site.q.id==site.id))))
|
||||
sqlhub.threadConnection.commit(close=True)
|
||||
print('Deleted.\n')
|
||||
|
||||
print('Deleting site "%s" from the solr... ' % site.name)
|
||||
solr.delete(q='siteKey:"%s"' % site.key)
|
||||
print('Deleted.\n')
|
||||
|
||||
timing_start = time.time()
|
||||
|
||||
# start a new transaction
|
||||
sqlhub.threadConnection = sqlhub.processConnection.transaction()
|
||||
conn = sqlhub.threadConnection
|
||||
|
||||
# create a new Site
|
||||
site = Site(name=site_name, desc=site_desc, key=site_key, dump_date=dump_date, import_date=datetime.now())
|
||||
|
||||
# BADGES
|
||||
print('[badge] PARSING BADGES...')
|
||||
sqlhub.threadConnection = sqlhub.processConnection.transaction()
|
||||
xml_path = os.path.join(xml_root, 'badges.xml')
|
||||
print('[badge] start parsing badges.xml...')
|
||||
handler = BadgeContentHandler(site)
|
||||
xml.sax.parse(xml_path, handler)
|
||||
sqlhub.threadConnection.commit(close=True)
|
||||
print('[badge]\t\tProcessed %d rows.' % (handler.row_count))
|
||||
print('[badge] FINISHED PARSING BADGES.\n')
|
||||
# Processing of badges has been disabled because they don't offer any useful
|
||||
# information in the offline situation.
|
||||
#print('[badge] PARSING BADGES...')
|
||||
#xml_path = os.path.join(xml_root, 'badges.xml')
|
||||
#print('[badge] start parsing badges.xml...')
|
||||
#handler = BadgeContentHandler(site)
|
||||
#xml.sax.parse(xml_path, handler)
|
||||
#print('[badge]\tProcessed %d rows.' % (handler.row_count))
|
||||
#print('[badge] FINISHED PARSING BADGES.\n')
|
||||
|
||||
# COMMENTS
|
||||
# comments are temporarily stored in the database for retrieval when parsing
|
||||
# posts only.
|
||||
print('[comment] PARSING COMMENTS...')
|
||||
sqlhub.threadConnection = sqlhub.processConnection.transaction()
|
||||
xml_path = os.path.join(xml_root, 'comments.xml')
|
||||
print('[comment] start parsing comments.xml...')
|
||||
handler = CommentContentHandler(site)
|
||||
xml.sax.parse(xml_path, handler)
|
||||
sqlhub.threadConnection.commit(close=True)
|
||||
print('[comment]\tProcessed %d rows.' % (handler.row_count))
|
||||
print('%-10s Processed %d rows.' % ('[comment]', handler.row_count))
|
||||
print('[comment] FINISHED PARSING COMMENTS.\n')
|
||||
|
||||
# USERS
|
||||
print('[user] PARSING USERS...')
|
||||
sqlhub.threadConnection = sqlhub.processConnection.transaction()
|
||||
xml_path = os.path.join(xml_root, 'users.xml')
|
||||
print('[user] start parsing users.xml...')
|
||||
handler = UserContentHandler(site)
|
||||
xml.sax.parse(xml_path, handler)
|
||||
sqlhub.threadConnection.commit(close=True)
|
||||
print('[user]\t\tProcessed %d rows.' % (handler.row_count))
|
||||
print('%-10s Processed %d rows.' % ('[user]', handler.row_count))
|
||||
print('[user] FINISHED PARSING USERS.\n')
|
||||
|
||||
# POSTS
|
||||
# posts are added directly to the Solr index; they are not added to the database.
|
||||
print('[post] PARSING POSTS...')
|
||||
sqlhub.threadConnection = sqlhub.processConnection.transaction()
|
||||
xml_path = os.path.join(xml_root, 'posts.xml')
|
||||
print('[post] start parsing posts.xml...')
|
||||
handler = PostContentHandler(site)
|
||||
xml.sax.parse(xml_path, handler)
|
||||
handler.commit_all_questions()
|
||||
sqlhub.threadConnection.commit(close=True)
|
||||
print('[post]\tProcessed %d rows.' % (handler.row_count))
|
||||
print('%-10s Processed %d rows.' % ('[post]', handler.row_count))
|
||||
|
||||
print('[post] FINISHED PARSING POSTS.\n')
|
||||
|
||||
# TODO: delete comments?
|
||||
# DELETE COMMENTS
|
||||
print('[comment] DELETING COMMENTS FROM DATABASE (they are no longer needed)...')
|
||||
conn.query(conn.sqlrepr(Delete(Comment.sqlmeta.table, where=(Comment.q.site == site))))
|
||||
print('[comment] FINISHED DELETING COMMENTS.\n')
|
||||
|
||||
# commit transaction
|
||||
sqlhub.threadConnection.commit(close=True)
|
||||
|
||||
timing_end = time.time()
|
||||
|
||||
print('Time taken for site insertion into Stackdump: %f seconds.' % (timing_end - timing_start))
|
||||
print('')
|
85
python/src/stackdump/dataproc/manage_sites.py
Normal file
85
python/src/stackdump/dataproc/manage_sites.py
Normal file
@ -0,0 +1,85 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
##
|
||||
# This script provides the ability to manage sites in Stackdump.
|
||||
##
|
||||
|
||||
import os
|
||||
import sys
|
||||
from optparse import OptionParser
|
||||
|
||||
from sqlobject import sqlhub, connectionForURI
|
||||
from pysolr import Solr
|
||||
|
||||
from stackdump.models import Site
|
||||
|
||||
script_dir = os.path.dirname(sys.argv[0])
|
||||
|
||||
# FUNCTIONS
|
||||
def list_sites():
|
||||
# connect to the data sources
|
||||
db_path = os.path.abspath(os.path.join(script_dir, '../../../../data/stackdump.sqlite'))
|
||||
|
||||
# connect to the database
|
||||
print('Connecting to the database...')
|
||||
conn_str = 'sqlite://' + db_path
|
||||
sqlhub.processConnection = connectionForURI(conn_str)
|
||||
print('Connected.\n')
|
||||
|
||||
sites = list(Site.select()) # force the lazy method to execute
|
||||
|
||||
if len(sites) > 0:
|
||||
print('[site key] site name')
|
||||
print('-' * 80)
|
||||
for site in sites:
|
||||
print('[%s] %s' % (site.key, site.name))
|
||||
|
||||
def delete_site(site_key):
|
||||
# connect to the data sources
|
||||
db_path = os.path.abspath(os.path.join(script_dir, '../../../../data/stackdump.sqlite'))
|
||||
|
||||
# connect to the database
|
||||
print('Connecting to the database...')
|
||||
conn_str = 'sqlite://' + db_path
|
||||
sqlhub.processConnection = connectionForURI(conn_str)
|
||||
print('Connected.\n')
|
||||
|
||||
# connect to solr
|
||||
print('Connecting to solr...')
|
||||
solr = Solr("http://localhost:8983/solr/")
|
||||
print('Connected.\n')
|
||||
|
||||
site = Site.select(Site.q.key==site_key).getOne(None)
|
||||
if not site:
|
||||
print 'Site key %s does not exist.' % site_key
|
||||
sys.exit(1)
|
||||
|
||||
sqlhub.threadConnection = sqlhub.processConnection.transaction()
|
||||
|
||||
print('Deleting site "%s" from the database... ' % site.name)
|
||||
sys.stdout.flush()
|
||||
Site.delete(site.id) # the relationship cascades, so other rows will be deleted
|
||||
print('Deleted.\n')
|
||||
|
||||
print('Deleting site "%s" from solr... ' % site.name)
|
||||
solr.delete(q='siteKey:"%s"' % site_key)
|
||||
print('Deleted.\n')
|
||||
|
||||
sqlhub.threadConnection.commit(close=True)
|
||||
|
||||
# END FUNCTIONS
|
||||
|
||||
# MAIN METHOD
|
||||
if __name__ == '__main__':
|
||||
parser = OptionParser()
|
||||
parser.add_option('-l', '--list-sites', help='List sites imported into Stackdump.', action="store_true")
|
||||
parser.add_option('-d', '--delete-site', help='Delete a site from Stackdump.', metavar='SITE_KEY')
|
||||
|
||||
(cmd_options, cmd_args) = parser.parse_args()
|
||||
|
||||
if cmd_options.list_sites:
|
||||
list_sites()
|
||||
elif cmd_options.delete_site:
|
||||
delete_site(cmd_options.delete_site)
|
||||
else:
|
||||
parser.print_help()
|
Loading…
Reference in New Issue
Block a user