#!/usr/bin/env python3 import csv import logging import os import re import warnings from lxml import etree as ET from itertools import groupby from operator import attrgetter from typing import List from collections.abc import MutableMapping, Sequence import click import pandas as pd from tqdm import tqdm from .lib import sorted_groupby, TagGroup, ns, flatten logger = logging.getLogger('modstool') def mods_to_dict(mods, raise_errors=True): """Convert MODS metadata to a nested dictionary""" # The approach taken here is to handle each element explicitly. This also means that ignored elements are ignored # explicitly. value = {} # Iterate through each group of tags for tag, group in sorted_groupby(mods, key=attrgetter('tag')): group = list(group) if tag == '{http://www.loc.gov/mods/v3}location': def only_current_location(location): return location.get('type') != 'former' value['location'] = TagGroup(tag, group) \ .filter(only_current_location) \ .has_attributes([{}, {'type': 'current'}]) \ .is_singleton().descend(raise_errors) elif tag == '{http://www.loc.gov/mods/v3}physicalLocation': def no_display_label(physical_location): return physical_location.get('displayLabel') is None value['physicalLocation'] = TagGroup(tag, group).filter(no_display_label).text() elif tag == '{http://www.loc.gov/mods/v3}shelfLocator': # This element should not be repeated according to MODS-AP 2.3.1, however a few of the files contain # a second element with empty text and a "displayLabel" attribute set. def no_display_label(shelf_locator): return shelf_locator.get('displayLabel') is None value['shelfLocator'] = TagGroup(tag, group) \ .filter(no_display_label) \ .force_singleton() \ .has_no_attributes() \ .text() elif tag == '{http://www.loc.gov/mods/v3}originInfo': def has_event_type(origin_info): # According to MODS-AP 2.3.1, every originInfo should have its eventType set. However, some # are empty and not fixable. return origin_info.attrib.get('eventType') is not None tag_group = TagGroup(tag, group).fix_event_type().filter(has_event_type, warn="has no eventType") for event_type, grouped_group in sorted_groupby(tag_group.group, key=lambda g: g.attrib['eventType']): for n, e in enumerate(grouped_group): value['originInfo-{}{}'.format(event_type, n)] = mods_to_dict(e, raise_errors) elif tag == '{http://www.loc.gov/mods/v3}place': value['place'] = TagGroup(tag, group).force_singleton(warn=False).has_no_attributes().descend(raise_errors) elif tag == '{http://www.loc.gov/mods/v3}placeTerm': value['placeTerm'] = TagGroup(tag, group).is_singleton().has_attributes({'type': 'text'}).text() elif tag == '{http://www.loc.gov/mods/v3}dateIssued': value['dateIssued'] = TagGroup(tag, group) \ .fix_date() \ .sort(key=lambda d: d.attrib.get('keyDate') == 'yes', reverse=True) \ .ignore_attributes() \ .force_singleton() \ .text() elif tag == '{http://www.loc.gov/mods/v3}dateCreated': value['dateCreated'] = TagGroup(tag, group) \ .fix_date() \ .sort(key=lambda d: d.attrib.get('keyDate') == 'yes', reverse=True) \ .ignore_attributes() \ .force_singleton() \ .text() elif tag == '{http://www.loc.gov/mods/v3}dateCaptured': value['dateCaptured'] = TagGroup(tag, group).fix_date().ignore_attributes().is_singleton().text() elif tag == '{http://www.loc.gov/mods/v3}dateOther': value['dateOther'] = TagGroup(tag, group).fix_date().ignore_attributes().is_singleton().text() elif tag == '{http://www.loc.gov/mods/v3}publisher': value['publisher'] = TagGroup(tag, group).force_singleton(warn=False).has_no_attributes().text() elif tag == '{http://www.loc.gov/mods/v3}edition': value['edition'] = TagGroup(tag, group).force_singleton().has_no_attributes().text() elif tag == '{http://www.loc.gov/mods/v3}classification': authorities = {e.attrib['authority'] for e in group} for authority in authorities: sub_group = [e for e in group if e.attrib.get('authority') == authority] value['classification-{}'.format(authority)] = TagGroup(tag, sub_group).text_set() elif tag == '{http://www.loc.gov/mods/v3}recordInfo': value['recordInfo'] = TagGroup(tag, group).is_singleton().has_no_attributes().descend(raise_errors) elif tag == '{http://www.loc.gov/mods/v3}recordIdentifier': value['recordIdentifier'] = TagGroup(tag, group).is_singleton().has_attributes({'source': 'gbv-ppn'}).text() elif tag == '{http://www.loc.gov/mods/v3}identifier': for e in group: if len(e.attrib) != 1: raise ValueError('Unknown attributes for identifier {}'.format(e.attrib)) value['identifier-{}'.format(e.attrib['type'])] = e.text elif tag == '{http://www.loc.gov/mods/v3}titleInfo': def only_standard_title(title_info): return title_info.attrib.get('type') is None value['titleInfo'] = TagGroup(tag, group) \ .filter(only_standard_title) \ .is_singleton().has_no_attributes().descend(raise_errors) elif tag == '{http://www.loc.gov/mods/v3}title': value['title'] = TagGroup(tag, group).is_singleton().has_no_attributes().text() elif tag == '{http://www.loc.gov/mods/v3}partName': value['partName'] = TagGroup(tag, group).is_singleton().has_no_attributes().text() elif tag == '{http://www.loc.gov/mods/v3}subTitle': value['subTitle'] = TagGroup(tag, group).force_singleton().has_no_attributes().text() elif tag == '{http://www.loc.gov/mods/v3}note': # This could be useful if distinguished by type attribute. pass elif tag == '{http://www.loc.gov/mods/v3}part': pass elif tag == '{http://www.loc.gov/mods/v3}abstract': value['abstract'] = TagGroup(tag, group).has_no_attributes().text() elif tag == '{http://www.loc.gov/mods/v3}subject': authorities = {e.attrib.get('authority') for e in group} for authority in authorities: k = 'subject-{}'.format(authority) if authority is not None else 'subject' sub_group = [e for e in group if e.attrib.get('authority') == authority] value[k] = TagGroup(tag, sub_group).force_singleton().descend(raise_errors) elif tag == '{http://www.loc.gov/mods/v3}topic': TagGroup(tag, group).text_set() elif tag == '{http://www.loc.gov/mods/v3}cartographics': pass elif tag == '{http://www.loc.gov/mods/v3}geographic': TagGroup(tag, group).text_set() elif tag == '{http://www.loc.gov/mods/v3}temporal': TagGroup(tag, group).text_set() elif tag == '{http://www.loc.gov/mods/v3}genre': authorities = {e.attrib.get('authority') for e in group} for authority in authorities: k = 'genre-{}'.format(authority) if authority is not None else 'genre' value[k] = {e.text for e in group if e.attrib.get('authority') == authority} elif tag == '{http://www.loc.gov/mods/v3}language': value["language"] = TagGroup(tag, group) \ .merge_sub_tags_to_set() elif tag == '{http://www.loc.gov/mods/v3}languageTerm': value['languageTerm'] = TagGroup(tag, group) \ .has_attributes({'authority': 'iso639-2b', 'type': 'code'}) \ .text_set() elif tag == '{http://www.loc.gov/mods/v3}scriptTerm': value['scriptTerm'] = TagGroup(tag, group) \ .fix_script_term() \ .has_attributes({'authority': 'iso15924', 'type': 'code'}) \ .text_set() elif tag == '{http://www.loc.gov/mods/v3}relatedItem': pass elif tag == '{http://www.loc.gov/mods/v3}name': for n, e in enumerate(group): value['name{}'.format(n)] = mods_to_dict(e, raise_errors) elif tag == '{http://www.loc.gov/mods/v3}role': value["role"] = TagGroup(tag, group) \ .has_no_attributes() \ .merge_sub_tags_to_set() elif tag == '{http://www.loc.gov/mods/v3}roleTerm': value['roleTerm'] = TagGroup(tag, group) \ .has_attributes({'authority': 'marcrelator', 'type': 'code'}) \ .text_set() elif tag == '{http://www.loc.gov/mods/v3}namePart': for e in group: if not e.attrib.get('type'): value['namePart'] = e.text else: value['namePart-{}'.format(e.attrib['type'])] = e.text elif tag == '{http://www.loc.gov/mods/v3}nameIdentifier': # TODO Use this (e.g. 106168096) or the # mods:name@valueURI to disambiguate pass elif tag == '{http://www.loc.gov/mods/v3}displayForm': value['displayForm'] = TagGroup(tag, group).is_singleton().has_no_attributes().text() elif tag == '{http://www.loc.gov/mods/v3}physicalDescription': pass elif tag == '{http://www.loc.gov/mods/v3}extension': pass elif tag == '{http://www.loc.gov/mods/v3}accessCondition': for e in group: if not e.attrib.get('type'): raise ValueError('Unknown attributes for accessCondition {}'.format(e.attrib)) value['accessCondition-{}'.format(e.attrib['type'])] = e.text elif tag == '{http://www.loc.gov/mods/v3}typeOfResource': value['typeOfResource'] = TagGroup(tag, group).is_singleton().has_no_attributes().text() elif tag == '{http://www.loc.gov/mods/v3}mods': # XXX Ignore nested mods:mods for now (used in mods:subject) pass else: if raise_errors: raise ValueError('Unknown tag "{}"'.format(tag)) else: pass return value def mets_to_dict(mets, raise_errors=True): """Convert METS metadata to a nested dictionary""" # The approach taken here is to handle each element explicitly. This also means that ignored elements are ignored # explicitly. value = {} # Iterate through each group of tags for tag, group in sorted_groupby(mets, key=attrgetter('tag')): group = list(group) # XXX Namespaces seem to use a trailing / sometimes, sometimes not. # (e.g. {http://www.loc.gov/METS/} vs {http://www.loc.gov/METS}) if tag == '{http://www.loc.gov/METS/}amdSec': pass # TODO elif tag == '{http://www.loc.gov/METS/}dmdSec': pass # TODO elif tag == '{http://www.loc.gov/METS/}metsHdr': pass # TODO elif tag == '{http://www.loc.gov/METS/}structLink': pass # TODO elif tag == '{http://www.loc.gov/METS/}structMap': pass # TODO elif tag == '{http://www.loc.gov/METS/}fileSec': value['fileSec'] = TagGroup(tag, group) \ .is_singleton().descend(raise_errors) elif tag == '{http://www.loc.gov/METS/}fileGrp': for e in group: use = e.attrib.get('USE') if not use: raise ValueError('No USE attribute for fileGrp {}'.format(e)) value[f'fileGrp-{use}-count'] = len(e) else: if raise_errors: print(value) raise ValueError('Unknown tag "{}"'.format(tag)) else: pass return value @click.command() @click.argument('mets_files', type=click.Path(exists=True), required=True, nargs=-1) @click.option('--output', '-o', 'output_file', type=click.Path(), help='Output pickle file', default='mods_info_df.pkl', show_default=True) @click.option('--output-csv', type=click.Path(), help='Output CSV file') @click.option('--output-xlsx', type=click.Path(), help='Output Excel .xlsx file') def process(mets_files: List[str], output_file: str, output_csv: str, output_xlsx: str): """ A tool to convert the MODS metadata in INPUT to a pandas DataFrame. INPUT is assumed to be a METS document with MODS metadata. INPUT may optionally be a directory. The tool then reads all files in the directory. modstool writes two output files: A pickled pandas DataFrame and a CSV file with all conversion warnings. """ # Extend file list if directories are given mets_files_real = [] for m in mets_files: if os.path.isdir(m): logger.info('Scanning directory {}'.format(m)) mets_files_real.extend(f.path for f in tqdm(os.scandir(m), leave=False) if f.is_file() and not f.name.startswith('.')) else: mets_files_real.append(m) # Process METS files with open(output_file + '.warnings.csv', 'w') as csvfile: csvwriter = csv.writer(csvfile) mods_info = [] logger.info('Processing METS files') for mets_file in tqdm(mets_files_real, leave=False): try: root = ET.parse(mets_file).getroot() mets = root # XXX .find('mets:mets', ns) does not work here mods = root.find('mets:dmdSec//mods:mods', ns) with warnings.catch_warnings(record=True) as caught_warnings: warnings.simplefilter('always') # do NOT filter double occurrences # MODS d = flatten(mods_to_dict(mods, raise_errors=True)) # METS d_mets = flatten(mets_to_dict(mets, raise_errors=True)) for k, v in d_mets.items(): d[f"mets_{k}"] = v # "meta" d['mets_file'] = mets_file mods_info.append(d) if caught_warnings: # PyCharm thinks caught_warnings is not Iterable: # noinspection PyTypeChecker for caught_warning in caught_warnings: csvwriter.writerow([mets_file, caught_warning.message]) except Exception as e: logger.error('Exception in {}: {}'.format(mets_file, e)) #import traceback; traceback.print_exc() # Convert the mods_info List[Dict] to a pandas DataFrame columns = [] for m in mods_info: for c in m.keys(): if c not in columns: columns.append(c) data = [[m.get(c) for c in columns] for m in mods_info] index = [m['recordInfo_recordIdentifier'] for m in mods_info] # PPN mods_info_df = pd.DataFrame(data=data, index=index, columns=columns) # Pickle the DataFrame logger.info('Writing DataFrame to {}'.format(output_file)) mods_info_df.to_pickle(output_file) if output_csv: logger.info('Writing CSV to {}'.format(output_csv)) mods_info_df.to_csv(output_csv) if output_xlsx: logger.info('Writing Excel .xlsx to {}'.format(output_xlsx)) mods_info_df.to_excel(output_xlsx) def main(): logging.basicConfig(level=logging.INFO) for prefix, uri in ns.items(): ET.register_namespace(prefix, uri) process() if __name__ == '__main__': main()