1
0
Fork 0
mirror of https://github.com/qurator-spk/modstool.git synced 2025-08-14 03:59:53 +02:00

🎨 Reformat (Black)

This commit is contained in:
Mike Gerber 2025-06-12 09:51:02 +02:00
parent 5c9858a061
commit 212df99436
7 changed files with 639 additions and 355 deletions

View file

@@ -18,7 +18,14 @@ import click
import numpy as np
from tqdm import tqdm
from .lib import TagGroup, convert_db_to_parquet, sorted_groupby, flatten, ns, insert_into_db
from .lib import (
TagGroup,
convert_db_to_parquet,
sorted_groupby,
flatten,
ns,
insert_into_db,
)
with warnings.catch_warnings():
# Filter warnings on WSL
@@ -27,8 +34,7 @@ with warnings.catch_warnings():
import pandas as pd
logger = logging.getLogger('alto4pandas')
logger = logging.getLogger("alto4pandas")
def alto_to_dict(alto, raise_errors=True):
@@ -37,56 +43,91 @@ def alto_to_dict(alto, raise_errors=True):
value = {}
# Iterate through each group of tags
for tag, group in sorted_groupby(alto, key=attrgetter('tag')):
for tag, group in sorted_groupby(alto, key=attrgetter("tag")):
group = list(group)
localname = ET.QName(tag).localname
alto_namespace = ET.QName(tag).namespace
namespaces={"alto": alto_namespace}
namespaces = {"alto": alto_namespace}
if localname == 'Description':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().descend(raise_errors)
elif localname == 'MeasurementUnit':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == 'OCRProcessing':
if localname == "Description":
value[localname] = (
TagGroup(tag, group)
.is_singleton()
.has_no_attributes()
.descend(raise_errors)
)
elif localname == "MeasurementUnit":
value[localname] = (
TagGroup(tag, group).is_singleton().has_no_attributes().text()
)
elif localname == "OCRProcessing":
value[localname] = TagGroup(tag, group).is_singleton().descend(raise_errors)
elif localname == 'Processing':
elif localname == "Processing":
# TODO This enumerated descent is used more than once, DRY!
for n, e in enumerate(group):
value[f'{localname}{n}'] = alto_to_dict(e, raise_errors)
elif localname == 'ocrProcessingStep':
value[f"{localname}{n}"] = alto_to_dict(e, raise_errors)
elif localname == "ocrProcessingStep":
for n, e in enumerate(group):
value[f'{localname}{n}'] = alto_to_dict(e, raise_errors)
elif localname == 'preProcessingStep':
value[f"{localname}{n}"] = alto_to_dict(e, raise_errors)
elif localname == "preProcessingStep":
for n, e in enumerate(group):
value[f'{localname}{n}'] = alto_to_dict(e, raise_errors)
elif localname == 'processingDateTime':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == 'processingSoftware':
value[f"{localname}{n}"] = alto_to_dict(e, raise_errors)
elif localname == "processingDateTime":
value[localname] = (
TagGroup(tag, group).is_singleton().has_no_attributes().text()
)
elif localname == "processingSoftware":
value[localname] = TagGroup(tag, group).is_singleton().descend(raise_errors)
elif localname == 'processingAgency':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == 'processingStepDescription':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == 'processingStepSettings':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == 'softwareCreator':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == 'softwareName':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == 'softwareVersion':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == "processingAgency":
value[localname] = (
TagGroup(tag, group).is_singleton().has_no_attributes().text()
)
elif localname == "processingStepDescription":
value[localname] = (
TagGroup(tag, group).is_singleton().has_no_attributes().text()
)
elif localname == "processingStepSettings":
value[localname] = (
TagGroup(tag, group).is_singleton().has_no_attributes().text()
)
elif localname == "softwareCreator":
value[localname] = (
TagGroup(tag, group).is_singleton().has_no_attributes().text()
)
elif localname == "softwareName":
value[localname] = (
TagGroup(tag, group).is_singleton().has_no_attributes().text()
)
elif localname == "softwareVersion":
value[localname] = (
TagGroup(tag, group).is_singleton().has_no_attributes().text()
)
elif localname == 'sourceImageInformation':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().descend(raise_errors)
elif localname == 'fileName':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == 'fileIdentifier':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif localname == "sourceImageInformation":
value[localname] = (
TagGroup(tag, group)
.is_singleton()
.has_no_attributes()
.descend(raise_errors)
)
elif localname == "fileName":
value[localname] = (
TagGroup(tag, group).is_singleton().has_no_attributes().text()
)
elif localname == "fileIdentifier":
value[localname] = (
TagGroup(tag, group).is_singleton().has_no_attributes().text()
)
elif localname == 'Layout':
value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().descend(raise_errors)
elif localname == 'Page':
elif localname == "Layout":
value[localname] = (
TagGroup(tag, group)
.is_singleton()
.has_no_attributes()
.descend(raise_errors)
)
elif localname == "Page":
value[localname] = {}
value[localname].update(TagGroup(tag, group).is_singleton().attributes())
for attr in ("WIDTH", "HEIGHT"):
@@ -96,14 +137,18 @@ def alto_to_dict(alto, raise_errors=True):
except ValueError:
del value[localname][attr]
value[localname].update(TagGroup(tag, group).subelement_counts())
value[localname].update(TagGroup(tag, group).xpath_statistics("//alto:String/@WC", namespaces))
value[localname].update(
TagGroup(tag, group).xpath_statistics("//alto:String/@WC", namespaces)
)
# Count all alto:String elements with TAGREFS attribute
value[localname].update(TagGroup(tag, group).xpath_count("//alto:String[@TAGREFS]", namespaces))
value[localname].update(
TagGroup(tag, group).xpath_count("//alto:String[@TAGREFS]", namespaces)
)
elif localname == 'Styles':
elif localname == "Styles":
pass
elif localname == 'Tags':
elif localname == "Tags":
value[localname] = {}
value[localname].update(TagGroup(tag, group).subelement_counts())
else:
@@ -116,13 +161,12 @@ def alto_to_dict(alto, raise_errors=True):
return value
def walk(m):
# XXX do this in mods4pandas, too
if os.path.isdir(m):
tqdm.write(f'Scanning directory {m}')
tqdm.write(f"Scanning directory {m}")
for f in tqdm(os.scandir(m), leave=False):
if f.is_file() and not f.name.startswith('.'):
if f.is_file() and not f.name.startswith("."):
yield f.path
elif f.is_dir():
try:
@@ -133,11 +177,17 @@ def walk(m):
yield m.path
@click.command()
@click.argument('alto_files', type=click.Path(exists=True), required=True, nargs=-1)
@click.option('--output', '-o', 'output_file', type=click.Path(), help='Output Parquet file',
default='alto_info_df.parquet', show_default=True)
@click.argument("alto_files", type=click.Path(exists=True), required=True, nargs=-1)
@click.option(
"--output",
"-o",
"output_file",
type=click.Path(),
help="Output Parquet file",
default="alto_info_df.parquet",
show_default=True,
)
def process_command(alto_files: List[str], output_file: str):
"""
A tool to convert the ALTO metadata in INPUT to a pandas DataFrame.
@@ -153,6 +203,7 @@ def process_command(alto_files: List[str], output_file: str):
process(alto_files, output_file)
def process(alto_files: List[str], output_file: str):
# Extend file list if directories are given
alto_files_real = []
@@ -167,26 +218,26 @@ def process(alto_files: List[str], output_file: str):
with contextlib.suppress(FileNotFoundError):
os.remove(output_file_sqlite3)
logger.info('Writing SQLite DB to {}'.format(output_file_sqlite3))
logger.info("Writing SQLite DB to {}".format(output_file_sqlite3))
con = sqlite3.connect(output_file_sqlite3)
# Process ALTO files
with open(output_file + '.warnings.csv', 'w') as csvfile:
with open(output_file + ".warnings.csv", "w") as csvfile:
csvwriter = csv.writer(csvfile)
logger.info('Processing ALTO files')
logger.info("Processing ALTO files")
for alto_file in tqdm(alto_files_real, leave=False):
try:
root = ET.parse(alto_file).getroot()
alto = root # XXX .find('alto:alto', ns) does not work here
alto = root # XXX .find('alto:alto', ns) does not work here
with warnings.catch_warnings(record=True) as caught_warnings:
warnings.simplefilter('always') # do NOT filter double occurrences
warnings.simplefilter("always") # do NOT filter double occurrences
# ALTO
d = flatten(alto_to_dict(alto, raise_errors=True))
# "meta"
d['alto_file'] = alto_file
d['alto_xmlns'] = ET.QName(alto).namespace
d["alto_file"] = alto_file
d["alto_xmlns"] = ET.QName(alto).namespace
# Save
insert_into_db(con, "alto_info", d)
@@ -198,11 +249,13 @@ def process(alto_files: List[str], output_file: str):
for caught_warning in caught_warnings:
csvwriter.writerow([alto_file, caught_warning.message])
except Exception as e:
logger.error('Exception in {}: {}'.format(alto_file, e))
import traceback; traceback.print_exc()
logger.error("Exception in {}: {}".format(alto_file, e))
import traceback
traceback.print_exc()
# Convert the alto_info SQL to a pandas DataFrame
logger.info('Writing DataFrame to {}'.format(output_file))
logger.info("Writing DataFrame to {}".format(output_file))
convert_db_to_parquet(con, "alto_info", "alto_file", output_file)
@@ -215,5 +268,5 @@ def main():
process()
if __name__ == '__main__':
if __name__ == "__main__":
main()