mirror of
https://github.com/djohnlewis/stackdump
synced 2025-01-23 15:11:36 +00:00
350 lines
11 KiB
Python
350 lines
11 KiB
Python
"""
|
|
Import from a CSV file or directory of files.

CSV files should have a header line that lists columns.  Headers can
also be appended with ``:type`` to indicate the type of the field.
``escaped`` is the default, though it can be overridden by the importer.
Supported types:

``:python``:
    A python expression, run through ``eval()``.  This can be a
    security risk; pass in ``allow_python=False`` if you don't want to
    allow it.

``:int``:
    Integer

``:float``:
    Float

``:str``:
    String

``:escaped``:
    A string with backslash escapes (note that you don't put quotation
    marks around the value)

``:base64``:
    A base64-encoded string

``:date``:
    ISO date, like YYYY-MM-DD; this can also be ``NOW+days`` or
    ``NOW-days``

``:datetime``:
    ISO date/time like YYYY-MM-DDTHH:MM:SS (either T or a space can be
    used to separate the time, and seconds are optional).  This can
    also be ``NOW+seconds`` or ``NOW-seconds``

``:bool``:
    Converts true/false/yes/no/on/off/1/0 to a boolean value

``:ref``:
    This will be resolved to the ID of the object named in this column
    (None if the column is empty).  Note: since there's no dependency
    ordering, there's no way to promise the referenced object already
    exists when the row is created.

You can also get back references to the objects if you have a special
``[name]`` column.

Any column named ``[comment]`` or with no name will be ignored.

In any column you can put ``[default]`` to exclude the value and use
whatever default the class wants.  ``[null]`` will use NULL.

Lines that begin with ``[comment]`` are ignored.
|
|
"""
|
|
|
|
from datetime import datetime, date, timedelta
|
|
import os
|
|
import csv
|
|
import types
|
|
|
|
# Names exported by ``from <module> import *``.
__all__ = [
    'load_csv_from_directory',
    'load_csv',
    'create_data',
]
|
|
|
|
# Type name used for columns whose header carries no ``:type`` suffix
# (it is the default ``default_type`` of the loader functions below).
DEFAULT_TYPE = 'escaped'
|
|
|
|
def create_data(data, class_getter, keyorder=None):
    """
    Create the ``data``, which is the return value from
    ``load_csv()``.  Classes will be resolved with the callable
    ``class_getter``; or if ``class_getter`` is a module then the
    class names will be attributes of that.

    Returns a dictionary of ``{object_name: object(s)}``, using the
    names from the ``[name]`` columns (if there are any).  If a name
    is used multiple times, you get a list of objects, not a single
    object.

    If ``keyorder`` is given, then the classes will be processed in
    that order.  It can be a list/tuple of class names (classes not
    listed are processed afterwards), or a two-argument ``cmp``-style
    comparison function.  If not given and ``class_getter`` is a
    module with a ``soClasses`` attribute, the ``__name__`` of each
    entry of ``soClasses`` is used for the order.  Otherwise class
    names are processed in sorted order.

    The row dictionaries in ``data`` are not modified.

    Raises ``ValueError`` if a ``:ref`` value names an object that has
    not been created yet, or a name that was used more than once.
    """
    getter_is_module = isinstance(class_getter, types.ModuleType)
    if (not keyorder and getter_is_module
            and hasattr(class_getter, 'soClasses')):
        keyorder = [c.__name__ for c in class_getter.soClasses]

    if not keyorder:
        classnames = sorted(data)
    elif isinstance(keyorder, (list, tuple)):
        # Explicitly ordered names first, then whatever else is in data.
        classnames = [name for name in keyorder if name in data]
        listed = set(classnames)
        classnames.extend(name for name in data if name not in listed)
    else:
        # ``keyorder`` is a cmp-style function; wrap it so that it works
        # with key-based sorting (the only kind Python 3 supports).
        from functools import cmp_to_key
        classnames = sorted(data, key=cmp_to_key(keyorder))

    objects = {}
    for classname in classnames:
        items = data[classname]
        if not items:
            continue
        if getter_is_module:
            so_class = getattr(class_getter, classname)
        else:
            so_class = class_getter(classname)
        for item in items:
            # Work on a copy so the caller's rows stay reusable.
            fields = dict(item)
            for key, value in item.items():
                if not isinstance(value, Reference):
                    continue
                resolved = objects.get(value.name)
                if resolved is None:
                    raise ValueError(
                        "Object reference to %r does not have target"
                        % value.name)
                if isinstance(resolved, list) and len(resolved) > 1:
                    raise ValueError(
                        "Object reference to %r is ambiguous (got %r)"
                        % (value.name, resolved))
                fields[key] = resolved.id
            # A ``[null]`` cell in the ``[name]`` column yields None.
            raw_name = fields.pop('[name]', None)
            name = raw_name.strip() if raw_name else None
            inst = so_class(**fields)
            if not name:
                continue
            if name not in objects:
                objects[name] = inst
            elif isinstance(objects[name], list):
                objects[name].append(inst)
            else:
                # Second use of the name: switch to a list of objects.
                objects[name] = [objects[name], inst]
    return objects
