Merge branch 'feat/alto'

master
Gerber, Mike 2 years ago
commit a2fb3ee387

@ -1 +0,0 @@
from .modstool import *

@ -0,0 +1,202 @@
#!/usr/bin/env python3
import csv
import logging
import os
import re
import warnings
import sys
from xml.dom.expatbuilder import Namespaces
from lxml import etree as ET
from itertools import groupby
from operator import attrgetter
from typing import List
from collections.abc import MutableMapping, Sequence
import click
import pandas as pd
import numpy as np
from tqdm import tqdm
from .lib import TagGroup, sorted_groupby, flatten, ns
logger = logging.getLogger('alto4pandas')
def alto_to_dict(alto, raise_errors=True):
"""Convert ALTO metadata to a nested dictionary"""
value = {}
# Iterate through each group of tags
for tag, group in sorted_groupby(alto, key=attrgetter('tag')):
group = list(group)
localname = ET.QName(tag).localname
if localname == 'Description':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().descend(raise_errors)
elif localname == 'MeasurementUnit':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == 'OCRProcessing':
value[localname] = TagGroup(tag, group).is_singleton().descend(raise_errors)
elif localname == 'Processing':
# TODO This enumerated descent is used more than once, DRY!
for n, e in enumerate(group):
value[f'{localname}{n}'] = alto_to_dict(e, raise_errors)
elif localname == 'ocrProcessingStep':
for n, e in enumerate(group):
value[f'{localname}{n}'] = alto_to_dict(e, raise_errors)
elif localname == 'preProcessingStep':
for n, e in enumerate(group):
value[f'{localname}{n}'] = alto_to_dict(e, raise_errors)
elif localname == 'processingDateTime':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == 'processingSoftware':
value[localname] = TagGroup(tag, group).is_singleton().descend(raise_errors)
elif localname == 'processingAgency':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == 'processingStepDescription':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == 'processingStepSettings':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == 'softwareCreator':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == 'softwareName':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == 'softwareVersion':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == 'sourceImageInformation':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().descend(raise_errors)
elif localname == 'fileName':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == 'Layout':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().descend(raise_errors)
elif localname == 'Page':
alto_namespace = ET.QName(group[0]).namespace
namespaces={"alto": alto_namespace}
value[localname] = {}
value[localname].update(TagGroup(tag, group).is_singleton().attributes())
value[localname].update(TagGroup(tag, group).subelement_counts())
value[localname].update(TagGroup(tag, group).xpath_statistics("//alto:String/@WC", namespaces))
elif localname == 'Styles':
pass
elif localname == 'Tags':
pass
else:
if raise_errors:
print(value)
raise ValueError('Unknown tag "{}"'.format(tag))
else:
pass
return value
def walk(m):
# XXX do this in modstool, too
if os.path.isdir(m):
tqdm.write(f'Scanning directory {m}')
for f in tqdm(os.scandir(m), leave=False):
if f.is_file() and not f.name.startswith('.'):
yield f.path
elif f.is_dir():
try:
yield from walk(f.path)
except PermissionError:
warnings.warn(f"Error walking {f.path}")
else:
yield m.path
@click.command()
@click.argument('alto_files', type=click.Path(exists=True), required=True, nargs=-1)
@click.option('--output', '-o', 'output_file', type=click.Path(), help='Output pickle file',
default='alto_info_df.pkl', show_default=True)
@click.option('--output-csv', type=click.Path(), help='Output CSV file')
@click.option('--output-xlsx', type=click.Path(), help='Output Excel .xlsx file')
def process(alto_files: List[str], output_file: str, output_csv: str, output_xlsx: str):
"""
A tool to convert the ALTO metadata in INPUT to a pandas DataFrame.
INPUT is assumed to be a ALTO document. INPUT may optionally be a directory. The tool then reads
all files in the directory.
alto4pandas writes two output files: A pickled pandas DataFrame and a CSV file with all conversion warnings.
"""
# Extend file list if directories are given
alto_files_real = []
for m in alto_files:
for x in walk(m):
alto_files_real.append(x)
# Process ALTO files
with open(output_file + '.warnings.csv', 'w') as csvfile:
csvwriter = csv.writer(csvfile)
alto_info = []
logger.info('Processing ALTO files')
for alto_file in tqdm(alto_files_real, leave=False):
try:
root = ET.parse(alto_file).getroot()
alto = root # XXX .find('alto:alto', ns) does not work here
with warnings.catch_warnings(record=True) as caught_warnings:
warnings.simplefilter('always') # do NOT filter double occurrences
# ALTO
d = flatten(alto_to_dict(alto, raise_errors=True))
# "meta"
d['alto_file'] = alto_file
d['alto_xmlns'] = ET.QName(alto).namespace
alto_info.append(d)
if caught_warnings:
# PyCharm thinks caught_warnings is not Iterable:
# noinspection PyTypeChecker
for caught_warning in caught_warnings:
csvwriter.writerow([alto_file, caught_warning.message])
except Exception as e:
logger.error('Exception in {}: {}'.format(alto_file, e))
import traceback; traceback.print_exc()
# Convert the alto_info List[Dict] to a pandas DataFrame
columns = []
for m in alto_info:
for c in m.keys():
if c not in columns:
columns.append(c)
data = [[m.get(c) for c in columns] for m in alto_info]
index = [m['alto_file'] for m in alto_info] # TODO use ppn + page?
alto_info_df = pd.DataFrame(data=data, index=index, columns=columns)
# Pickle the DataFrame
logger.info('Writing DataFrame to {}'.format(output_file))
alto_info_df.to_pickle(output_file)
if output_csv:
logger.info('Writing CSV to {}'.format(output_csv))
alto_info_df.to_csv(output_csv)
if output_xlsx:
logger.info('Writing Excel .xlsx to {}'.format(output_xlsx))
alto_info_df.to_excel(output_xlsx)
def main():
logging.basicConfig(level=logging.INFO)
for prefix, uri in ns.items():
ET.register_namespace(prefix, uri)
process()
if __name__ == '__main__':
main()

