You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
modstool/qurator/mods4pandas/mods4pandas.py

465 lines
21 KiB
Python

#!/usr/bin/env python3
import csv
import logging
import os
import re
import warnings
from lxml import etree as ET
from itertools import groupby
from operator import attrgetter
from typing import Dict, List
from collections.abc import MutableMapping, Sequence
import click
import pandas as pd
from tqdm import tqdm
from .lib import sorted_groupby, TagGroup, ns, flatten, dicts_to_df
logger = logging.getLogger('mods4pandas')
def mods_to_dict(mods, raise_errors=True):
"""Convert MODS metadata to a nested dictionary"""
# The approach taken here is to handle each element explicitly. This also means that ignored elements are ignored
# explicitly.
value = {}
# Iterate through each group of tags
for tag, group in sorted_groupby(mods, key=attrgetter('tag')):
group = list(group)
if tag == '{http://www.loc.gov/mods/v3}location':
def only_current_location(location):
return location.get('type') != 'former'
value['location'] = TagGroup(tag, group) \
.filter(only_current_location) \
.has_attributes([{}, {'type': 'current'}]) \
.is_singleton().descend(raise_errors)
elif tag == '{http://www.loc.gov/mods/v3}physicalLocation':
def no_display_label(physical_location):
return physical_location.get('displayLabel') is None
value['physicalLocation'] = TagGroup(tag, group).filter(no_display_label).text()
elif tag == '{http://www.loc.gov/mods/v3}shelfLocator':
# This element should not be repeated according to MODS-AP 2.3.1, however a few of the files contain
# a second element with empty text and a "displayLabel" attribute set.
def no_display_label(shelf_locator):
return shelf_locator.get('displayLabel') is None
value['shelfLocator'] = TagGroup(tag, group) \
.filter(no_display_label) \
.force_singleton() \
.has_no_attributes() \
.text()
elif tag == '{http://www.loc.gov/mods/v3}originInfo':
def has_event_type(origin_info):
# According to MODS-AP 2.3.1, every originInfo should have its eventType set. However, some
# are empty and not fixable.
return origin_info.attrib.get('eventType') is not None
tag_group = TagGroup(tag, group).fix_event_type().filter(has_event_type, warn="has no eventType")
for event_type, grouped_group in sorted_groupby(tag_group.group, key=lambda g: g.attrib['eventType']):
for n, e in enumerate(grouped_group):
value['originInfo-{}{}'.format(event_type, n)] = mods_to_dict(e, raise_errors)
elif tag == '{http://www.loc.gov/mods/v3}place':
value['place'] = TagGroup(tag, group).force_singleton(warn=False).has_no_attributes().descend(raise_errors)
elif tag == '{http://www.loc.gov/mods/v3}placeTerm':
value['placeTerm'] = TagGroup(tag, group).is_singleton().has_attributes({'type': 'text'}).text()
elif tag == '{http://www.loc.gov/mods/v3}dateIssued':
value['dateIssued'] = TagGroup(tag, group) \
.fix_date() \
.sort(key=lambda d: d.attrib.get('keyDate') == 'yes', reverse=True) \
.ignore_attributes() \
.force_singleton() \
.text()
elif tag == '{http://www.loc.gov/mods/v3}dateCreated':
value['dateCreated'] = TagGroup(tag, group) \
.fix_date() \
.sort(key=lambda d: d.attrib.get('keyDate') == 'yes', reverse=True) \
.ignore_attributes() \
.force_singleton() \
.text()
elif tag == '{http://www.loc.gov/mods/v3}dateCaptured':
value['dateCaptured'] = TagGroup(tag, group).fix_date().ignore_attributes().is_singleton().text()
elif tag == '{http://www.loc.gov/mods/v3}dateOther':
value['dateOther'] = TagGroup(tag, group).fix_date().ignore_attributes().is_singleton().text()
elif tag == '{http://www.loc.gov/mods/v3}publisher':
value['publisher'] = TagGroup(tag, group).force_singleton(warn=False).has_no_attributes().text()
elif tag == '{http://www.loc.gov/mods/v3}edition':
value['edition'] = TagGroup(tag, group).force_singleton().has_no_attributes().text()
elif tag == '{http://www.loc.gov/mods/v3}classification':
authorities = {e.attrib['authority'] for e in group}
for authority in authorities:
sub_group = [e for e in group if e.attrib.get('authority') == authority]
value['classification-{}'.format(authority)] = TagGroup(tag, sub_group).text_set()
elif tag == '{http://www.loc.gov/mods/v3}recordInfo':
value['recordInfo'] = TagGroup(tag, group).is_singleton().has_no_attributes().descend(raise_errors)
elif tag == '{http://www.loc.gov/mods/v3}recordIdentifier':
# By default we assume source="gbv-ppn" mods:recordIdentifiers (= PPNs),
# however, in mods:relatedItems, there may be source="dnb-ppns",
# which we need to distinguish by using a separate field name.
try:
value['recordIdentifier'] = TagGroup(tag, group).is_singleton().has_attributes({'source': 'gbv-ppn'}).text()
except ValueError:
value['recordIdentifier-dnb-ppn'] = TagGroup(tag, group).is_singleton().has_attributes({'source': 'dnb-ppn'}).text()
elif tag == '{http://www.loc.gov/mods/v3}identifier':
for e in group:
if len(e.attrib) != 1:
raise ValueError('Unknown attributes for identifier {}'.format(e.attrib))
value['identifier-{}'.format(e.attrib['type'])] = e.text
elif tag == '{http://www.loc.gov/mods/v3}titleInfo':
def only_standard_title(title_info):
return title_info.attrib.get('type') is None
value['titleInfo'] = TagGroup(tag, group) \
.filter(only_standard_title) \
.is_singleton().has_no_attributes().descend(raise_errors)
elif tag == '{http://www.loc.gov/mods/v3}title':
value['title'] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif tag == '{http://www.loc.gov/mods/v3}partName':
value['partName'] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif tag == '{http://www.loc.gov/mods/v3}subTitle':
value['subTitle'] = TagGroup(tag, group).force_singleton().has_no_attributes().text()
elif tag == '{http://www.loc.gov/mods/v3}note':
# This could be useful if distinguished by type attribute.
pass
elif tag == '{http://www.loc.gov/mods/v3}part':
pass
elif tag == '{http://www.loc.gov/mods/v3}abstract':
value['abstract'] = TagGroup(tag, group).has_no_attributes().text()
elif tag == '{http://www.loc.gov/mods/v3}subject':
authorities = {e.attrib.get('authority') for e in group}
for authority in authorities:
k = 'subject-{}'.format(authority) if authority is not None else 'subject'
sub_group = [e for e in group if e.attrib.get('authority') == authority]
value[k] = TagGroup(tag, sub_group).force_singleton().descend(raise_errors)
elif tag == '{http://www.loc.gov/mods/v3}topic':
TagGroup(tag, group).text_set()
elif tag == '{http://www.loc.gov/mods/v3}cartographics':
pass
elif tag == '{http://www.loc.gov/mods/v3}geographic':
TagGroup(tag, group).text_set()
elif tag == '{http://www.loc.gov/mods/v3}temporal':
TagGroup(tag, group).text_set()
elif tag == '{http://www.loc.gov/mods/v3}genre':
authorities = {e.attrib.get('authority') for e in group}
for authority in authorities:
k = 'genre-{}'.format(authority) if authority is not None else 'genre'
value[k] = {e.text for e in group if e.attrib.get('authority') == authority}
elif tag == '{http://www.loc.gov/mods/v3}language':
value["language"] = TagGroup(tag, group) \
.merge_sub_tags_to_set()
elif tag == '{http://www.loc.gov/mods/v3}languageTerm':
value['languageTerm'] = TagGroup(tag, group) \
.has_attributes({'authority': 'iso639-2b', 'type': 'code'}) \
.text_set()
elif tag == '{http://www.loc.gov/mods/v3}scriptTerm':
value['scriptTerm'] = TagGroup(tag, group) \
.fix_script_term() \
.has_attributes({'authority': 'iso15924', 'type': 'code'}) \
.text_set()
elif tag == '{http://www.loc.gov/mods/v3}relatedItem':
tag_group = TagGroup(tag, group)
for type_, grouped_group in sorted_groupby(tag_group.group, key=lambda g: g.attrib['type']):
sub_tag = 'relatedItem-{}'.format(type_)
grouped_group = list(grouped_group)
if type_ in ["original", "host"]:
value[sub_tag] = TagGroup(sub_tag, grouped_group).is_singleton().descend(raise_errors)
else:
# TODO type="series"
pass
elif tag == '{http://www.loc.gov/mods/v3}name':
for n, e in enumerate(group):
value['name{}'.format(n)] = mods_to_dict(e, raise_errors)
elif tag == '{http://www.loc.gov/mods/v3}role':
value["role"] = TagGroup(tag, group) \
.has_no_attributes() \
.merge_sub_tags_to_set()
elif tag == '{http://www.loc.gov/mods/v3}roleTerm':
value['roleTerm'] = TagGroup(tag, group) \
.has_attributes({'authority': 'marcrelator', 'type': 'code'}) \
.text_set()
elif tag == '{http://www.loc.gov/mods/v3}namePart':
for e in group:
if not e.attrib.get('type'):
value['namePart'] = e.text
else:
value['namePart-{}'.format(e.attrib['type'])] = e.text
elif tag == '{http://www.loc.gov/mods/v3}nameIdentifier':
# TODO Use this (e.g. <mods:nameIdentifier type="ppn">106168096</mods:nameIdentifier>) or the
# mods:name@valueURI to disambiguate
pass
elif tag == '{http://www.loc.gov/mods/v3}displayForm':
value['displayForm'] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif tag == '{http://www.loc.gov/mods/v3}physicalDescription':
pass
elif tag == '{http://www.loc.gov/mods/v3}extension':
pass
elif tag == '{http://www.loc.gov/mods/v3}accessCondition':
for e in group:
if not e.attrib.get('type'):
raise ValueError('Unknown attributes for accessCondition {}'.format(e.attrib))
value['accessCondition-{}'.format(e.attrib['type'])] = e.text
elif tag == '{http://www.loc.gov/mods/v3}typeOfResource':
value['typeOfResource'] = TagGroup(tag, group).is_singleton().has_no_attributes().text()
elif tag == '{http://www.loc.gov/mods/v3}mods':
# XXX Ignore nested mods:mods for now (used in mods:subject)
pass
else:
if raise_errors:
raise ValueError('Unknown tag "{}"'.format(tag))
else:
pass
return value
def mets_to_dict(mets, raise_errors=True):
"""Convert METS metadata to a nested dictionary"""
# The approach taken here is to handle each element explicitly. This also means that ignored elements are ignored
# explicitly.
value = {}
# Iterate through each group of tags
for tag, group in sorted_groupby(mets, key=attrgetter('tag')):
group = list(group)
# XXX Namespaces seem to use a trailing / sometimes, sometimes not.
# (e.g. {http://www.loc.gov/METS/} vs {http://www.loc.gov/METS})
if tag == '{http://www.loc.gov/METS/}amdSec':
pass # TODO
elif tag == '{http://www.loc.gov/METS/}dmdSec':
pass # TODO
elif tag == '{http://www.loc.gov/METS/}metsHdr':
pass # TODO
elif tag == '{http://www.loc.gov/METS/}structLink':
pass # TODO
elif tag == '{http://www.loc.gov/METS/}structMap':
pass # TODO
elif tag == '{http://www.loc.gov/METS/}fileSec':
value['fileSec'] = TagGroup(tag, group) \
.is_singleton().descend(raise_errors)
elif tag == '{http://www.loc.gov/METS/}fileGrp':
for e in group:
use = e.attrib.get('USE')
if not use:
raise ValueError('No USE attribute for fileGrp {}'.format(e))
value[f'fileGrp-{use}-count'] = len(e)
else:
if raise_errors:
print(value)
raise ValueError('Unknown tag "{}"'.format(tag))
else:
pass
return value
def pages_to_dict(mets, raise_errors=True) -> List[Dict]:
# TODO replace asserts by ValueError
result = []
# PPN
def get_mets_recordIdentifier(*, source="gbv-ppn"):
return (mets.xpath(f'//mets:dmdSec[1]//mods:mods/mods:recordInfo/mods:recordIdentifier[@source="{source}"]',
namespaces=ns) or [None])[0].text
ppn = get_mets_recordIdentifier()
# Getting per-page/structure information is a bit different
structMap_PHYSICAL = mets.find('./mets:structMap[@TYPE="PHYSICAL"]', ns)
structMap_LOGICAL = mets.find('./mets:structMap[@TYPE="LOGICAL"]', ns)
fileSec = mets.find('./mets:fileSec', ns)
if structMap_PHYSICAL is None:
# This is expected in a multivolume work!
if structMap_LOGICAL.find('./mets:div[@TYPE="multivolume_work"]', ns) is not None:
return []
else:
raise ValueError("No structMap[@TYPE='PHYSICAL'] found (but not a multivolume work)")
if structMap_LOGICAL is None:
raise ValueError("No structMap[@TYPE='LOGICAL'] found")
if fileSec is None:
raise ValueError("No fileSec found")
div_physSequence = structMap_PHYSICAL[0]
assert div_physSequence.attrib.get("TYPE") == "physSequence"
# Build a look-up table to get mets:file by @ID
# This cuts retrieving the mets:file down to half the time.
mets_file_by_ID = {}
def _init_mets_file_by_ID():
for f in fileSec.iterfind('./mets:fileGrp/mets:file', ns):
mets_file_by_ID[f.attrib.get("ID")] = f
_init_mets_file_by_ID()
def get_mets_file(*, ID):
if ID:
return mets_file_by_ID[ID]
def get_mets_div(*, ID):
if ID:
return structMap_LOGICAL.findall(f'.//mets:div[@ID="{ID}"]', ns)
for page in div_physSequence:
# TODO sort by ORDER?
assert page.attrib.get("TYPE") == "page"
page_dict = {}
page_dict["ppn"] = ppn
page_dict["ID"] = page.attrib.get("ID")
for fptr in page:
assert fptr.tag == "{http://www.loc.gov/METS/}fptr"
file_id = fptr.attrib.get("FILEID")
assert file_id
file_ = get_mets_file(ID=file_id)
assert file_ is not None
fileGrp_USE = file_.getparent().attrib.get("USE")
file_FLocat_href = (file_.xpath('mets:FLocat/@xlink:href', namespaces=ns) or [None])[0]
page_dict[f"fileGrp_{fileGrp_USE}_file_FLocat_href"] = file_FLocat_href
def get_struct_log(*, to_phys):
"""
Get the logical structMap elements that link to the given physical page.
Keyword arguments:
to_phys -- ID of the page, as per structMap[@TYPE="PHYSICAL"]
"""
# This is all XLink, there might be a more generic way to traverse the links. However, currently,
# it suffices to do this the old-fashioned way.
sm_links = mets.findall(
f'./mets:structLink/mets:smLink[@xlink:to="{to_phys}"]', ns
)
targets = []
for sm_link in sm_links:
xlink_from = sm_link.attrib.get(f"{{{ns['xlink']}}}from")
targets.extend(get_mets_div(ID=xlink_from))
return targets
struct_divs = set(get_struct_log(to_phys=page_dict["ID"]))
# In our documents, there are already links to parent elements, but we want to make
# sure and add them.
def get_struct_log_parents(div):
cursor = div
while (cursor := cursor.getparent()).tag == f"{{{ns['mets']}}}div":
yield cursor
struct_divs_to_add = set()
for struct_div in struct_divs:
struct_divs_to_add.update(get_struct_log_parents(struct_div))
struct_divs.update(struct_divs_to_add)
# Populate structure type indicator variables
for struct_div in struct_divs:
type_ = struct_div.attrib.get("TYPE")
assert type_
page_dict[f"structMap-LOGICAL_TYPE_{type_}"] = 1
result.append(page_dict)
return result
@click.command()
@click.argument('mets_files', type=click.Path(exists=True), required=True, nargs=-1)
@click.option('--output', '-o', 'output_file', type=click.Path(), help='Output pickle file',
default='mods_info_df.pkl', show_default=True)
@click.option('--output-csv', type=click.Path(), help='Output CSV file')
@click.option('--output-xlsx', type=click.Path(), help='Output Excel .xlsx file')
def process(mets_files: List[str], output_file: str, output_csv: str, output_xlsx: str):
"""
A tool to convert the MODS metadata in INPUT to a pandas DataFrame.
INPUT is assumed to be a METS document with MODS metadata. INPUT may optionally be a directory. The tool then reads
all files in the directory.
mods4pandas writes two output files: A pickled pandas DataFrame and a CSV file with all conversion warnings.
"""
# Extend file list if directories are given
mets_files_real = []
for m in mets_files:
if os.path.isdir(m):
logger.info('Scanning directory {}'.format(m))
mets_files_real.extend(f.path for f in tqdm(os.scandir(m), leave=False)
if f.is_file() and not f.name.startswith('.'))
else:
mets_files_real.append(m)
# Process METS files
with open(output_file + '.warnings.csv', 'w') as csvfile:
csvwriter = csv.writer(csvfile)
mods_info = []
page_info = []
logger.info('Processing METS files')
for mets_file in tqdm(mets_files_real, leave=False):
try:
root = ET.parse(mets_file).getroot()
mets = root # XXX .find('mets:mets', ns) does not work here
mods = root.find('mets:dmdSec//mods:mods', ns)
with warnings.catch_warnings(record=True) as caught_warnings:
warnings.simplefilter('always') # do NOT filter double occurrences
# MODS
d = flatten(mods_to_dict(mods, raise_errors=True))
# METS
d_mets = flatten(mets_to_dict(mets, raise_errors=True))
for k, v in d_mets.items():
d[f"mets_{k}"] = v
# "meta"
d['mets_file'] = mets_file
# METS - per-page
page_info_doc: list[dict] = pages_to_dict(mets, raise_errors=True)
mods_info.append(d)
page_info.extend(page_info_doc)
if caught_warnings:
# PyCharm thinks caught_warnings is not Iterable:
# noinspection PyTypeChecker
for caught_warning in caught_warnings:
csvwriter.writerow([mets_file, caught_warning.message])
except Exception as e:
logger.error('Exception in {}: {}'.format(mets_file, e))
#import traceback; traceback.print_exc()
# Convert the mods_info List[Dict] to a pandas DataFrame
mods_info_df = dicts_to_df(mods_info, index_column="recordInfo_recordIdentifier")
# Pickle the DataFrame
logger.info('Writing DataFrame to {}'.format(output_file))
mods_info_df.to_pickle(output_file)
if output_csv:
logger.info('Writing CSV to {}'.format(output_csv))
mods_info_df.to_csv(output_csv)
if output_xlsx:
logger.info('Writing Excel .xlsx to {}'.format(output_xlsx))
mods_info_df.to_excel(output_xlsx)
# Convert page_info
# XXX hardcoded filenames + other formats
page_info_df = dicts_to_df(page_info, index_column=("ppn", "ID"))
# Pickle the DataFrame
logger.info('Writing DataFrame to {}'.format("page_info_df.pkl"))
page_info_df.to_pickle("page_info_df.pkl")
def main():
logging.basicConfig(level=logging.INFO)
for prefix, uri in ns.items():
ET.register_namespace(prefix, uri)
process()
if __name__ == '__main__':
main()