import gzip
import shutil
import typing
import pathlib
import zipfile
import itertools
import contextlib
from xml.etree import ElementTree as et
import collections
import unicodedata
import requests
import termcolor
try:
from odf.opendocument import load as load_odf
except ImportError: # pragma: no cover
load_odf = None
try:
import xlrd
except ImportError: # pragma: no cover
xlrd = None
try:
import openpyxl
except ImportError: # pragma: no cover
openpyxl = None
import pybtex.database
from csvw import dsv
from clldutils.misc import xmlchars, slug
from clldutils.path import TemporaryDirectory
from clldutils import jsonlib
from pycldf.sources import Source
# Names exported by `from <module> import *`.
__all__ = ['get_url', 'DataDir']
# XML namespace URIs; combined with a local name they form the `(namespace, name)` tuples that
# the ODS helpers below compare against node `qname`s and use as attribute keys.
ODF_NS_TABLE = 'urn:oasis:names:tc:opendocument:xmlns:table:1.0'
ODF_NS_TEXT = 'urn:oasis:names:tc:opendocument:xmlns:text:1.0'
def _real_len(seq, pred=bool):
for index in range(len(seq) - 1, -1, -1):
if pred(seq[index]):
return index + 1
else:
return 0
def _ods_value(cell):
return ' '.join(
str(e).strip()
for e in cell.childNodes
if e.qname == (ODF_NS_TEXT, 'p'))
def _ods_cells(row):
    """
    Flatten the cells of a table row into a list of cell values.

    Each `table-cell` child contributes its value as many times as its
    `number-columns-repeated` attribute says (once if the attribute is missing or empty).
    Trailing cells with an empty value are dropped before the repetition is expanded.

    :param row: Node exposing `childNodes`; children expose `qname` and `attributes`.
    :return: `list` of cell values.
    """
    pairs = []
    for node in row.childNodes:
        if node.qname != (ODF_NS_TABLE, 'table-cell'):
            continue
        repeated = node.attributes.get((ODF_NS_TABLE, 'number-columns-repeated'))
        pairs.append((_ods_value(node), int(repeated or '1')))

    # Ignore everything after the last cell that actually holds a value.
    content_len = _real_len(pairs, pred=lambda pair: bool(pair[0]))

    values = []
    for value, count in pairs[:content_len]:
        values.extend([value] * count)
    return values
def _pad_list(li, length):
if len(li) >= length:
return li
else:
return [e for e in itertools.chain(li, itertools.repeat('', length - len(li)))]
def _ods_to_list(table):
    """
    Convert a spreadsheet table node into a rectangular list of rows.

    Each `table-row` child is turned into a list of cell values and repeated as often as its
    `number-rows-repeated` attribute says (once if the attribute is missing or empty).
    Trailing rows without any cell values are dropped, and the remaining rows are padded with
    empty strings to the width of the widest row.

    :param table: Node exposing `childNodes`; children expose `qname` and `attributes`.
    :return: `list` of rows, each a `list` of cell values; empty for a table without rows.
    """
    rows = [
        (
            _ods_cells(row),
            int(
                row.attributes.get((ODF_NS_TABLE, 'number-rows-repeated'))
                or '1')
        )
        for row in table.childNodes
        if row.qname == (ODF_NS_TABLE, 'table-row')]
    # Ignore everything after the last row that actually holds cells.
    real_len = _real_len(rows, pred=lambda pair: bool(pair[0]))
    # `default=0` keeps a table without any rows from raising `ValueError`.
    max_width = max((len(row) for row, _ in rows), default=0)
    return [
        cloned_row
        for row, number in rows[:real_len]
        # The padded row is built once and then repeated `number` times.
        for cloned_row in itertools.repeat(_pad_list(row, max_width), number)]
def get_url(url: str, log=None, **kw) -> requests.Response:
    """
    Fetch `url` with an HTTP GET request, optionally logging the status code.

    :param url: The URL to retrieve.
    :param log: Optional logger-like object; `log.info` is called for status 200, \
    `log.warning` for any other status.
    :param kw: Keyword arguments passed through to `requests.get`.
    :return: The `requests.Response`, whatever its status code.
    """
    response = requests.get(url, **kw)
    if not log:
        return response
    emit = log.warning if response.status_code != 200 else log.info
    status = termcolor.colored(response.status_code, 'blue')
    target = termcolor.colored(url, 'blue')
    emit('HTTP {0} for {1}'.format(status, target))
    return response