@ -0,0 +1,285 @@
from itertools import groupby
import re
import warnings
from typing import List, Sequence, MutableMapping
import numpy as np
from lxml import etree as ET
__all__ = ["ns"]
ns = {
'mets': 'http://www.loc.gov/METS/',
'mods': 'http://www.loc.gov/mods/v3',
"alto": "http://www.loc.gov/standards/alto/ns-v2"
}
class TagGroup:
"""Helper class to simplify the parsing and checking of MODS metadata"""
def __init__(self, tag, group: List[ET.Element]):
self.tag = tag
self.group = group
def __str__(self):
return '\n'.join(str(ET.tostring(e), 'utf-8').strip() for e in self.group)
def is_singleton(self):
if len(self.group) != 1:
raise ValueError('More than one instance: {}'.format(self))
return self
def has_no_attributes(self):
return self.has_attributes({})
def has_attributes(self, attrib):
if not isinstance(attrib, Sequence):
attrib = [attrib]
if not all(e.attrib in attrib for e in self.group):
raise ValueError('One or more element has unexpected attributes: {}'.format(self))
return self
def ignore_attributes(self):
# This serves as documentation for now.
return self
def sort(self, key=None, reverse=False):
self.group = sorted(self.group, key=key, reverse=reverse)
return self
def text(self, separator='\n'):
t = ''
for e in self.group:
if t != '':
t += separator
if e.text:
t += e.text
return t
def text_set(self):
return {e.text for e in self.group}
def descend(self, raise_errors):
return _to_dict(self.is_singleton().group[0], raise_errors)
def filter(self, cond, warn=None):
new_group = []
for e in self.group:
if cond(e):
new_group.append(e)
else:
if warn:
warnings.warn('Filtered {} element ({})'.format(self.tag, warn))
return TagGroup(self.tag, new_group)
def force_singleton(self, warn=True):
if len(self.group) == 1:
return self
else:
if warn:
warnings.warn('Forced single instance of {}'.format(self.tag))
return TagGroup(self.tag, self.group[:1])
RE_ISO8601_DATE = r'^\d{2}(\d{2}|XX)(-\d{2}-\d{2})?$' # Note: Includes non-specific century dates like '18XX'
RE_GERMAN_DATE = r'^(?P<dd>\d{2})\.(?P<mm>\d{2})\.(?P<yyyy>\d{4})$'
def fix_date(self):
for e in self.group:
if e.attrib.get('encoding') == 'w3cdtf':
# This should be 'iso8601' according to MODS-AP 2.3.1
warnings.warn('Changed w3cdtf encoding to iso8601')
e.attrib['encoding'] = 'iso8601'
new_group = []
for e in self.group:
if e.attrib.get('encoding') == 'iso8601' and re.match(self.RE_ISO8601_DATE, e.text):
new_group.append(e)
elif re.match(self.RE_ISO8601_DATE, e.text):
warnings.warn('Added iso8601 encoding to date {}'.format(e.text))
e.attrib['encoding'] = 'iso8601'
new_group.append(e)
elif re.match(self.RE_GERMAN_DATE, e.text):
warnings.warn('Converted date {} to iso8601 encoding'.format(e.text))
m = re.match(self.RE_GERMAN_DATE, e.text)
e.text = '{}-{}-{}'.format(m.group('yyyy'), m.group('mm'), m.group('dd'))
e.attrib['encoding'] = 'iso8601'
new_group.append(e)
else:
warnings.warn('Not a iso8601 date: "{}"'.format(e.text))
new_group.append(e)
self.group = new_group
# Notes:
# - There are dates with the misspelled qualifier 'aproximate'
# - Rough periods are sometimes given either by:
# - years like '19xx'
# - or 'approximate' date ranges with point="start"/"end" attributes set
# (this could be correct according to MODS-AP 2.3.1)
# - Some very specific dates like '06.08.1820' are sometimes given the 'approximate' qualifier
# - Sometimes, approximate date ranges are given in the text "1785-1800 (ca.)"
return self
def fix_event_type(self):
# According to MODS-AP 2.3.1, every originInfo should have its eventType set.
# Fix this for special cases.
for e in self.group:
if e.attrib.get('eventType') is None:
try:
if e.find('mods:publisher', ns).text.startswith('Staatsbibliothek zu Berlin') and \
e.find('mods:edition', ns).text == '[Electronic ed.]':
e.attrib['eventType'] = 'digitization'
warnings.warn('Fixed eventType for electronic ed.')
continue
except AttributeError:
pass
try:
if e.find('mods:dateIssued', ns) is not None:
e.attrib['eventType'] = 'publication'
warnings.warn('Fixed eventType for an issued origin')
continue
except AttributeError:
pass
try:
if e.find('mods:dateCreated', ns) is not None:
e.attrib['eventType'] = 'production'
warnings.warn('Fixed eventType for a created origin')
continue
except AttributeError:
pass
return self
def fix_script_term(self):
for e in self.group:
# MODS-AP 2.3.1 is not clear about this, but it looks like that this should be lower case.
if e.attrib['authority'] == 'ISO15924':
e.attrib['authority'] = 'iso15924'
warnings.warn('Changed scriptTerm authority to lower case')
return self
def merge_sub_tags_to_set(self):
from .modstool import mods_to_dict
value = {}
sub_dicts = [mods_to_dict(e) for e in self.group]
sub_tags = {k for d in sub_dicts for k in d.keys()}
for sub_tag in sub_tags:
s = set()
for d in sub_dicts:
v = d.get(sub_tag)
if v:
# There could be multiple scriptTerms in one language element, e.g. Antiqua and Fraktur in a
# German language document.
if isinstance(v, set):
s.update(v)
else:
s.add(v)
value[sub_tag] = s
return value
def attributes(self):
"""
Return a merged dict of all attributes of the tag group.
Probably most useful if used on a singleton, for example:
value['Page'] = TagGroup(tag, group).is_singleton().attributes()
"""
attrib = {}
for e in self.group:
for a, v in e.attrib.items():
a_localname = ET.QName(a).localname
attrib[a_localname] = v
return attrib
def subelement_counts(self):
counts = {}
for e in self.group:
for x in e.iter():
tag = ET.QName(x.tag).localname
key = f"{tag}-count"
counts[key] = counts.get(key, 0) + 1
return counts
def xpath_statistics(self, xpath_expr, namespaces):
"""
Extract values and calculate statistics
Extract values using the given XPath expression, convert them to float and return descriptive
statistics on the values.
"""
values = []
for e in self.group:
r = e.xpath(xpath_expr, namespaces=namespaces)
values += r
values = np.array([float(v) for v in values])
statistics = {}
if values.size > 0:
statistics[f'{xpath_expr}-mean'] = np.mean(values)
statistics[f'{xpath_expr}-median'] = np.median(values)
statistics[f'{xpath_expr}-std'] = np.std(values)
statistics[f'{xpath_expr}-min'] = np.min(values)
statistics[f'{xpath_expr}-max'] = np.max(values)
return statistics
def sorted_groupby(iterable, key=None):
"""
Sort iterable by key and then group by the same key.
itertools.groupby() assumes that the iterable is already sorted. This function
conveniently sorts the iterable first, and then groups its elements.
"""
return groupby(sorted(iterable, key=key), key=key)
def _to_dict(root, raise_errors):
from .modstool import mods_to_dict, mets_to_dict
from .alto4pandas import alto_to_dict
root_name = ET.QName(root.tag)
if root_name.namespace == "http://www.loc.gov/mods/v3":
return mods_to_dict(root, raise_errors)
elif root_name.namespace == "http://www.loc.gov/METS/":
return mets_to_dict(root, raise_errors)
elif root_name.namespace in [
"http://schema.ccs-gmbh.com/ALTO",
"http://www.loc.gov/standards/alto/",
"http://www.loc.gov/standards/alto/ns-v2#",
"http://www.loc.gov/standards/alto/ns-v4#",
]:
return alto_to_dict(root, raise_errors)
else:
raise ValueError(f"Unknown namespace {root_name.namespace}")
def flatten(d: MutableMapping, parent='', separator='_'):
"""
Flatten the given nested dict.
It is assumed that d maps strings to either another dictionary (similarly structured) or some other value.
"""
items = []
for k, v in d.items():
if parent:
new_key = parent + separator + k
else:
new_key = k
if isinstance(v, MutableMapping):
items.extend(flatten(v, new_key, separator=separator).items())
else:
items.append((new_key, v))
return dict(items)

