1
0
Fork 0
mirror of https://github.com/qurator-spk/modstool.git synced 2025-08-13 11:39:53 +02:00
modstool/src/mods4pandas/alto4pandas.py

256 lines
8.6 KiB
Python
Raw Normal View History

2022-05-04 20:02:27 +02:00
#!/usr/bin/env python3
2025-06-13 19:20:48 +02:00
import contextlib
2022-05-04 20:02:27 +02:00
import csv
import os
import sqlite3
2025-06-13 19:20:48 +02:00
import warnings
2022-05-04 20:02:27 +02:00
from operator import attrgetter
from typing import List
import click
from loguru import logger
2025-06-13 19:20:48 +02:00
from lxml import etree as ET
2022-05-04 20:02:27 +02:00
from tqdm import tqdm
2025-06-12 09:51:02 +02:00
from .lib import (
TagGroup,
convert_db_to_parquet,
flatten,
insert_into_db,
2025-06-13 19:20:48 +02:00
ns,
sorted_groupby,
2025-06-12 09:51:02 +02:00
)
def alto_to_dict(alto, raise_errors=True):
    """Convert ALTO metadata to a nested dictionary.

    Args:
        alto: An lxml element (the ALTO root or any sub-element) whose
            children are converted recursively.
        raise_errors: If True, raise a ValueError when an unknown tag is
            encountered; if False, unknown tags are silently skipped.

    Returns:
        dict mapping element local names (or enumerated keys such as
        "Processing0", "Processing1", ...) to the converted values.

    Raises:
        ValueError: On an unknown tag, if raise_errors is True.
    """
    # Dispatch tables replacing the former long if/elif chain (see the old
    # "TODO This enumerated descent is used more than once, DRY!").
    # Local names are distinct across the sets, so branch order is irrelevant.

    # Occur exactly once, carry no attributes, represented by text content.
    text_tags = {
        "MeasurementUnit",
        "processingDateTime",
        "processingAgency",
        "processingStepDescription",
        "processingStepSettings",
        "softwareCreator",
        "softwareName",
        "softwareVersion",
        "fileName",
        "fileIdentifier",
    }
    # Occur exactly once, carry no attributes, converted by descending.
    plain_descend_tags = {"Description", "sourceImageInformation", "Layout"}
    # Occur exactly once (attributes allowed), converted by descending.
    descend_tags = {"OCRProcessing", "processingSoftware"}
    # May occur multiple times; each occurrence converted under an
    # enumerated key ("ocrProcessingStep0", "ocrProcessingStep1", ...).
    enumerated_tags = {"Processing", "ocrProcessingStep", "preProcessingStep"}

    value = {}
    # Iterate through each group of tags
    for tag, group in sorted_groupby(alto, key=attrgetter("tag")):
        group = list(group)
        localname = ET.QName(tag).localname
        alto_namespace = ET.QName(tag).namespace
        namespaces = {"alto": alto_namespace}

        if localname in text_tags:
            value[localname] = (
                TagGroup(tag, group).is_singleton().has_no_attributes().text()
            )
        elif localname in plain_descend_tags:
            value[localname] = (
                TagGroup(tag, group)
                .is_singleton()
                .has_no_attributes()
                .descend(raise_errors)
            )
        elif localname in descend_tags:
            value[localname] = TagGroup(tag, group).is_singleton().descend(raise_errors)
        elif localname in enumerated_tags:
            for n, e in enumerate(group):
                value[f"{localname}{n}"] = alto_to_dict(e, raise_errors)
        elif localname == "Page":
            value[localname] = {}
            value[localname].update(TagGroup(tag, group).is_singleton().attributes())
            # WIDTH/HEIGHT should be numeric; drop them when they are not.
            for attr in ("WIDTH", "HEIGHT"):
                if attr in value[localname]:
                    try:
                        value[localname][attr] = int(value[localname][attr])
                    except ValueError:
                        del value[localname][attr]
            value[localname].update(TagGroup(tag, group).subelement_counts())
            # Word-confidence statistics over all alto:String elements.
            value[localname].update(
                TagGroup(tag, group).xpath_statistics("//alto:String/@WC", namespaces)
            )
            # Count all alto:String elements with TAGREFS attribute
            value[localname].update(
                TagGroup(tag, group).xpath_count("//alto:String[@TAGREFS]", namespaces)
            )
        elif localname == "Styles":
            pass
        elif localname == "Tags":
            value[localname] = {}
            value[localname].update(TagGroup(tag, group).subelement_counts())
        else:
            if raise_errors:
                print(value)  # debugging aid: show what was collected so far
                raise ValueError('Unknown tag "{}"'.format(tag))

    return value