|
|
|
|
|
|
def load_csv_from_directory(directory,
                            allow_python=True, default_type=DEFAULT_TYPE,
                            allow_multiple_classes=True):
    """
    Load the data from all the ``.csv`` files in a directory (the
    extension is matched case-insensitively; other files are skipped).

    The filename without its extension is passed to ``load_csv`` as
    the default class for that file; no filename is treated specially
    here.  ``allow_python``, ``default_type`` and
    ``allow_multiple_classes`` are passed through to ``load_csv``.

    Returns data just like ``load_csv`` does, with the rows of all
    files merged per class name.  Files are read in sorted filename
    order so the result is deterministic.

    This might cause problems on case-insensitive filesystems.
    """
    results = {}
    for filename in sorted(os.listdir(directory)):
        base, ext = os.path.splitext(filename)
        if ext.lower() != '.csv':
            continue
        # ``with`` guarantees the file is closed even if load_csv raises.
        with open(os.path.join(directory, filename), 'rb') as f:
            data = load_csv(csv.reader(f), allow_python=allow_python,
                            default_type=default_type,
                            default_class=base,
                            allow_multiple_classes=allow_multiple_classes)
        for classname, items in data.items():
            results.setdefault(classname, []).extend(items)
    return results
|
|
|
|
def load_csv(csvreader, allow_python=True, default_type=DEFAULT_TYPE,
             default_class=None, allow_multiple_classes=True):
    """
    Load rows from ``csvreader`` (any iterable of rows, each a
    sequence of strings) and coerce the values by column type.

    Returns a dictionary of ``{class_name: [row_dict, ...]}`` where
    each ``row_dict`` maps column names to coerced values.

    A row whose first cell is ``CLASS:`` switches the current class to
    the name in the second cell; the next non-empty row is then read
    as the header row.  Rows whose cells are all blank, and rows whose
    first cell is ``[comment]``, are skipped.  ``[default]`` cells are
    left out of the row dictionary and ``[null]`` cells become None.

    Raises ``ValueError`` for a ``CLASS:`` row when
    ``allow_multiple_classes`` is false, for a ``CLASS:`` row without
    a class name, for data before any class is known, and for a
    ``:python`` column when ``allow_python`` is false.
    """
    current_class = default_class
    current_headers = None
    results = {}

    for row in csvreader:
        if not any(cell.strip() for cell in row):
            # empty row
            continue

        if row[0].strip() == 'CLASS:':
            if not allow_multiple_classes:
                raise ValueError(
                    "CLASS: line in CSV file, but multiple classes are not allowed in this file (line: %r)"
                    % (row,))
            if not row[1:]:
                raise ValueError(
                    "CLASS: in line in CSV file, with no class name in next column (line: %r)"
                    % (row,))
            current_class = row[1]
            # A new class section starts with its own header row.
            current_headers = None
            continue

        if not current_class:
            raise ValueError(
                "No CLASS: line given, and there is no default class for this file (line: %r"
                % (row,))

        if current_headers is None:
            current_headers = _parse_headers(row, default_type, allow_python)
            continue

        if row[0] == '[comment]':
            continue

        # Pad short rows with empty strings, on a copy so the caller's
        # row is left untouched (and tuple rows work too).
        cells = list(row) + [''] * (len(current_headers) - len(row))
        row_converted = {}
        for value, (name, coercer, args) in zip(cells, current_headers):
            if name is None or value == '[default]':
                # Comment column, or "use the class default".
                continue
            if value == '[null]':
                row_converted[name] = None
            else:
                row_converted[name] = coercer(value, *args)

        results.setdefault(current_class, []).append(row_converted)

    return results