@ -14,196 +14,11 @@ import click
import pandas as pd
from tqdm import tqdm
from .lib import sorted_groupby, TagGroup, ns, flatten
ns = {
'mets': 'http://www.loc.gov/METS/',
'mods': 'http://www.loc.gov/mods/v3'
}
logger = logging.getLogger('modstool')
class TagGroup:
"""Helper class to simplify the parsing and checking of MODS metadata"""
def __init__(self, tag, group: List[ET.Element]):
self.tag = tag
self.group = group
def __str__(self):
return '\n'.join(str(ET.tostring(e), 'utf-8').strip() for e in self.group)
def is_singleton(self):
if len(self.group) != 1:
raise ValueError('More than one instance: {}'.format(self))
return self
def has_no_attributes(self):
return self.has_attributes({})
def has_attributes(self, attrib):
if not isinstance(attrib, Sequence):
attrib = [attrib]
if not all(e.attrib in attrib for e in self.group):
raise ValueError('One or more element has unexpected attributes: {}'.format(self))
return self
def ignore_attributes(self):
# This serves as documentation for now.
return self
def sort(self, key=None, reverse=False):
self.group = sorted(self.group, key=key, reverse=reverse)
return self
def text(self, separator='\n'):
t = ''
for e in self.group:
if t != '':
t += separator
t += e.text
return t
def text_set(self):
return {e.text for e in self.group}
def descend(self, raise_errors):
return _to_dict(self.is_singleton().group[0], raise_errors)
def filter(self, cond, warn=None):
new_group = []
for e in self.group:
if cond(e):
new_group.append(e)
else:
if warn:
warnings.warn('Filtered {} element ({})'.format(self.tag, warn))
return TagGroup(self.tag, new_group)
def force_singleton(self, warn=True):
if len(self.group) == 1:
return self
else:
if warn:
warnings.warn('Forced single instance of {}'.format(self.tag))
return TagGroup(self.tag, self.group[:1])
RE_ISO8601_DATE = r'^\d{2}(\d{2}|XX)(-\d{2}-\d{2})?$' # Note: Includes non-specific century dates like '18XX'
RE_GERMAN_DATE = r'^(?P<dd>\d{2})\.(?P<mm>\d{2})\.(?P<yyyy>\d{4})$'
def fix_date(self):
for e in self.group:
if e.attrib.get('encoding') == 'w3cdtf':
# This should be 'iso8601' according to MODS-AP 2.3.1
warnings.warn('Changed w3cdtf encoding to iso8601')
e.attrib['encoding'] = 'iso8601'
new_group = []
for e in self.group:
if e.attrib.get('encoding') == 'iso8601' and re.match(self.RE_ISO8601_DATE, e.text):
new_group.append(e)
elif re.match(self.RE_ISO8601_DATE, e.text):
warnings.warn('Added iso8601 encoding to date {}'.format(e.text))
e.attrib['encoding'] = 'iso8601'
new_group.append(e)
elif re.match(self.RE_GERMAN_DATE, e.text):
warnings.warn('Converted date {} to iso8601 encoding'.format(e.text))
m = re.match(self.RE_GERMAN_DATE, e.text)
e.text = '{}-{}-{}'.format(m.group('yyyy'), m.group('mm'), m.group('dd'))
e.attrib['encoding'] = 'iso8601'
new_group.append(e)
else:
warnings.warn('Not a iso8601 date: "{}"'.format(e.text))
new_group.append(e)
self.group = new_group
# Notes:
# - There are dates with the misspelled qualifier 'aproximate'
# - Rough periods are sometimes given either by:
# - years like '19xx'
# - or 'approximate' date ranges with point="start"/"end" attributes set
# (this could be correct according to MODS-AP 2.3.1)
# - Some very specific dates like '06.08.1820' are sometimes given the 'approximate' qualifier
# - Sometimes, approximate date ranges are given in the text "1785-1800 (ca.)"
return self
def fix_event_type(self):
# According to MODS-AP 2.3.1, every originInfo should have its eventType set.
# Fix this for special cases.
for e in self.group:
if e.attrib.get('eventType') is None:
try:
if e.find('mods:publisher', ns).text.startswith('Staatsbibliothek zu Berlin') and \
e.find('mods:edition', ns).text == '[Electronic ed.]':
e.attrib['eventType'] = 'digitization'
warnings.warn('Fixed eventType for electronic ed.')
continue
except AttributeError:
pass
try:
if e.find('mods:dateIssued', ns) is not None:
e.attrib['eventType'] = 'publication'
warnings.warn('Fixed eventType for an issued origin')
continue
except AttributeError:
pass
try:
if e.find('mods:dateCreated', ns) is not None:
e.attrib['eventType'] = 'production'
warnings.warn('Fixed eventType for a created origin')
continue
except AttributeError:
pass
return self
def fix_script_term(self):
for e in self.group:
# MODS-AP 2.3.1 is not clear about this, but it looks like that this should be lower case.
if e.attrib['authority'] == 'ISO15924':
e.attrib['authority'] = 'iso15924'
warnings.warn('Changed scriptTerm authority to lower case')
return self
def merge_sub_tags_to_set(self):
value = {}
sub_dicts = [mods_to_dict(e) for e in self.group]
sub_tags = {k for d in sub_dicts for k in d.keys()}
for sub_tag in sub_tags:
s = set()
for d in sub_dicts:
v = d.get(sub_tag)
if v:
# There could be multiple scriptTerms in one language element, e.g. Antiqua and Fraktur in a
# German language document.
if isinstance(v, set):
s.update(v)
else:
s.add(v)
value[sub_tag] = s
return value
def sorted_groupby(iterable, key=None):
"""
Sort iterable by key and then group by the same key.
itertools.groupby() assumes that the iterable is already sorted. This function
conveniently sorts the iterable first, and then groups its elements.
"""
return groupby(sorted(iterable, key=key), key=key)
def _to_dict(root, raise_errors):
root_name = ET.QName(root.tag)
if root_name.namespace == "http://www.loc.gov/mods/v3":
return mods_to_dict(root, raise_errors)
elif root_name.namespace == "http://www.loc.gov/METS/":
return mets_to_dict(root, raise_errors)
else:
raise ValueError(f"Unknown namespace {root_name.namespace}")
logger = logging.getLogger('modstool')
def mods_to_dict(mods, raise_errors=True):
"""Convert MODS metadata to a nested dictionary"""
@ -427,28 +242,6 @@ def mets_to_dict(mets, raise_errors=True):
return value
def flatten(d: MutableMapping, parent='', separator='_'):
"""
Flatten the given nested dict.
It is assumed that d maps strings to either another dictionary (similarly structured) or some other value.
"""
items = []
for k, v in d.items():
if parent:
new_key = parent + separator + k
else:
new_key = k
if isinstance(v, MutableMapping):
items.extend(flatten(v, new_key, separator=separator).items())
else:
items.append((new_key, v))
return dict(items)
@click.command()
@click.argument('mets_files', type=click.Path(exists=True), required=True, nargs=-1)
@click.option('--output', '-o', 'output_file', type=click.Path(), help='Output pickle file',

@ -0,0 +1,39 @@
import xml.etree.ElementTree as ET
from qurator.modstool.alto4pandas import alto_to_dict
from qurator.modstool.lib import flatten
def dict_fromstring(x):
return flatten(alto_to_dict(ET.fromstring(x)))
def test_Page_counts():
"""
Elements below Layout/Page should be counted
"""
d = dict_fromstring("""
<alto xmlns="http://www.loc.gov/standards/alto/ns-v2#">
<Layout>
<Page ID="Page1" PHYSICAL_IMG_NR="1">
<TextBlock ID="Page1_Block1">
<TextLine>
<String STYLE="bold" WC="0.8937500119" CONTENT="Staatsbibliothek" />
</TextLine>
<TextLine>
<String STYLE="bold" WC="0.8899999857" CONTENT="zu" />
<String STYLE="bold" WC="0.9866666794" CONTENT="Berlin" />
</TextLine>
<TextLine>
<String STYLE="bold" WC="1." CONTENT="WM" />
<String STYLE="bold" WC="0.8927272558" CONTENT="Preußischer" />
<String STYLE="bold" WC="0.9058333039" CONTENT="Kulturbesitz" />
</TextLine>
</TextBlock>
</Page>
</Layout>
</alto>
""")
assert d['Layout_Page_TextBlock-count'] == 1
assert d['Layout_Page_TextLine-count'] == 3
assert d['Layout_Page_String-count'] == 6

@ -1,8 +1,8 @@
import pytest
import xml.etree.ElementTree as ET
from .. import mets_to_dict, flatten
from qurator.modstool.modstool import mets_to_dict
from qurator.modstool.lib import flatten
def dict_fromstring(x):

@ -1,8 +1,10 @@
from tkinter import W
import pytest
import xml.etree.ElementTree as ET
from .. import mods_to_dict, flatten
from qurator.modstool.modstool import mods_to_dict
from qurator.modstool.lib import flatten
def dict_fromstring(x):

@ -3,6 +3,8 @@ from setuptools import find_packages, setup
with open('requirements.txt') as fp:
install_requires = fp.read()
with open('requirements-test.txt') as fp:
tests_requires = fp.read()
setup(
name='modstool',
@ -19,8 +21,9 @@ setup(
entry_points={
'console_scripts': [
'modstool=qurator.modstool.modstool:main',
'alto4pandas=qurator.modstool.alto4pandas:main',
]
},
python_requires='>=3.0.0',
tests_require=['pytest'],
tests_requires=tests_requires,
)

Loading…
Cancel
Save