def walk(m):
    """Recursively yield the paths of all non-hidden files under *m*.

    If *m* is a regular file (or anything that is not a directory), it is
    yielded as-is. Unreadable subdirectories are skipped with a warning.

    Args:
        m: A path string (file or directory).

    Yields:
        Path strings of the files found.
    """
    # XXX do this in mods4pandas, too
    if os.path.isdir(m):
        logger.info(f"Scanning directory {m}")
        for f in os.scandir(m):
            if f.is_file() and not f.name.startswith("."):
                yield f.path
            elif f.is_dir():
                try:
                    yield from walk(f.path)
                except PermissionError:
                    warnings.warn(f"Error walking {f.path}")
    else:
        # BUGFIX: m is a plain path string here (both from the CLI caller
        # and from the recursive walk(f.path) call above), not an
        # os.DirEntry — the old `yield m.path` raised AttributeError.
        yield m
@click.command()
@click.argument("alto_files", type=click.Path(exists=True), required=True, nargs=-1)
@click.option(
    "--output",
    "-o",
    "output_file",
    type=click.Path(),
    help="Output Parquet file",
    default="alto_info_df.parquet",
    show_default=True,
)
def process_command(alto_files: List[str], output_file: str):
    """
    A tool to convert the ALTO metadata in ALTO_FILES to a pandas DataFrame.

    Each ALTO_FILES argument is assumed to be an ALTO document. It may
    optionally be a directory, in which case the tool reads all files in
    the directory (recursively).

    alto4pandas writes multiple output files:

    - A Parquet DataFrame
    - A SQLite database
    - and a CSV file with all conversion warnings.
    """
    # Thin CLI wrapper: all actual work happens in process().
    process(alto_files, output_file)
def process(alto_files: List[str], output_file: str):
    """Convert ALTO metadata from *alto_files* into a Parquet DataFrame.

    Besides *output_file* (Parquet), this writes:
    - output_file + ".sqlite3": SQLite DB with one "alto_info" row per file
    - output_file + ".warnings.csv": all conversion warnings

    Args:
        alto_files: ALTO file paths; directories are walked recursively.
        output_file: Path of the Parquet file to write.
    """
    # Extend file list if directories are given
    alto_files_real = [x for m in alto_files for x in walk(m)]

    # Prepare output files: remove stale results from a previous run.
    with contextlib.suppress(FileNotFoundError):
        os.remove(output_file)
    output_file_sqlite3 = output_file + ".sqlite3"
    with contextlib.suppress(FileNotFoundError):
        os.remove(output_file_sqlite3)

    logger.info("Writing SQLite DB to {}".format(output_file_sqlite3))
    con = sqlite3.connect(output_file_sqlite3)
    try:
        # Process ALTO files
        with open(output_file + ".warnings.csv", "w") as csvfile:
            csvwriter = csv.writer(csvfile)
            logger.info("Processing ALTO files")
            for alto_file in tqdm(alto_files_real, leave=False):
                try:
                    root = ET.parse(alto_file).getroot()
                    alto = root  # XXX .find('alto:alto', ns) does not work here

                    with warnings.catch_warnings(record=True) as caught_warnings:
                        warnings.simplefilter("always")  # do NOT filter double occurrences

                        # ALTO
                        d = flatten(alto_to_dict(alto, raise_errors=True))

                        # "meta"
                        d["alto_file"] = alto_file
                        d["alto_xmlns"] = ET.QName(alto).namespace

                        # Save
                        insert_into_db(con, "alto_info", d)
                        con.commit()

                    if caught_warnings:
                        # PyCharm thinks caught_warnings is not Iterable:
                        # noinspection PyTypeChecker
                        for caught_warning in caught_warnings:
                            csvwriter.writerow([alto_file, caught_warning.message])
                except Exception as e:
                    # Best-effort: log and continue with the next file.
                    logger.error("Exception in {}: {}".format(alto_file, e))
                    import traceback

                    traceback.print_exc()

        # Convert the alto_info SQL to a pandas DataFrame
        logger.info("Writing DataFrame to {}".format(output_file))
        convert_db_to_parquet(con, "alto_info", "alto_file", output_file)
    finally:
        # BUGFIX: the connection was never closed before; release it even
        # when an exception escapes.
        con.close()
def main():
    """Entry point: register the ALTO/METS namespace prefixes, then run the CLI."""
    # Register prefixes globally so serialized XML uses conventional names.
    for prefix, uri in ns.items():
        ET.register_namespace(prefix, uri)
    process_command()
# Allow running this module directly as a script.
if __name__ == "__main__":
    main()