def _parse_headers(header_row, default_type, allow_python=True):
    """
    Turn a header row into a list of ``(name, coercer, args)`` tuples,
    one per column.

    Each header is ``name`` or ``name:type``; ``default_type`` is used
    when there is no ``:type``.  Columns named ``[comment]`` or with an
    empty name become ``(None, None, None)``.  The ``[name]`` column is
    always treated as ``str``.

    Raises ``ValueError`` if a column asks for the ``python`` type while
    ``allow_python`` is false, if a parenthesised argument is not
    closed, or (from ``get_coercer``) if the type is unknown.
    """
    headers = []
    for original_name in header_row:
        if ':' in original_name:
            name, type_spec = original_name.split(':', 1)
        else:
            name, type_spec = original_name, default_type
        type_spec = type_spec.strip().lower()
        # Check the normalised base type so that spellings such as
        # ``:Python`` or ``:python(x)`` cannot slip past the restriction.
        if type_spec.split('(', 1)[0] == 'python' and not allow_python:
            raise ValueError(
                ":python header given when python headers are not allowed (with header %r"
                % original_name)
        name = name.strip()
        if name == '[comment]' or not name:
            headers.append((None, None, None))
            continue
        if '(' in type_spec:
            type_spec, arg = type_spec.split('(', 1)
            if not arg.endswith(')'):
                raise ValueError(
                    "Arguments (in ()'s) do not end with ): %r"
                    % original_name)
            # NOTE(review): the parenthesised argument is validated but
            # not passed on; only the arguments given at registration
            # reach the coercer.  Confirm whether it should be forwarded.
        if name == '[name]':
            type_spec = 'str'
        coercer, args = get_coercer(type_spec)
        headers.append((name, coercer, args))
    return headers
|
|
|
|
# Registry mapping a type name to a ``(coercer, extra_args)`` pair.
_coercers = {}


def get_coercer(type):
    """
    Return the ``(coercer, extra_args)`` pair registered for ``type``.

    Raises ``ValueError`` listing the known type names if ``type`` has
    not been registered.
    """
    entry = _coercers.get(type)
    if entry is None:
        known = ', '.join(_coercers.keys())
        raise ValueError(
            "Coercion type %r not known (I know: %s)"
            % (type, known))
    return entry


def register_coercer(type, coercer, *args):
    """
    Register ``coercer`` under the name ``type``, replacing any earlier
    registration.  Any further positional ``args`` are stored alongside
    it as a tuple.
    """
    _coercers[type] = coercer, args
|
|
|
|
def identity(v):
    """Return ``v`` unchanged (the coercer for plain string columns)."""
    return v
|
|
|
|
# Plain strings pass through untouched, under either spelling.
register_coercer('str', identity)
register_coercer('string', identity)
|
|
|
|
def decode_string(v, encoding):
    """
    Return ``v`` decoded with the codec named ``encoding``, by calling
    ``v.decode(encoding)``; ``v`` must therefore offer a ``decode``
    method.
    """
    decoded = v.decode(encoding)
    return decoded
|
|
|
|
# Codec-based string types: the extra argument is the codec name that
# is handed to ``decode_string`` after the cell value.
register_coercer('escaped', decode_string, 'string_escape')
register_coercer('strescaped', decode_string, 'string_escape')
register_coercer('base64', decode_string, 'base64')
|
|
|
|
# Numeric types use the builtin constructors directly.
register_coercer('int', int)
register_coercer('float', float)
|
|
|
|
def parse_python(v):
    """
    Evaluate ``v`` as a Python expression and return the result.

    SECURITY: this runs ``eval()`` on the cell text.  The expression is
    given empty global and local namespaces, which does not make it
    safe for untrusted input.
    """
    eval_globals = {}
    eval_locals = {}
    return eval(v, eval_globals, eval_locals)
