From 212df9943630dfb98ee59644f2c6b8c2bf651e92 Mon Sep 17 00:00:00 2001 From: Mike Gerber Date: Thu, 12 Jun 2025 09:51:02 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=A8=20Reformat=20(Black)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/mods4pandas/alto4pandas.py | 177 +++++--- src/mods4pandas/lib.py | 120 +++--- src/mods4pandas/mods4pandas.py | 500 ++++++++++++++-------- src/mods4pandas/tests/test_alto.py | 45 +- src/mods4pandas/tests/test_mets.py | 15 +- src/mods4pandas/tests/test_mods4pandas.py | 111 +++-- src/mods4pandas/tests/test_page_info.py | 26 +- 7 files changed, 639 insertions(+), 355 deletions(-) diff --git a/src/mods4pandas/alto4pandas.py b/src/mods4pandas/alto4pandas.py index 359a26e..27166c9 100755 --- a/src/mods4pandas/alto4pandas.py +++ b/src/mods4pandas/alto4pandas.py @@ -18,7 +18,14 @@ import click import numpy as np from tqdm import tqdm -from .lib import TagGroup, convert_db_to_parquet, sorted_groupby, flatten, ns, insert_into_db +from .lib import ( + TagGroup, + convert_db_to_parquet, + sorted_groupby, + flatten, + ns, + insert_into_db, +) with warnings.catch_warnings(): # Filter warnings on WSL @@ -27,8 +34,7 @@ with warnings.catch_warnings(): import pandas as pd -logger = logging.getLogger('alto4pandas') - +logger = logging.getLogger("alto4pandas") def alto_to_dict(alto, raise_errors=True): @@ -37,56 +43,91 @@ def alto_to_dict(alto, raise_errors=True): value = {} # Iterate through each group of tags - for tag, group in sorted_groupby(alto, key=attrgetter('tag')): + for tag, group in sorted_groupby(alto, key=attrgetter("tag")): group = list(group) localname = ET.QName(tag).localname alto_namespace = ET.QName(tag).namespace - namespaces={"alto": alto_namespace} + namespaces = {"alto": alto_namespace} - if localname == 'Description': - value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().descend(raise_errors) - elif localname == 'MeasurementUnit': - value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text() - elif localname == 'OCRProcessing': + if localname == "Description": + value[localname] = ( + TagGroup(tag, group) + .is_singleton() + .has_no_attributes() + .descend(raise_errors) + ) + elif localname == "MeasurementUnit": + value[localname] = ( + TagGroup(tag, group).is_singleton().has_no_attributes().text() + ) + elif localname == "OCRProcessing": value[localname] = TagGroup(tag, group).is_singleton().descend(raise_errors) - elif localname == 'Processing': + elif localname == "Processing": # TODO This enumerated descent is used more than once, DRY! 
for n, e in enumerate(group): - value[f'{localname}{n}'] = alto_to_dict(e, raise_errors) - elif localname == 'ocrProcessingStep': + value[f"{localname}{n}"] = alto_to_dict(e, raise_errors) + elif localname == "ocrProcessingStep": for n, e in enumerate(group): - value[f'{localname}{n}'] = alto_to_dict(e, raise_errors) - elif localname == 'preProcessingStep': + value[f"{localname}{n}"] = alto_to_dict(e, raise_errors) + elif localname == "preProcessingStep": for n, e in enumerate(group): - value[f'{localname}{n}'] = alto_to_dict(e, raise_errors) - elif localname == 'processingDateTime': - value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text() - elif localname == 'processingSoftware': + value[f"{localname}{n}"] = alto_to_dict(e, raise_errors) + elif localname == "processingDateTime": + value[localname] = ( + TagGroup(tag, group).is_singleton().has_no_attributes().text() + ) + elif localname == "processingSoftware": value[localname] = TagGroup(tag, group).is_singleton().descend(raise_errors) - elif localname == 'processingAgency': - value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text() - elif localname == 'processingStepDescription': - value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text() - elif localname == 'processingStepSettings': - value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text() - elif localname == 'softwareCreator': - value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text() - elif localname == 'softwareName': - value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text() - elif localname == 'softwareVersion': - value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text() + elif localname == "processingAgency": + value[localname] = ( + TagGroup(tag, group).is_singleton().has_no_attributes().text() + ) + elif localname == "processingStepDescription": + value[localname] = ( + TagGroup(tag, group).is_singleton().has_no_attributes().text() + ) + elif localname == "processingStepSettings": + value[localname] = ( + TagGroup(tag, group).is_singleton().has_no_attributes().text() + ) + elif localname == "softwareCreator": + value[localname] = ( + TagGroup(tag, group).is_singleton().has_no_attributes().text() + ) + elif localname == "softwareName": + value[localname] = ( + TagGroup(tag, group).is_singleton().has_no_attributes().text() + ) + elif localname == "softwareVersion": + value[localname] = ( + TagGroup(tag, group).is_singleton().has_no_attributes().text() + ) - elif localname == 'sourceImageInformation': - value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().descend(raise_errors) - elif localname == 'fileName': - value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text() - elif localname == 'fileIdentifier': - value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().text() + elif localname == "sourceImageInformation": + value[localname] = ( + TagGroup(tag, group) + .is_singleton() + .has_no_attributes() + .descend(raise_errors) + ) + elif localname == "fileName": + value[localname] = ( + TagGroup(tag, group).is_singleton().has_no_attributes().text() + ) + elif localname == "fileIdentifier": + value[localname] = ( + TagGroup(tag, group).is_singleton().has_no_attributes().text() + ) - elif localname == 'Layout': - value[localname] = TagGroup(tag, group).is_singleton().has_no_attributes().descend(raise_errors) - elif localname == 'Page': + elif 
localname == "Layout": + value[localname] = ( + TagGroup(tag, group) + .is_singleton() + .has_no_attributes() + .descend(raise_errors) + ) + elif localname == "Page": value[localname] = {} value[localname].update(TagGroup(tag, group).is_singleton().attributes()) for attr in ("WIDTH", "HEIGHT"): @@ -96,14 +137,18 @@ def alto_to_dict(alto, raise_errors=True): except ValueError: del value[localname][attr] value[localname].update(TagGroup(tag, group).subelement_counts()) - value[localname].update(TagGroup(tag, group).xpath_statistics("//alto:String/@WC", namespaces)) + value[localname].update( + TagGroup(tag, group).xpath_statistics("//alto:String/@WC", namespaces) + ) # Count all alto:String elements with TAGREFS attribute - value[localname].update(TagGroup(tag, group).xpath_count("//alto:String[@TAGREFS]", namespaces)) + value[localname].update( + TagGroup(tag, group).xpath_count("//alto:String[@TAGREFS]", namespaces) + ) - elif localname == 'Styles': + elif localname == "Styles": pass - elif localname == 'Tags': + elif localname == "Tags": value[localname] = {} value[localname].update(TagGroup(tag, group).subelement_counts()) else: @@ -116,13 +161,12 @@ def alto_to_dict(alto, raise_errors=True): return value - def walk(m): # XXX do this in mods4pandas, too if os.path.isdir(m): - tqdm.write(f'Scanning directory {m}') + tqdm.write(f"Scanning directory {m}") for f in tqdm(os.scandir(m), leave=False): - if f.is_file() and not f.name.startswith('.'): + if f.is_file() and not f.name.startswith("."): yield f.path elif f.is_dir(): try: @@ -133,11 +177,17 @@ def walk(m): yield m.path - @click.command() -@click.argument('alto_files', type=click.Path(exists=True), required=True, nargs=-1) -@click.option('--output', '-o', 'output_file', type=click.Path(), help='Output Parquet file', - default='alto_info_df.parquet', show_default=True) +@click.argument("alto_files", type=click.Path(exists=True), required=True, nargs=-1) +@click.option( + "--output", + "-o", + "output_file", + type=click.Path(), + help="Output Parquet file", + default="alto_info_df.parquet", + show_default=True, +) def process_command(alto_files: List[str], output_file: str): """ A tool to convert the ALTO metadata in INPUT to a pandas DataFrame. 
@@ -153,6 +203,7 @@ def process_command(alto_files: List[str], output_file: str): process(alto_files, output_file) + def process(alto_files: List[str], output_file: str): # Extend file list if directories are given alto_files_real = [] @@ -167,26 +218,26 @@ def process(alto_files: List[str], output_file: str): with contextlib.suppress(FileNotFoundError): os.remove(output_file_sqlite3) - logger.info('Writing SQLite DB to {}'.format(output_file_sqlite3)) + logger.info("Writing SQLite DB to {}".format(output_file_sqlite3)) con = sqlite3.connect(output_file_sqlite3) # Process ALTO files - with open(output_file + '.warnings.csv', 'w') as csvfile: + with open(output_file + ".warnings.csv", "w") as csvfile: csvwriter = csv.writer(csvfile) - logger.info('Processing ALTO files') + logger.info("Processing ALTO files") for alto_file in tqdm(alto_files_real, leave=False): try: root = ET.parse(alto_file).getroot() - alto = root # XXX .find('alto:alto', ns) does not work here + alto = root # XXX .find('alto:alto', ns) does not work here with warnings.catch_warnings(record=True) as caught_warnings: - warnings.simplefilter('always') # do NOT filter double occurrences + warnings.simplefilter("always") # do NOT filter double occurrences # ALTO d = flatten(alto_to_dict(alto, raise_errors=True)) # "meta" - d['alto_file'] = alto_file - d['alto_xmlns'] = ET.QName(alto).namespace + d["alto_file"] = alto_file + d["alto_xmlns"] = ET.QName(alto).namespace # Save insert_into_db(con, "alto_info", d) @@ -198,11 +249,13 @@ def process(alto_files: List[str], output_file: str): for caught_warning in caught_warnings: csvwriter.writerow([alto_file, caught_warning.message]) except Exception as e: - logger.error('Exception in {}: {}'.format(alto_file, e)) - import traceback; traceback.print_exc() + logger.error("Exception in {}: {}".format(alto_file, e)) + import traceback + + traceback.print_exc() # Convert the alto_info SQL to a pandas DataFrame - logger.info('Writing DataFrame to {}'.format(output_file)) + logger.info("Writing DataFrame to {}".format(output_file)) convert_db_to_parquet(con, "alto_info", "alto_file", output_file) @@ -215,5 +268,5 @@ def main(): process() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/src/mods4pandas/lib.py b/src/mods4pandas/lib.py index 68050b1..803c3cd 100644 --- a/src/mods4pandas/lib.py +++ b/src/mods4pandas/lib.py @@ -21,14 +21,13 @@ __all__ = ["ns"] ns = { - 'mets': 'http://www.loc.gov/METS/', - 'mods': 'http://www.loc.gov/mods/v3', + "mets": "http://www.loc.gov/METS/", + "mods": "http://www.loc.gov/mods/v3", "alto": "http://www.loc.gov/standards/alto/ns-v2", "xlink": "http://www.w3.org/1999/xlink", } - class TagGroup: """Helper class to simplify the parsing and checking of MODS metadata""" @@ -37,14 +36,14 @@ class TagGroup: self.group = group def to_xml(self) -> str: - return '\n'.join(str(ET.tostring(e), 'utf-8').strip() for e in self.group) + return "\n".join(str(ET.tostring(e), "utf-8").strip() for e in self.group) def __str__(self) -> str: return f"TagGroup with content:\n{self.to_xml()}" def is_singleton(self) -> TagGroup: if len(self.group) != 1: - raise ValueError('More than one instance: {}'.format(self)) + raise ValueError("More than one instance: {}".format(self)) return self def has_no_attributes(self) -> TagGroup: @@ -54,7 +53,9 @@ class TagGroup: if not isinstance(attrib, Sequence): attrib = [attrib] if not all(e.attrib in attrib for e in self.group): - raise ValueError('One or more element has unexpected attributes: {}'.format(self)) + raise 
ValueError( + "One or more element has unexpected attributes: {}".format(self) + ) return self def ignore_attributes(self) -> TagGroup: @@ -65,10 +66,10 @@ class TagGroup: self.group = sorted(self.group, key=key, reverse=reverse) return self - def text(self, separator='\n') -> str: - t = '' + def text(self, separator="\n") -> str: + t = "" for e in self.group: - if t != '': + if t != "": t += separator if e.text: t += e.text @@ -87,7 +88,7 @@ class TagGroup: new_group.append(e) else: if warn: - warnings.warn('Filtered {} element ({})'.format(self.tag, warn)) + warnings.warn("Filtered {} element ({})".format(self.tag, warn)) return TagGroup(self.tag, new_group) def force_singleton(self, warn=True) -> TagGroup: @@ -95,35 +96,38 @@ class TagGroup: return self else: if warn: - warnings.warn('Forced single instance of {}'.format(self.tag)) + warnings.warn("Forced single instance of {}".format(self.tag)) return TagGroup(self.tag, self.group[:1]) - RE_ISO8601_DATE = r'^\d{2}(\d{2}|XX)(-\d{2}-\d{2})?$' # Note: Includes non-specific century dates like '18XX' - RE_GERMAN_DATE = r'^(?P
<dd>\d{2})\.(?P<mm>\d{2})\.(?P<yyyy>\d{4})$' + RE_ISO8601_DATE = r"^\d{2}(\d{2}|XX)(-\d{2}-\d{2})?$"  # Note: Includes non-specific century dates like '18XX' + RE_GERMAN_DATE = r"^(?P<dd>
\d{2})\.(?P<mm>\d{2})\.(?P<yyyy>\d{4})$" def fix_date(self) -> TagGroup: - for e in self.group: - if e.attrib.get('encoding') == 'w3cdtf': + if e.attrib.get("encoding") == "w3cdtf": # This should be 'iso8601' according to MODS-AP 2.3.1 - warnings.warn('Changed w3cdtf encoding to iso8601') - e.attrib['encoding'] = 'iso8601' + warnings.warn("Changed w3cdtf encoding to iso8601") + e.attrib["encoding"] = "iso8601" new_group = [] for e in self.group: if e.text is None: - warnings.warn('Empty date') + warnings.warn("Empty date") continue - if e.attrib.get('encoding') == 'iso8601' and re.match(self.RE_ISO8601_DATE, e.text): + if e.attrib.get("encoding") == "iso8601" and re.match( + self.RE_ISO8601_DATE, e.text + ): new_group.append(e) elif re.match(self.RE_ISO8601_DATE, e.text): - warnings.warn('Added iso8601 encoding to date {}'.format(e.text)) - e.attrib['encoding'] = 'iso8601' + warnings.warn("Added iso8601 encoding to date {}".format(e.text)) + e.attrib["encoding"] = "iso8601" new_group.append(e) elif m := re.match(self.RE_GERMAN_DATE, e.text): - warnings.warn('Converted date {} to iso8601 encoding'.format(e.text)) - e.text = '{}-{}-{}'.format(m.group('yyyy'), m.group('mm'), m.group('dd')) - e.attrib['encoding'] = 'iso8601' + warnings.warn("Converted date {} to iso8601 encoding".format(e.text)) + e.text = "{}-{}-{}".format( + m.group("yyyy"), m.group("mm"), m.group("dd") + ) + e.attrib["encoding"] = "iso8601" new_group.append(e) else: warnings.warn('Not a iso8601 date: "{}"'.format(e.text)) @@ -146,26 +150,30 @@ class TagGroup: # Fix this for special cases. for e in self.group: - if e.attrib.get('eventType') is None: + if e.attrib.get("eventType") is None: try: - if e.find('mods:publisher', ns).text.startswith('Staatsbibliothek zu Berlin') and \ - e.find('mods:edition', ns).text == '[Electronic ed.]': - e.attrib['eventType'] = 'digitization' - warnings.warn('Fixed eventType for electronic ed.') + if ( + e.find("mods:publisher", ns).text.startswith( + "Staatsbibliothek zu Berlin" + ) + and e.find("mods:edition", ns).text == "[Electronic ed.]" + ): + e.attrib["eventType"] = "digitization" + warnings.warn("Fixed eventType for electronic ed.") continue except AttributeError: pass try: - if e.find('mods:dateIssued', ns) is not None: - e.attrib['eventType'] = 'publication' - warnings.warn('Fixed eventType for an issued origin') + if e.find("mods:dateIssued", ns) is not None: + e.attrib["eventType"] = "publication" + warnings.warn("Fixed eventType for an issued origin") continue except AttributeError: pass try: - if e.find('mods:dateCreated', ns) is not None: - e.attrib['eventType'] = 'production' - warnings.warn('Fixed eventType for a created origin') + if e.find("mods:dateCreated", ns) is not None: + e.attrib["eventType"] = "production" + warnings.warn("Fixed eventType for a created origin") continue except AttributeError: pass @@ -174,13 +182,14 @@ class TagGroup: def fix_script_term(self) -> TagGroup: for e in self.group: # MODS-AP 2.3.1 is not clear about this, but it looks like that this should be lower case.
- if e.attrib['authority'] == 'ISO15924': - e.attrib['authority'] = 'iso15924' - warnings.warn('Changed scriptTerm authority to lower case') + if e.attrib["authority"] == "ISO15924": + e.attrib["authority"] = "iso15924" + warnings.warn("Changed scriptTerm authority to lower case") return self def merge_sub_tags_to_set(self) -> dict: from .mods4pandas import mods_to_dict + value = {} sub_dicts = [mods_to_dict(e) for e in self.group] @@ -230,6 +239,7 @@ class TagGroup: Extract values using the given XPath expression, convert them to float and return descriptive statistics on the values. """ + def xpath_values(): values = [] for e in self.group: @@ -240,11 +250,11 @@ class TagGroup: values = xpath_values() statistics = {} if values.size > 0: - statistics[f'{xpath_expr}-mean'] = np.mean(values) - statistics[f'{xpath_expr}-median'] = np.median(values) - statistics[f'{xpath_expr}-std'] = np.std(values) - statistics[f'{xpath_expr}-min'] = np.min(values) - statistics[f'{xpath_expr}-max'] = np.max(values) + statistics[f"{xpath_expr}-mean"] = np.mean(values) + statistics[f"{xpath_expr}-median"] = np.median(values) + statistics[f"{xpath_expr}-std"] = np.std(values) + statistics[f"{xpath_expr}-min"] = np.min(values) + statistics[f"{xpath_expr}-max"] = np.max(values) return statistics def xpath_count(self, xpath_expr, namespaces) -> dict[str, int]: @@ -256,11 +266,10 @@ class TagGroup: r = e.xpath(xpath_expr, namespaces=namespaces) values += r - counts = {f'{xpath_expr}-count': len(values)} + counts = {f"{xpath_expr}-count": len(values)} return counts - def sorted_groupby(iterable, key=None): """ Sort iterable by key and then group by the same key. @@ -291,7 +300,7 @@ def _to_dict(root, raise_errors): raise ValueError(f"Unknown namespace {root_name.namespace}") -def flatten(d: MutableMapping, parent='', separator='_') -> dict: +def flatten(d: MutableMapping, parent="", separator="_") -> dict: """ Flatten the given nested dict. @@ -314,11 +323,12 @@ def flatten(d: MutableMapping, parent='', separator='_') -> dict: def valid_column_key(k) -> bool: - if re.match(r'^[a-zA-Z0-9 _@/:\[\]-]+$', k): + if re.match(r"^[a-zA-Z0-9 _@/:\[\]-]+$", k): return True else: return False + def column_names_csv(columns) -> str: """ Format Column names (identifiers) as a comma-separated list. @@ -327,9 +337,11 @@ def column_names_csv(columns) -> str: """ return ",".join('"' + c + '"' for c in columns) + current_columns: dict[str, list] = defaultdict(list) current_columns_types: dict[str, dict] = defaultdict(dict) + def insert_into_db(con, table, d: Dict): """Insert the values from the dict into the table, creating columns if necessary""" @@ -338,7 +350,9 @@ def insert_into_db(con, table, d: Dict): for k in d.keys(): assert valid_column_key(k), f'"{k}" is not a valid column name' current_columns[table].append(k) - con.execute(f"CREATE TABLE {table} ({column_names_csv(current_columns[table])})") + con.execute( + f"CREATE TABLE {table} ({column_names_csv(current_columns[table])})" + ) # Add columns if necessary for k in d.keys(): @@ -361,13 +375,15 @@ def insert_into_db(con, table, d: Dict): f"( {column_names_csv(columns)} )" "VALUES" f"( {','.join('?' 
for c in columns)} )", - [str(d[c]) for c in columns] + [str(d[c]) for c in columns], ) + def insert_into_db_multiple(con, table, ld: List[Dict]): for d in ld: insert_into_db(con, table, d) + def convert_db_to_parquet(con, table, index_col, output_file): df = pd.read_sql_query(f"SELECT * FROM {table}", con, index_col) @@ -386,6 +402,8 @@ def convert_db_to_parquet(con, table, index_col, output_file): elif column_type == "set": df[c] = df[c].apply(lambda s: list(ast.literal_eval(s)) if s else None) else: - raise NotImplementedError(f"Column {c}: type {column_type} not implemented yet.") + raise NotImplementedError( + f"Column {c}: type {column_type} not implemented yet." + ) - df.to_parquet(output_file) \ No newline at end of file + df.to_parquet(output_file) diff --git a/src/mods4pandas/mods4pandas.py b/src/mods4pandas/mods4pandas.py index 669c1e0..017f6e9 100755 --- a/src/mods4pandas/mods4pandas.py +++ b/src/mods4pandas/mods4pandas.py @@ -17,7 +17,16 @@ from collections.abc import MutableMapping, Sequence import click from tqdm import tqdm -from .lib import convert_db_to_parquet, sorted_groupby, TagGroup, ns, flatten, insert_into_db, insert_into_db_multiple, current_columns_types +from .lib import ( + convert_db_to_parquet, + sorted_groupby, + TagGroup, + ns, + flatten, + insert_into_db, + insert_into_db_multiple, + current_columns_types, +) with warnings.catch_warnings(): # Filter warnings on WSL @@ -26,7 +35,8 @@ with warnings.catch_warnings(): import pandas as pd -logger = logging.getLogger('mods4pandas') +logger = logging.getLogger("mods4pandas") + def mods_to_dict(mods, raise_errors=True): """Convert MODS metadata to a nested dictionary""" @@ -37,179 +47,290 @@ def mods_to_dict(mods, raise_errors=True): value = {} # Iterate through each group of tags - for tag, group in sorted_groupby(mods, key=attrgetter('tag')): + for tag, group in sorted_groupby(mods, key=attrgetter("tag")): group = list(group) - if tag == '{http://www.loc.gov/mods/v3}location': + if tag == "{http://www.loc.gov/mods/v3}location": + def only_current_location(location): - return location.get('type') != 'former' - value['location'] = TagGroup(tag, group) \ - .filter(only_current_location) \ - .has_attributes([{}, {'type': 'current'}]) \ - .is_singleton().descend(raise_errors) - elif tag == '{http://www.loc.gov/mods/v3}physicalLocation': + return location.get("type") != "former" + + value["location"] = ( + TagGroup(tag, group) + .filter(only_current_location) + .has_attributes([{}, {"type": "current"}]) + .is_singleton() + .descend(raise_errors) + ) + elif tag == "{http://www.loc.gov/mods/v3}physicalLocation": + def no_display_label(physical_location): - return physical_location.get('displayLabel') is None - value['physicalLocation'] = TagGroup(tag, group).filter(no_display_label).text() - elif tag == '{http://www.loc.gov/mods/v3}shelfLocator': + return physical_location.get("displayLabel") is None + + value["physicalLocation"] = ( + TagGroup(tag, group).filter(no_display_label).text() + ) + elif tag == "{http://www.loc.gov/mods/v3}shelfLocator": # This element should not be repeated according to MODS-AP 2.3.1, however a few of the files contain # a second element with empty text and a "displayLabel" attribute set. 
def no_display_label(shelf_locator): - return shelf_locator.get('displayLabel') is None - value['shelfLocator'] = TagGroup(tag, group) \ - .filter(no_display_label) \ - .force_singleton() \ - .has_no_attributes() \ + return shelf_locator.get("displayLabel") is None + + value["shelfLocator"] = ( + TagGroup(tag, group) + .filter(no_display_label) + .force_singleton() + .has_no_attributes() .text() - elif tag == '{http://www.loc.gov/mods/v3}originInfo': + ) + elif tag == "{http://www.loc.gov/mods/v3}originInfo": + def has_event_type(origin_info): # According to MODS-AP 2.3.1, every originInfo should have its eventType set. However, some # are empty and not fixable. - return origin_info.attrib.get('eventType') is not None - tag_group = TagGroup(tag, group).fix_event_type().filter(has_event_type, warn="has no eventType") - for event_type, grouped_group in sorted_groupby(tag_group.group, key=lambda g: g.attrib['eventType']): + return origin_info.attrib.get("eventType") is not None + + tag_group = ( + TagGroup(tag, group) + .fix_event_type() + .filter(has_event_type, warn="has no eventType") + ) + for event_type, grouped_group in sorted_groupby( + tag_group.group, key=lambda g: g.attrib["eventType"] + ): for n, e in enumerate(grouped_group): - value['originInfo-{}{}'.format(event_type, n)] = mods_to_dict(e, raise_errors) - elif tag == '{http://www.loc.gov/mods/v3}place': - value['place'] = TagGroup(tag, group).force_singleton(warn=False).has_no_attributes().descend(raise_errors) - elif tag == '{http://www.loc.gov/mods/v3}placeTerm': - value['placeTerm'] = TagGroup(tag, group).is_singleton().has_attributes({'type': 'text'}).text() - elif tag == '{http://www.loc.gov/mods/v3}dateIssued': - value['dateIssued'] = TagGroup(tag, group) \ - .fix_date() \ - .sort(key=lambda d: d.attrib.get('keyDate') == 'yes', reverse=True) \ - .ignore_attributes() \ - .force_singleton() \ + value["originInfo-{}{}".format(event_type, n)] = mods_to_dict( + e, raise_errors + ) + elif tag == "{http://www.loc.gov/mods/v3}place": + value["place"] = ( + TagGroup(tag, group) + .force_singleton(warn=False) + .has_no_attributes() + .descend(raise_errors) + ) + elif tag == "{http://www.loc.gov/mods/v3}placeTerm": + value["placeTerm"] = ( + TagGroup(tag, group) + .is_singleton() + .has_attributes({"type": "text"}) .text() - elif tag == '{http://www.loc.gov/mods/v3}dateCreated': - value['dateCreated'] = TagGroup(tag, group) \ - .fix_date() \ - .sort(key=lambda d: d.attrib.get('keyDate') == 'yes', reverse=True) \ - .ignore_attributes() \ - .force_singleton() \ + ) + elif tag == "{http://www.loc.gov/mods/v3}dateIssued": + value["dateIssued"] = ( + TagGroup(tag, group) + .fix_date() + .sort(key=lambda d: d.attrib.get("keyDate") == "yes", reverse=True) + .ignore_attributes() + .force_singleton() .text() - elif tag == '{http://www.loc.gov/mods/v3}dateCaptured': - value['dateCaptured'] = TagGroup(tag, group).fix_date().ignore_attributes().is_singleton().text() - elif tag == '{http://www.loc.gov/mods/v3}dateOther': - value['dateOther'] = TagGroup(tag, group).fix_date().ignore_attributes().is_singleton().text() - elif tag == '{http://www.loc.gov/mods/v3}publisher': - value['publisher'] = TagGroup(tag, group).force_singleton(warn=False).has_no_attributes().text() - elif tag == '{http://www.loc.gov/mods/v3}edition': - value['edition'] = TagGroup(tag, group).force_singleton().has_no_attributes().text() - elif tag == '{http://www.loc.gov/mods/v3}classification': - authorities = {e.attrib['authority'] for e in group} + ) + elif tag == 
"{http://www.loc.gov/mods/v3}dateCreated": + value["dateCreated"] = ( + TagGroup(tag, group) + .fix_date() + .sort(key=lambda d: d.attrib.get("keyDate") == "yes", reverse=True) + .ignore_attributes() + .force_singleton() + .text() + ) + elif tag == "{http://www.loc.gov/mods/v3}dateCaptured": + value["dateCaptured"] = ( + TagGroup(tag, group) + .fix_date() + .ignore_attributes() + .is_singleton() + .text() + ) + elif tag == "{http://www.loc.gov/mods/v3}dateOther": + value["dateOther"] = ( + TagGroup(tag, group) + .fix_date() + .ignore_attributes() + .is_singleton() + .text() + ) + elif tag == "{http://www.loc.gov/mods/v3}publisher": + value["publisher"] = ( + TagGroup(tag, group) + .force_singleton(warn=False) + .has_no_attributes() + .text() + ) + elif tag == "{http://www.loc.gov/mods/v3}edition": + value["edition"] = ( + TagGroup(tag, group).force_singleton().has_no_attributes().text() + ) + elif tag == "{http://www.loc.gov/mods/v3}classification": + authorities = {e.attrib["authority"] for e in group} for authority in authorities: - sub_group = [e for e in group if e.attrib.get('authority') == authority] - value['classification-{}'.format(authority)] = TagGroup(tag, sub_group).text_set() - elif tag == '{http://www.loc.gov/mods/v3}recordInfo': - value['recordInfo'] = TagGroup(tag, group).is_singleton().has_no_attributes().descend(raise_errors) - elif tag == '{http://www.loc.gov/mods/v3}recordIdentifier': + sub_group = [e for e in group if e.attrib.get("authority") == authority] + value["classification-{}".format(authority)] = TagGroup( + tag, sub_group + ).text_set() + elif tag == "{http://www.loc.gov/mods/v3}recordInfo": + value["recordInfo"] = ( + TagGroup(tag, group) + .is_singleton() + .has_no_attributes() + .descend(raise_errors) + ) + elif tag == "{http://www.loc.gov/mods/v3}recordIdentifier": # By default we assume source="gbv-ppn" mods:recordIdentifiers (= PPNs), # however, in mods:relatedItems, there may be source="dnb-ppns", # which we need to distinguish by using a separate field name. 
try: - value['recordIdentifier'] = TagGroup(tag, group).is_singleton().has_attributes({'source': 'gbv-ppn'}).text() + value["recordIdentifier"] = ( + TagGroup(tag, group) + .is_singleton() + .has_attributes({"source": "gbv-ppn"}) + .text() + ) except ValueError: - value['recordIdentifier-dnb-ppn'] = TagGroup(tag, group).is_singleton().has_attributes({'source': 'dnb-ppn'}).text() - elif tag == '{http://www.loc.gov/mods/v3}identifier': + value["recordIdentifier-dnb-ppn"] = ( + TagGroup(tag, group) + .is_singleton() + .has_attributes({"source": "dnb-ppn"}) + .text() + ) + elif tag == "{http://www.loc.gov/mods/v3}identifier": for e in group: if len(e.attrib) != 1: - raise ValueError('Unknown attributes for identifier {}'.format(e.attrib)) - value['identifier-{}'.format(e.attrib['type'])] = e.text - elif tag == '{http://www.loc.gov/mods/v3}titleInfo': + raise ValueError( + "Unknown attributes for identifier {}".format(e.attrib) + ) + value["identifier-{}".format(e.attrib["type"])] = e.text + elif tag == "{http://www.loc.gov/mods/v3}titleInfo": + def only_standard_title(title_info): - return title_info.attrib.get('type') is None - value['titleInfo'] = TagGroup(tag, group) \ - .filter(only_standard_title) \ - .is_singleton().has_no_attributes().descend(raise_errors) - elif tag == '{http://www.loc.gov/mods/v3}title': - value['title'] = TagGroup(tag, group).is_singleton().has_no_attributes().text() - elif tag == '{http://www.loc.gov/mods/v3}partName': - value['partName'] = TagGroup(tag, group).is_singleton().has_no_attributes().text() - elif tag == '{http://www.loc.gov/mods/v3}subTitle': - value['subTitle'] = TagGroup(tag, group).force_singleton().has_no_attributes().text() - elif tag == '{http://www.loc.gov/mods/v3}note': + return title_info.attrib.get("type") is None + + value["titleInfo"] = ( + TagGroup(tag, group) + .filter(only_standard_title) + .is_singleton() + .has_no_attributes() + .descend(raise_errors) + ) + elif tag == "{http://www.loc.gov/mods/v3}title": + value["title"] = ( + TagGroup(tag, group).is_singleton().has_no_attributes().text() + ) + elif tag == "{http://www.loc.gov/mods/v3}partName": + value["partName"] = ( + TagGroup(tag, group).is_singleton().has_no_attributes().text() + ) + elif tag == "{http://www.loc.gov/mods/v3}subTitle": + value["subTitle"] = ( + TagGroup(tag, group).force_singleton().has_no_attributes().text() + ) + elif tag == "{http://www.loc.gov/mods/v3}note": # This could be useful if distinguished by type attribute. 
pass - elif tag == '{http://www.loc.gov/mods/v3}part': + elif tag == "{http://www.loc.gov/mods/v3}part": pass - elif tag == '{http://www.loc.gov/mods/v3}abstract': - value['abstract'] = TagGroup(tag, group).has_no_attributes().text() - elif tag == '{http://www.loc.gov/mods/v3}subject': - authorities = {e.attrib.get('authority') for e in group} + elif tag == "{http://www.loc.gov/mods/v3}abstract": + value["abstract"] = TagGroup(tag, group).has_no_attributes().text() + elif tag == "{http://www.loc.gov/mods/v3}subject": + authorities = {e.attrib.get("authority") for e in group} for authority in authorities: - k = 'subject-{}'.format(authority) if authority is not None else 'subject' - sub_group = [e for e in group if e.attrib.get('authority') == authority] - value[k] = TagGroup(tag, sub_group).force_singleton().descend(raise_errors) - elif tag == '{http://www.loc.gov/mods/v3}topic': + k = ( + "subject-{}".format(authority) + if authority is not None + else "subject" + ) + sub_group = [e for e in group if e.attrib.get("authority") == authority] + value[k] = ( + TagGroup(tag, sub_group).force_singleton().descend(raise_errors) + ) + elif tag == "{http://www.loc.gov/mods/v3}topic": TagGroup(tag, group).text_set() - elif tag == '{http://www.loc.gov/mods/v3}cartographics': + elif tag == "{http://www.loc.gov/mods/v3}cartographics": pass - elif tag == '{http://www.loc.gov/mods/v3}geographic': + elif tag == "{http://www.loc.gov/mods/v3}geographic": TagGroup(tag, group).text_set() - elif tag == '{http://www.loc.gov/mods/v3}temporal': + elif tag == "{http://www.loc.gov/mods/v3}temporal": TagGroup(tag, group).text_set() - elif tag == '{http://www.loc.gov/mods/v3}genre': - authorities = {e.attrib.get('authority') for e in group} + elif tag == "{http://www.loc.gov/mods/v3}genre": + authorities = {e.attrib.get("authority") for e in group} for authority in authorities: - k = 'genre-{}'.format(authority) if authority is not None else 'genre' - value[k] = {e.text for e in group if e.attrib.get('authority') == authority} - elif tag == '{http://www.loc.gov/mods/v3}language': - value["language"] = TagGroup(tag, group) \ - .merge_sub_tags_to_set() - elif tag == '{http://www.loc.gov/mods/v3}languageTerm': - value['languageTerm'] = TagGroup(tag, group) \ - .has_attributes({'authority': 'iso639-2b', 'type': 'code'}) \ + k = "genre-{}".format(authority) if authority is not None else "genre" + value[k] = { + e.text for e in group if e.attrib.get("authority") == authority + } + elif tag == "{http://www.loc.gov/mods/v3}language": + value["language"] = TagGroup(tag, group).merge_sub_tags_to_set() + elif tag == "{http://www.loc.gov/mods/v3}languageTerm": + value["languageTerm"] = ( + TagGroup(tag, group) + .has_attributes({"authority": "iso639-2b", "type": "code"}) .text_set() - elif tag == '{http://www.loc.gov/mods/v3}scriptTerm': - value['scriptTerm'] = TagGroup(tag, group) \ - .fix_script_term() \ - .has_attributes({'authority': 'iso15924', 'type': 'code'}) \ + ) + elif tag == "{http://www.loc.gov/mods/v3}scriptTerm": + value["scriptTerm"] = ( + TagGroup(tag, group) + .fix_script_term() + .has_attributes({"authority": "iso15924", "type": "code"}) .text_set() - elif tag == '{http://www.loc.gov/mods/v3}relatedItem': + ) + elif tag == "{http://www.loc.gov/mods/v3}relatedItem": tag_group = TagGroup(tag, group) - for type_, grouped_group in sorted_groupby(tag_group.group, key=lambda g: g.attrib['type']): - sub_tag = 'relatedItem-{}'.format(type_) + for type_, grouped_group in sorted_groupby( + tag_group.group, key=lambda g: 
g.attrib["type"] + ): + sub_tag = "relatedItem-{}".format(type_) grouped_group = list(grouped_group) if type_ in ["original", "host"]: - value[sub_tag] = TagGroup(sub_tag, grouped_group).is_singleton().descend(raise_errors) + value[sub_tag] = ( + TagGroup(sub_tag, grouped_group) + .is_singleton() + .descend(raise_errors) + ) else: # TODO type="series" pass - elif tag == '{http://www.loc.gov/mods/v3}name': + elif tag == "{http://www.loc.gov/mods/v3}name": for n, e in enumerate(group): - value['name{}'.format(n)] = mods_to_dict(e, raise_errors) - elif tag == '{http://www.loc.gov/mods/v3}role': - value["role"] = TagGroup(tag, group) \ - .has_no_attributes() \ - .merge_sub_tags_to_set() - elif tag == '{http://www.loc.gov/mods/v3}roleTerm': - value['roleTerm'] = TagGroup(tag, group) \ - .has_attributes({'authority': 'marcrelator', 'type': 'code'}) \ + value["name{}".format(n)] = mods_to_dict(e, raise_errors) + elif tag == "{http://www.loc.gov/mods/v3}role": + value["role"] = ( + TagGroup(tag, group).has_no_attributes().merge_sub_tags_to_set() + ) + elif tag == "{http://www.loc.gov/mods/v3}roleTerm": + value["roleTerm"] = ( + TagGroup(tag, group) + .has_attributes({"authority": "marcrelator", "type": "code"}) .text_set() - elif tag == '{http://www.loc.gov/mods/v3}namePart': + ) + elif tag == "{http://www.loc.gov/mods/v3}namePart": for e in group: - if not e.attrib.get('type'): - value['namePart'] = e.text + if not e.attrib.get("type"): + value["namePart"] = e.text else: - value['namePart-{}'.format(e.attrib['type'])] = e.text - elif tag == '{http://www.loc.gov/mods/v3}nameIdentifier': + value["namePart-{}".format(e.attrib["type"])] = e.text + elif tag == "{http://www.loc.gov/mods/v3}nameIdentifier": # TODO Use this (e.g. 106168096) or the # mods:name@valueURI to disambiguate pass - elif tag == '{http://www.loc.gov/mods/v3}displayForm': - value['displayForm'] = TagGroup(tag, group).is_singleton().has_no_attributes().text() - elif tag == '{http://www.loc.gov/mods/v3}physicalDescription': + elif tag == "{http://www.loc.gov/mods/v3}displayForm": + value["displayForm"] = ( + TagGroup(tag, group).is_singleton().has_no_attributes().text() + ) + elif tag == "{http://www.loc.gov/mods/v3}physicalDescription": pass - elif tag == '{http://www.loc.gov/mods/v3}extension': + elif tag == "{http://www.loc.gov/mods/v3}extension": pass - elif tag == '{http://www.loc.gov/mods/v3}accessCondition': + elif tag == "{http://www.loc.gov/mods/v3}accessCondition": for e in group: - if not e.attrib.get('type'): - raise ValueError('Unknown attributes for accessCondition {}'.format(e.attrib)) - value['accessCondition-{}'.format(e.attrib['type'])] = e.text - elif tag == '{http://www.loc.gov/mods/v3}typeOfResource': - value['typeOfResource'] = TagGroup(tag, group).is_singleton().has_no_attributes().text() - elif tag == '{http://www.loc.gov/mods/v3}mods': + if not e.attrib.get("type"): + raise ValueError( + "Unknown attributes for accessCondition {}".format(e.attrib) + ) + value["accessCondition-{}".format(e.attrib["type"])] = e.text + elif tag == "{http://www.loc.gov/mods/v3}typeOfResource": + value["typeOfResource"] = ( + TagGroup(tag, group).is_singleton().has_no_attributes().text() + ) + elif tag == "{http://www.loc.gov/mods/v3}mods": # XXX Ignore nested mods:mods for now (used in mods:subject) pass else: @@ -230,30 +351,29 @@ def mets_to_dict(mets, raise_errors=True): value = {} # Iterate through each group of tags - for tag, group in sorted_groupby(mets, key=attrgetter('tag')): + for tag, group in sorted_groupby(mets, 
key=attrgetter("tag")): group = list(group) # XXX Namespaces seem to use a trailing / sometimes, sometimes not. # (e.g. {http://www.loc.gov/METS/} vs {http://www.loc.gov/METS}) - if tag == '{http://www.loc.gov/METS/}amdSec': + if tag == "{http://www.loc.gov/METS/}amdSec": pass # TODO - elif tag == '{http://www.loc.gov/METS/}dmdSec': + elif tag == "{http://www.loc.gov/METS/}dmdSec": pass # TODO - elif tag == '{http://www.loc.gov/METS/}metsHdr': + elif tag == "{http://www.loc.gov/METS/}metsHdr": pass # TODO - elif tag == '{http://www.loc.gov/METS/}structLink': + elif tag == "{http://www.loc.gov/METS/}structLink": pass # TODO - elif tag == '{http://www.loc.gov/METS/}structMap': + elif tag == "{http://www.loc.gov/METS/}structMap": pass # TODO - elif tag == '{http://www.loc.gov/METS/}fileSec': - value['fileSec'] = TagGroup(tag, group) \ - .is_singleton().descend(raise_errors) - elif tag == '{http://www.loc.gov/METS/}fileGrp': + elif tag == "{http://www.loc.gov/METS/}fileSec": + value["fileSec"] = TagGroup(tag, group).is_singleton().descend(raise_errors) + elif tag == "{http://www.loc.gov/METS/}fileGrp": for e in group: - use = e.attrib.get('USE') + use = e.attrib.get("USE") if not use: - raise ValueError('No USE attribute for fileGrp {}'.format(e)) - value[f'fileGrp-{use}-count'] = len(e) + raise ValueError("No USE attribute for fileGrp {}".format(e)) + value[f"fileGrp-{use}-count"] = len(e) else: if raise_errors: print(value) @@ -262,6 +382,7 @@ def mets_to_dict(mets, raise_errors=True): pass return value + def pages_to_dict(mets, raise_errors=True) -> List[Dict]: # TODO replace asserts by ValueError @@ -269,23 +390,36 @@ def pages_to_dict(mets, raise_errors=True) -> List[Dict]: # PPN def get_mets_recordIdentifier(*, source="gbv-ppn"): - return (mets.xpath(f'//mets:dmdSec[1]//mods:mods/mods:recordInfo/mods:recordIdentifier[@source="{source}"]', - namespaces=ns) or [None])[0].text + return ( + mets.xpath( + f'//mets:dmdSec[1]//mods:mods/mods:recordInfo/mods:recordIdentifier[@source="{source}"]', + namespaces=ns, + ) + or [None] + )[0].text + ppn = get_mets_recordIdentifier() # Getting per-page/structure information is a bit different structMap_PHYSICAL = mets.find('./mets:structMap[@TYPE="PHYSICAL"]', ns) structMap_LOGICAL = mets.find('./mets:structMap[@TYPE="LOGICAL"]', ns) - fileSec = mets.find('./mets:fileSec', ns) + fileSec = mets.find("./mets:fileSec", ns) if structMap_PHYSICAL is None: # This is expected in a multivolume work or periodical! if any( - structMap_LOGICAL.find(f'./mets:div[@TYPE="{t}"]', ns) is not None - for t in ["multivolume_work", "MultivolumeWork", "multivolume_manuscript", "periodical"] + structMap_LOGICAL.find(f'./mets:div[@TYPE="{t}"]', ns) is not None + for t in [ + "multivolume_work", + "MultivolumeWork", + "multivolume_manuscript", + "periodical", + ] ): return [] else: - raise ValueError("No structMap[@TYPE='PHYSICAL'] found (but not a multivolume work)") + raise ValueError( + "No structMap[@TYPE='PHYSICAL'] found (but not a multivolume work)" + ) if structMap_LOGICAL is None: raise ValueError("No structMap[@TYPE='LOGICAL'] found") if fileSec is None: @@ -294,13 +428,14 @@ def pages_to_dict(mets, raise_errors=True) -> List[Dict]: div_physSequence = structMap_PHYSICAL[0] assert div_physSequence.attrib.get("TYPE") == "physSequence" - # Build a look-up table to get mets:file by @ID # This cuts retrieving the mets:file down to half the time. 
mets_file_by_ID = {} + def _init_mets_file_by_ID(): - for f in fileSec.iterfind('./mets:fileGrp/mets:file', ns): + for f in fileSec.iterfind("./mets:fileGrp/mets:file", ns): mets_file_by_ID[f.attrib.get("ID")] = f + _init_mets_file_by_ID() def get_mets_file(*, ID): @@ -312,7 +447,6 @@ def pages_to_dict(mets, raise_errors=True) -> List[Dict]: return structMap_LOGICAL.findall(f'.//mets:div[@ID="{ID}"]', ns) for page in div_physSequence: - # TODO sort by ORDER? assert page.attrib.get("TYPE") == "page" page_dict = {} @@ -326,7 +460,9 @@ def pages_to_dict(mets, raise_errors=True) -> List[Dict]: file_ = get_mets_file(ID=file_id) assert file_ is not None fileGrp_USE = file_.getparent().attrib.get("USE") - file_FLocat_href = (file_.xpath('mets:FLocat/@xlink:href', namespaces=ns) or [None])[0] + file_FLocat_href = ( + file_.xpath("mets:FLocat/@xlink:href", namespaces=ns) or [None] + )[0] if file_FLocat_href is not None: file_FLocat_href = str(file_FLocat_href) page_dict[f"fileGrp_{fileGrp_USE}_file_FLocat_href"] = file_FLocat_href @@ -343,7 +479,7 @@ def pages_to_dict(mets, raise_errors=True) -> List[Dict]: # it suffices to do this the old-fashioned way. sm_links = mets.findall( - f'./mets:structLink/mets:smLink[@xlink:to="{to_phys}"]', ns + f'./mets:structLink/mets:smLink[@xlink:to="{to_phys}"]', ns ) targets = [] @@ -378,10 +514,19 @@ def pages_to_dict(mets, raise_errors=True) -> List[Dict]: @click.command() -@click.argument('mets_files', type=click.Path(exists=True), required=True, nargs=-1) -@click.option('--output', '-o', 'output_file', type=click.Path(), help='Output Parquet file', - default='mods_info_df.parquet', show_default=True) -@click.option('--output-page-info', type=click.Path(), help='Output page info Parquet file') +@click.argument("mets_files", type=click.Path(exists=True), required=True, nargs=-1) +@click.option( + "--output", + "-o", + "output_file", + type=click.Path(), + help="Output Parquet file", + default="mods_info_df.parquet", + show_default=True, +) +@click.option( + "--output-page-info", type=click.Path(), help="Output page info Parquet file" +) def process_command(mets_files: list[str], output_file: str, output_page_info: str): """ A tool to convert the MODS metadata in INPUT to a pandas DataFrame. 
@@ -395,18 +540,21 @@ def process_command(mets_files: list[str], output_file: str, output_page_info: s """ process(mets_files, output_file, output_page_info) + def process(mets_files: list[str], output_file: str, output_page_info: str): # Extend file list if directories are given mets_files_real: list[str] = [] for m in mets_files: if os.path.isdir(m): - logger.info('Scanning directory {}'.format(m)) - mets_files_real.extend(f.path for f in tqdm(os.scandir(m), leave=False) - if f.is_file() and not f.name.startswith('.')) + logger.info("Scanning directory {}".format(m)) + mets_files_real.extend( + f.path + for f in tqdm(os.scandir(m), leave=False) + if f.is_file() and not f.name.startswith(".") + ) else: mets_files_real.append(m) - # Prepare output files with contextlib.suppress(FileNotFoundError): os.remove(output_file) @@ -414,28 +562,28 @@ def process(mets_files: list[str], output_file: str, output_page_info: str): with contextlib.suppress(FileNotFoundError): os.remove(output_file_sqlite3) - logger.info('Writing SQLite DB to {}'.format(output_file_sqlite3)) + logger.info("Writing SQLite DB to {}".format(output_file_sqlite3)) con = sqlite3.connect(output_file_sqlite3) if output_page_info: output_page_info_sqlite3 = output_page_info + ".sqlite3" - logger.info('Writing SQLite DB to {}'.format(output_page_info_sqlite3)) + logger.info("Writing SQLite DB to {}".format(output_page_info_sqlite3)) with contextlib.suppress(FileNotFoundError): os.remove(output_page_info_sqlite3) con_page_info = sqlite3.connect(output_page_info_sqlite3) # Process METS files - with open(output_file + '.warnings.csv', 'w') as csvfile: + with open(output_file + ".warnings.csv", "w") as csvfile: csvwriter = csv.writer(csvfile) - logger.info('Processing METS files') + logger.info("Processing METS files") for mets_file in tqdm(mets_files_real, leave=True): try: root = ET.parse(mets_file).getroot() - mets = root # XXX .find('mets:mets', ns) does not work here - mods = root.find('mets:dmdSec//mods:mods', ns) + mets = root # XXX .find('mets:mets', ns) does not work here + mods = root.find("mets:dmdSec//mods:mods", ns) with warnings.catch_warnings(record=True) as caught_warnings: - warnings.simplefilter('always') # do NOT filter double occurrences + warnings.simplefilter("always") # do NOT filter double occurrences # MODS d = flatten(mods_to_dict(mods, raise_errors=True)) @@ -445,7 +593,7 @@ def process(mets_files: list[str], output_file: str, output_page_info: str): for k, v in d_mets.items(): d[f"mets_{k}"] = v # "meta" - d['mets_file'] = mets_file + d["mets_file"] = mets_file # Save insert_into_db(con, "mods_info", d) @@ -453,8 +601,12 @@ def process(mets_files: list[str], output_file: str, output_page_info: str): # METS - per-page if output_page_info: - page_info_doc: list[dict] = pages_to_dict(mets, raise_errors=True) - insert_into_db_multiple(con_page_info, "page_info", page_info_doc) + page_info_doc: list[dict] = pages_to_dict( + mets, raise_errors=True + ) + insert_into_db_multiple( + con_page_info, "page_info", page_info_doc + ) con_page_info.commit() if caught_warnings: @@ -463,13 +615,15 @@ def process(mets_files: list[str], output_file: str, output_page_info: str): for caught_warning in caught_warnings: csvwriter.writerow([mets_file, caught_warning.message]) except Exception as e: - logger.exception('Exception in {}'.format(mets_file)) + logger.exception("Exception in {}".format(mets_file)) - logger.info('Writing DataFrame to {}'.format(output_file)) + logger.info("Writing DataFrame to {}".format(output_file)) 
convert_db_to_parquet(con, "mods_info", "recordInfo_recordIdentifier", output_file) if output_page_info: - logger.info('Writing DataFrame to {}'.format(output_page_info)) - convert_db_to_parquet(con_page_info, "page_info", ["ppn", "ID"], output_page_info) + logger.info("Writing DataFrame to {}".format(output_page_info)) + convert_db_to_parquet( + con_page_info, "page_info", ["ppn", "ID"], output_page_info + ) def main(): @@ -481,5 +635,5 @@ def main(): process_command() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/src/mods4pandas/tests/test_alto.py b/src/mods4pandas/tests/test_alto.py index adf931f..139a2db 100644 --- a/src/mods4pandas/tests/test_alto.py +++ b/src/mods4pandas/tests/test_alto.py @@ -9,14 +9,17 @@ from mods4pandas.lib import flatten TESTS_DATA_DIR = Path(__file__).parent / "data" + def dict_fromstring(x): - return flatten(alto_to_dict(ET.fromstring(x))) + return flatten(alto_to_dict(ET.fromstring(x))) + def test_Page_counts(): """ Elements below Layout/Page should be counted """ - d = dict_fromstring(""" + d = dict_fromstring( + """ @@ -37,13 +40,16 @@ def test_Page_counts(): - """) - assert d['Layout_Page_TextBlock-count'] == 1 - assert d['Layout_Page_TextLine-count'] == 3 - assert d['Layout_Page_String-count'] == 6 + """ + ) + assert d["Layout_Page_TextBlock-count"] == 1 + assert d["Layout_Page_TextLine-count"] == 3 + assert d["Layout_Page_String-count"] == 6 + def test_Tags_counts(): - d = dict_fromstring(""" + d = dict_fromstring( + """ @@ -57,11 +63,14 @@ def test_Tags_counts(): - """) - assert d['Tags_NamedEntityTag-count'] == 9 + """ + ) + assert d["Tags_NamedEntityTag-count"] == 9 + def test_String_TAGREF_counts(): - d = dict_fromstring(""" + d = dict_fromstring( + """ @@ -80,9 +89,10 @@ def test_String_TAGREF_counts(): - """) - assert d['Layout_Page_//alto:String[@TAGREFS]-count'] == 3 - assert d['Layout_Page_String-count'] == 4 + """ + ) + assert d["Layout_Page_//alto:String[@TAGREFS]-count"] == 3 + assert d["Layout_Page_String-count"] == 4 def test_dtypes(tmp_path): @@ -100,9 +110,9 @@ def test_dtypes(tmp_path): r"Layout_Page_//alto:String/@WC-.*": ("Float64", None), r".*-count": ("Int64", None), r"alto_xmlns": ("object", ["str", "NoneType"]), - r"Layout_Page_(WIDTH|HEIGHT)": ("Int64", None), } + def expected_types(c): """Return the expected types for column c.""" for r, types in EXPECTED_TYPES.items(): @@ -126,7 +136,8 @@ def test_dtypes(tmp_path): if edt == "object": inner_types = set(type(v).__name__ for v in df[c]) - assert all(it in einner_types for it in inner_types), \ - f"Unexpected inner types {inner_types} for column {c} (expected {einner_types})" + assert all( + it in einner_types for it in inner_types + ), f"Unexpected inner types {inner_types} for column {c} (expected {einner_types})" - check_types(alto_info_df) \ No newline at end of file + check_types(alto_info_df) diff --git a/src/mods4pandas/tests/test_mets.py b/src/mods4pandas/tests/test_mets.py index f06cc04..ebe0b2a 100644 --- a/src/mods4pandas/tests/test_mets.py +++ b/src/mods4pandas/tests/test_mets.py @@ -6,15 +6,17 @@ from mods4pandas.lib import flatten def dict_fromstring(x): - """Helper function to parse a METS/MODS XML string to a flattened dict""" - return flatten(mets_to_dict(ET.fromstring(x))) - # XXX move to test lib + """Helper function to parse a METS/MODS XML string to a flattened dict""" + return flatten(mets_to_dict(ET.fromstring(x))) + # XXX move to test lib + def test_fileGrp(): """ Elements of mets:fileGrp should be counted """ - d = 
dict_fromstring(""" + d = dict_fromstring( + """ @@ -31,5 +33,6 @@ def test_fileGrp(): - """) - assert d['fileSec_fileGrp-PRESENTATION-count'] == 3 + """ + ) + assert d["fileSec_fileGrp-PRESENTATION-count"] == 3 diff --git a/src/mods4pandas/tests/test_mods4pandas.py b/src/mods4pandas/tests/test_mods4pandas.py index 0707a74..8814fbf 100644 --- a/src/mods4pandas/tests/test_mods4pandas.py +++ b/src/mods4pandas/tests/test_mods4pandas.py @@ -10,36 +10,45 @@ from mods4pandas.lib import flatten TESTS_DATA_DIR = Path(__file__).parent / "data" + def dict_fromstring(x): """Helper function to parse a MODS XML string to a flattened dict""" return flatten(mods_to_dict(ET.fromstring(x))) + def test_single_language_languageTerm(): - d = dict_fromstring(""" + d = dict_fromstring( + """ lat ger - """) - assert d['language_languageTerm'] == {'ger', 'lat'} + """ + ) + assert d["language_languageTerm"] == {"ger", "lat"} + def test_multitple_language_languageTerm(): """ Different languages MAY have multiple mods:language elements. See MODS-AP 2.3.1 """ - d = dict_fromstring(""" + d = dict_fromstring( + """ lat ger - """) - assert d['language_languageTerm'] == {'ger', 'lat'} + """ + ) + assert d["language_languageTerm"] == {"ger", "lat"} + def test_role_roleTerm(): - d = dict_fromstring(""" + d = dict_fromstring( + """ Wurm, Mary @@ -51,14 +60,17 @@ def test_role_roleTerm(): - """) - assert d['name0_role_roleTerm'] == {'cmp'} + """ + ) + assert d["name0_role_roleTerm"] == {"cmp"} + def test_multiple_role_roleTerm(): """ Multiple mods:role/mods:roleTerm should be merged into one column. """ - d = dict_fromstring(""" + d = dict_fromstring( + """ Wurm, Mary @@ -73,8 +85,10 @@ def test_multiple_role_roleTerm(): - """) - assert d['name0_role_roleTerm'] == {'cmp', 'aut'} + """ + ) + assert d["name0_role_roleTerm"] == {"cmp", "aut"} + def test_scriptTerm(): """ @@ -82,7 +96,8 @@ def test_scriptTerm(): See MODS-AP 2.3.1. 
""" - d = dict_fromstring(""" + d = dict_fromstring( + """ ger @@ -94,44 +109,59 @@ def test_scriptTerm(): 216 - """) - assert d['language_scriptTerm'] == {'215', '216', '217'} + """ + ) + assert d["language_scriptTerm"] == {"215", "216", "217"} + def test_recordInfo(): - d = dict_fromstring(""" + d = dict_fromstring( + """ PPN610714341 - """) - assert d['recordInfo_recordIdentifier'] == 'PPN610714341' + """ + ) + assert d["recordInfo_recordIdentifier"] == "PPN610714341" + def test_accessCondition(): - d = dict_fromstring(""" + d = dict_fromstring( + """ UNKNOWN - """) - assert d['accessCondition-use and reproduction'] == 'UNKNOWN' + """ + ) + assert d["accessCondition-use and reproduction"] == "UNKNOWN" + def test_originInfo_no_event_type(): with pytest.warns(UserWarning) as ws: - d = dict_fromstring(""" + d = dict_fromstring( + """ Berlin - """) + """ + ) assert d == {} # empty assert len(ws) == 1 - assert ws[0].message.args[0] == 'Filtered {http://www.loc.gov/mods/v3}originInfo element (has no eventType)' + assert ( + ws[0].message.args[0] + == "Filtered {http://www.loc.gov/mods/v3}originInfo element (has no eventType)" + ) + def test_relatedItem(): - d = dict_fromstring(""" + d = dict_fromstring( + """ @@ -139,12 +169,14 @@ def test_relatedItem(): - """) + """ + ) - assert d['relatedItem-original_recordInfo_recordIdentifier'] == 'PPN167755803' + assert d["relatedItem-original_recordInfo_recordIdentifier"] == "PPN167755803" # mods:relatedItem may also have source="dnb-ppn" recordIdentifiers: - d = dict_fromstring(""" + d = dict_fromstring( + """ @@ -152,12 +184,16 @@ def test_relatedItem(): - """) + """ + ) + + assert d["relatedItem-original_recordInfo_recordIdentifier-dnb-ppn"] == "1236513355" - assert d['relatedItem-original_recordInfo_recordIdentifier-dnb-ppn'] == '1236513355' def test_dtypes(tmp_path): - mets_files = [p.absolute().as_posix() for p in (TESTS_DATA_DIR / "mets-mods").glob("*.xml")] + mets_files = [ + p.absolute().as_posix() for p in (TESTS_DATA_DIR / "mets-mods").glob("*.xml") + ] mods_info_df_parquet = (tmp_path / "test_dtypes_mods_info.parquet").as_posix() page_info_df_parquet = (tmp_path / "test_dtypes_page_info.parquet").as_posix() process(mets_files, mods_info_df_parquet, page_info_df_parquet) @@ -166,7 +202,6 @@ def test_dtypes(tmp_path): EXPECTED_TYPES = { # mods_info - r"mets_file": ("object", ["str"]), r"titleInfo_title": ("object", ["str"]), r"titleInfo_subTitle": ("object", ["str", "NoneType"]), @@ -179,19 +214,16 @@ def test_dtypes(tmp_path): r"typeOfResource": ("object", ["str", "NoneType"]), r"accessCondition-.*": ("object", ["str", "NoneType"]), r"originInfo-.*": ("object", ["str", "NoneType"]), - r".*-count": ("Int64", None), - r"genre-.*": ("object", ["ndarray", "NoneType"]), r"subject-.*": ("object", ["ndarray", "NoneType"]), r"language_.*Term": ("object", ["ndarray", "NoneType"]), r"classification-.*": ("object", ["ndarray", "NoneType"]), - # page_info - r"fileGrp_.*_file_FLocat_href": ("object", ["str", "NoneType"]), r"structMap-LOGICAL_TYPE_.*": ("boolean", None), } + def expected_types(c): """Return the expected types for column c.""" for r, types in EXPECTED_TYPES.items(): @@ -215,8 +247,9 @@ def test_dtypes(tmp_path): if edt == "object": inner_types = set(type(v).__name__ for v in df[c]) - assert all(it in einner_types for it in inner_types), \ - f"Unexpected inner types {inner_types} for column {c} (expected {einner_types})" + assert all( + it in einner_types for it in inner_types + ), f"Unexpected inner types {inner_types} for column {c} 
(expected {einner_types})" check_types(mods_info_df) - check_types(page_info_df) \ No newline at end of file + check_types(page_info_df) diff --git a/src/mods4pandas/tests/test_page_info.py b/src/mods4pandas/tests/test_page_info.py index eb29f9a..d753c77 100644 --- a/src/mods4pandas/tests/test_page_info.py +++ b/src/mods4pandas/tests/test_page_info.py @@ -10,8 +10,8 @@ TESTS_DATA_DIR = Path(__file__).parent / "data" def removeprefix(s, prefix): - if sys.version_info < (3,9): - return s[len(prefix):] if s.startswith(prefix) else s + if sys.version_info < (3, 9): + return s[len(prefix) :] if s.startswith(prefix) else s else: return s.removeprefix(prefix) @@ -26,20 +26,32 @@ def test_page_info(): assert all(p["ppn"] == "PPN821507109" for p in page_info) # Look closer at an interesting page - from pprint import pprint; pprint(page_info[0]) + from pprint import pprint + + pprint(page_info[0]) page_info_page = next(p for p in page_info if p["ID"] == "PHYS_0005") - assert page_info_page["fileGrp_PRESENTATION_file_FLocat_href"] == "file:///goobi/tiff001/sbb/PPN821507109/00000005.tif" + assert ( + page_info_page["fileGrp_PRESENTATION_file_FLocat_href"] + == "file:///goobi/tiff001/sbb/PPN821507109/00000005.tif" + ) # This is a title page with an illustration, check that we correctly got this info from the # structMap. - struct_types = sorted(removeprefix(k, "structMap-LOGICAL_TYPE_") for k, v in page_info_page.items() if k.startswith("structMap-LOGICAL_TYPE_") and v == 1) + struct_types = sorted( + removeprefix(k, "structMap-LOGICAL_TYPE_") + for k, v in page_info_page.items() + if k.startswith("structMap-LOGICAL_TYPE_") and v == 1 + ) assert struct_types == ["illustration", "monograph", "title_page"] def test_page_info_multivolume_work(): """Test creation of page_info for multivolume_work""" - mets = ET.parse(TESTS_DATA_DIR / "mets-mods" / "PPN717884805-multivolume_work-no-structMap-PHYSICAL.xml") + mets = ET.parse( + TESTS_DATA_DIR + / "mets-mods" + / "PPN717884805-multivolume_work-no-structMap-PHYSICAL.xml" + ) page_info = pages_to_dict(mets) assert page_info == [] -