class DataDir(type(pathlib.Path())):
    """
    A `pathlib.Path` augmented with functionality to read common data formats.
    """

    def _path(self, fname: typing.Union[str, pathlib.Path]) -> pathlib.Path:
        """
        Interpret strings without "/" as names of files in `self`.

        :param fname: A file name or a path.
        :return: `pathlib.Path` instance
        """
        if isinstance(fname, str) and '/' not in fname:
            return self / fname
        return pathlib.Path(fname)

    def read(self,
             fname: typing.Union[str, pathlib.Path],
             aname: typing.Optional[str] = None,
             normalize: typing.Optional[str] = None,
             suffix: typing.Optional[str] = None,
             encoding: str = 'utf8') -> str:
        """
        Read text data from a file.

        :param fname: Name of a file in `DataDir` or any `pathlib.Path`.
        :param aname: "file in archive" name, if a file from a zip archive is to be read. \
        If not given, the first member of the archive is read.
        :param normalize: Any normalization form understood by `unicodedata.normalize`.
        :param suffix: If `None`, suffix will be inferred from the path to be read. Otherwise \
        it can be used to force reading compressed content passing `.gz` or `.zip`.
        :param encoding: Encoding used to decode the content.
        :return: The decoded (and optionally normalized) text.
        """
        p = self._path(fname)
        suffix = suffix or p.suffix
        if suffix == '.zip':
            # The context manager closes the archive as soon as the member has been read.
            with zipfile.ZipFile(str(p)) as archive:
                text = archive.read(aname or archive.namelist()[0]).decode(encoding)
        elif suffix == '.gz':
            with gzip.open(p) as fp:
                text = fp.read().decode(encoding)
        else:
            text = p.read_text(encoding=encoding)
        if normalize:
            text = unicodedata.normalize(normalize, text)
        return text

    def write(self, fname: typing.Union[str, pathlib.Path], text: str, encoding='utf8'):
        """
        Write text data to a file.

        :param fname: Name of a file in `DataDir` or any `pathlib.Path`.
        :param text: The text to write.
        :param encoding: Encoding used to encode the text.
        :return: `fname`, exactly as passed in.
        """
        self._path(fname).write_text(text, encoding=encoding)
        return fname

    def read_csv(self,
                 fname: typing.Union[str, pathlib.Path],
                 normalize=None, **kw) -> typing.List[typing.Union[dict, list]]:
        """
        Read CSV data from a file.

        :param fname: Name of a file in `DataDir` or any `pathlib.Path`.
        :param normalize: Any normalization form understood by `unicodedata.normalize`; if \
        given, it is applied to every value of every row.
        :param kw: Keyword arguments passed to `csvw.dsv.reader`; with a truthy `dicts`, \
        rows are handled as mappings rather than lists.
        :return: `list` of rows.
        """
        rows = dsv.reader(self._path(fname), **kw)
        if not normalize:
            return list(rows)
        if kw.get('dicts'):
            return [
                collections.OrderedDict(
                    [(k, unicodedata.normalize(normalize, v)) for k, v in row.items()])
                for row in rows]
        return [[unicodedata.normalize(normalize, k) for k in row] for row in rows]

    def write_csv(self,
                  fname: typing.Union[str, pathlib.Path],
                  rows: typing.Iterable[typing.List[str]], **kw):
        """
        Write CSV data to a file.

        :param fname: Name of a file in `DataDir` or any `pathlib.Path`.
        :param rows: The rows to write.
        :param kw: Keyword arguments passed to `csvw.dsv.UnicodeWriter`.
        """
        with dsv.UnicodeWriter(self._path(fname), **kw) as writer:
            writer.writerows(rows)

    def read_xml(self, fname: typing.Union[str, pathlib.Path], wrap=True) -> et.Element:
        """
        Reads and parses XML from a file.

        :param fname: Name of a file in `DataDir` or any `pathlib.Path`.
        :param wrap: If truthy, the content is enclosed in an `<r>` element before parsing.
        :return: The root element of the parsed document.
        """
        xml = xmlchars(self.read(fname))
        if wrap:
            xml = '<r>{0}</r>'.format(xml)
        return et.fromstring(xml.encode('utf8'))

    def read_json(self,
                  fname: typing.Union[str, pathlib.Path],
                  **kw) -> typing.Union[str, list, dict]:
        """
        Read JSON data from a file.

        :param fname: Name of a file in `DataDir` or any `pathlib.Path`.
        :param kw: Accepted for backwards compatibility; currently not passed on to the loader.
        :return: The deserialized content.
        """
        return jsonlib.load(self._path(fname))

    def read_bib(
            self,
            fname: typing.Union[str, pathlib.Path] = 'sources.bib') -> 'typing.List[Source]':
        """
        Read BibTeX data from a file.

        :param fname: Name of a file in `DataDir` or any `pathlib.Path`.
        :return: `list` with one `Source` per entry of the bibliography.
        """
        bib = pybtex.database.parse_string(self.read(fname), bib_format='bibtex')
        return [Source.from_entry(k, e) for k, e in bib.entries.items()]

    def ods2csv(self,
                fname: typing.Union[str, pathlib.Path],
                outdir: typing.Optional[pathlib.Path] = None) -> typing.Dict[str, pathlib.Path]:
        """
        Dump the data from an OpenDocument Spreadsheet (suffix .ODS) file to CSV.

        .. note::

            Requires `cldfbench` to be installed with extra "odf".

        :param fname: Name of a file in `DataDir` or any `pathlib.Path`.
        :param outdir: Directory to write the CSV files to; defaults to `self`.
        :return: `dict` mapping table names to the paths of the CSV files written.
        :raises EnvironmentError: If ODF support is not installed.
        """
        if not load_odf:  # pragma: no cover
            raise EnvironmentError(
                'ods2csv is only available when cldfbench is installed with odf support\n'
                'pip install cldfbench[odf]')
        fname = self._path(fname)
        ods_data = load_odf(fname)
        tables = [
            e for e in ods_data.spreadsheet.childNodes
            if e.qname == (ODF_NS_TABLE, 'table')]
        outdir = outdir or self
        res = {}
        for table in tables:
            table_name = table.attributes[ODF_NS_TABLE, 'name']
            # One CSV file per table, named "<workbook stem>.<slugified table name>.csv".
            csv_path = outdir / '{}.{}.csv'.format(
                fname.stem,
                slug(table_name, lowercase=False))
            with dsv.UnicodeWriter(csv_path) as writer:
                writer.writerows(_ods_to_list(table))
            res[table_name] = csv_path
        return res

    def xls2csv(self,
                fname: typing.Union[str, pathlib.Path],
                outdir: typing.Optional[pathlib.Path] = None) -> typing.Dict[str, pathlib.Path]:
        """
        Dump the data from an Excel XLS file to CSV.

        Sheets without rows are skipped.

        .. note::

            Requires `cldfbench` to be installed with extra "excel".

        :param fname: Name of a file in `DataDir` or any `pathlib.Path`.
        :param outdir: Directory to write the CSV files to; defaults to `self`.
        :return: `dict` mapping sheet names to the paths of the CSV files written.
        :raises EnvironmentError: If Excel support is not installed.
        :raises ValueError: If the reader's error message indicates an xlsx file.
        """
        if not xlrd:  # pragma: no cover
            raise EnvironmentError(
                'xls2csv is only available when cldfbench is installed with excel support\n'
                'pip install cldfbench[excel]')
        fname = self._path(fname)
        res = {}
        outdir = outdir or self
        try:
            wb = xlrd.open_workbook(str(fname))
        except xlrd.biffh.XLRDError as e:
            if 'xlsx' in str(e):
                # Keep the original error attached as the cause of the more helpful one.
                raise ValueError('To read xlsx files, call xlsx2csv!') from e
            raise  # pragma: no cover
        for sname in wb.sheet_names():
            sheet = wb.sheet_by_name(sname)
            if sheet.nrows:
                path = outdir.joinpath(fname.stem + '.' + slug(sname, lowercase=False) + '.csv')
                with dsv.UnicodeWriter(path) as writer:
                    for i in range(sheet.nrows):
                        writer.writerow([col.value for col in sheet.row(i)])
                res[sname] = path
        return res

    def xlsx2csv(self,
                 fname: typing.Union[str, pathlib.Path],
                 outdir: typing.Optional[pathlib.Path] = None) -> typing.Dict[str, pathlib.Path]:
        """
        Dump the data from an Excel XLSX file to CSV.

        A CSV file is written for every sheet of the workbook.

        .. note::

            Requires `cldfbench` to be installed with extra "excel".

        :param fname: Name of a file in `DataDir` or any `pathlib.Path`.
        :param outdir: Directory to write the CSV files to; defaults to `self`.
        :return: `dict` mapping sheet names to the paths of the CSV files written.
        :raises EnvironmentError: If Excel support is not installed.
        """
        if not openpyxl:  # pragma: no cover
            raise EnvironmentError(
                'xlsx2csv is only available when cldfbench is installed with excel support\n'
                'pip install cldfbench[excel]')

        def _excel_value(x):
            """Render a cell value as the string to be written to CSV."""
            if x is None:
                return ""
            # `is_integer` is False for inf/nan, for which `int(x)` would raise.
            if isinstance(x, float) and x.is_integer():
                # Since Excel does not have an integer type, integers are rendered as "n.0",
                # which in turn confuses type detection of tools like csvkit. Thus, we normalize
                # numbers of the form "n.0" to "n".
                return '{0}'.format(int(x))  # pragma: no cover
            return '{0}'.format(x).strip()

        fname = self._path(fname)
        res = {}
        outdir = outdir or self
        wb = openpyxl.load_workbook(str(fname), data_only=True)
        for sname in wb.sheetnames:
            sheet = wb[sname]
            path = outdir.joinpath(fname.stem + '.' + slug(sname, lowercase=False) + '.csv')
            with dsv.UnicodeWriter(path) as writer:
                for row in sheet.rows:
                    writer.writerow([_excel_value(col.value) for col in row])
            res[sname] = path
        return res

    @contextlib.contextmanager
    def temp_download(self,
                      url: str,
                      fname: typing.Union[str, pathlib.Path],
                      log=None) -> typing.Iterator[pathlib.Path]:
        """
        Context manager to use when downloaded data needs to be manipulated before storage
        (e.g. to anonymize it).

        The downloaded file is deleted when the context is left.

        Usage:

        .. code-block:: python

            with ds.raw_dir.temp_download('http://example.org/data.txt', 'data.tmp') as p:
                ds.raw_dir.write('data.txt', p.read_text(encoding='utf8').split('##')[0])

        :param url: URL to download from.
        :param fname: Name of a file in `DataDir` or any `pathlib.Path` to download to.
        :param log: Optional logger passed on to `download`.
        """
        p = None
        try:
            p = self.download(url, fname, log=log)
            yield p
        finally:
            # `p` is still `None` if the download itself failed.
            if p and p.exists():
                p.unlink()

    def download(self,
                 url: str,
                 fname: typing.Union[str, pathlib.Path],
                 log=None,
                 skip_if_exists=False):
        """
        Download data from a URL to the directory.

        :param url: URL to download from.
        :param fname: Name of a file in `DataDir` or any `pathlib.Path` to write to.
        :param log: Optional logger passed on to `get_url`.
        :param skip_if_exists: If truthy, an already existing target file is left untouched \
        and no request is made.
        :return: Path of the target file.
        """
        p = self._path(fname)
        if p.exists() and skip_if_exists:
            return p
        res = get_url(url, log=log, stream=True)
        with p.open('wb') as fp:
            for chunk in res.iter_content(chunk_size=1024):
                if chunk:  # filter out keep-alive new chunks
                    fp.write(chunk)
        return p

    def download_and_unpack(self, url: str, *paths: str, **kw):
        """
        Download a zipfile and immediately unpack selected content.

        Selected members are copied directly into the directory under their base name.
        Directory entries of the archive are skipped, since there is no file to copy.

        :param url: URL from where to download the archive.
        :param paths: Path names to be compared to `ZipInfo.filename`. If none are given, \
        all files in the archive are unpacked.
        :param kw: Only the keyword `log` is used; it is passed on to `temp_download`.
        """
        with self.temp_download(url, 'ds.zip', log=kw.pop('log', None)) as zipp:
            with TemporaryDirectory() as tmpdir:
                with zipfile.ZipFile(str(zipp)) as zipf:
                    for info in zipf.infolist():
                        if info.is_dir():
                            # `shutil.copy` cannot copy a directory.
                            continue
                        if (not paths) or info.filename in paths:
                            zipf.extract(info, path=str(tmpdir))
                            shutil.copy(str(tmpdir.joinpath(info.filename)), str(self))