|
|
|
|
# Make ``parse_python`` available under the ``python`` type name.
register_coercer('python', parse_python)
|
|
|
|
def parse_date(v):
    """
    Convert ``v`` to a ``datetime.date``.

    Accepts an ISO date (``YYYY-MM-DD``) or ``NOW+days`` / ``NOW-days``
    for a date relative to today.  Surrounding whitespace is ignored
    and a blank value gives None.

    Raises ``ValueError`` if the value matches neither form.
    """
    v = v.strip()
    if not v:
        return None
    if v.startswith(('NOW-', 'NOW+')):
        # v[3:] keeps the sign, so int() yields the signed day offset.
        return date.today() + timedelta(days=int(v[3:]))
    # Parse with datetime directly: no dependency on the ``time``
    # module and no round trip through an epoch timestamp.
    return datetime.strptime(v, '%Y-%m-%d').date()
|
|
|
|
# Make ``parse_date`` available under the ``date`` type name.
register_coercer('date', parse_date)
|
|
|
|
def parse_datetime(v):
    """
    Convert ``v`` to a ``datetime.datetime``.

    Accepts ``YYYY-MM-DD`` followed by ``T`` or a space and then
    ``HH:MM:SS`` or ``HH:MM``, or ``NOW+seconds`` / ``NOW-seconds`` for
    a time relative to now.  Surrounding whitespace is ignored and a
    blank value gives None.

    Raises ``ValueError`` if the value matches none of these forms.
    """
    v = v.strip()
    if not v:
        return None
    if v.startswith(('NOW-', 'NOW+')):
        # v[3:] keeps the sign, so int() yields the signed offset.
        return datetime.now() + timedelta(seconds=int(v[3:]))
    accepted_formats = (
        '%Y-%m-%dT%H:%M:%S',
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%dT%H:%M',
        '%Y-%m-%d %H:%M',
    )
    for fmt in accepted_formats:
        try:
            # Parse with datetime directly: no dependency on the
            # ``time`` module and no round trip through a timestamp.
            return datetime.strptime(v, fmt)
        except ValueError:
            pass
    raise ValueError(
        "Value is not a date/time like YYYY-MM-DDTHH:MM[:SS]: %r" % v)
|
|
|
|
# Make ``parse_datetime`` available under the ``datetime`` type name.
register_coercer('datetime', parse_datetime)
|
|
|
|
class Reference(object):
    """
    Placeholder produced for a ``:ref`` cell; ``name`` is the name of
    the object the cell refers to.
    """

    def __init__(self, name):
        self.name = name


def parse_ref(v):
    """
    Convert a ``:ref`` cell into a ``Reference``, or None if the cell
    is blank.

    The name is stripped of surrounding whitespace, matching the way
    ``create_data`` strips ``[name]`` values before recording them;
    otherwise a padded reference could never be resolved.
    """
    name = v.strip()
    if not name:
        return None
    return Reference(name)
|
|
|
|
# Make ``parse_ref`` available under the ``ref`` type name.
register_coercer('ref', parse_ref)
|
|
|
|
def parse_bool(v):
    """
    Convert ``v`` to a boolean.

    ``y``, ``yes``, ``t``, ``true``, ``on`` and ``1`` give True;
    ``n``, ``no``, ``f``, ``false``, ``off`` and ``0`` give False.
    Case and surrounding whitespace are ignored.

    Raises ``ValueError`` for anything else (including a blank value).
    """
    normalized = v.strip().lower()
    if normalized in ('y', 'yes', 't', 'true', 'on', '1'):
        return True
    if normalized in ('n', 'no', 'f', 'false', 'off', '0'):
        return False
    # Report the value as it was given.
    raise ValueError(
        "Value is not boolean-like: %r" % v)
|
|
|
|
# Booleans are accepted under both the short and the long type name.
register_coercer('bool', parse_bool)
register_coercer('boolean', parse_bool)
|