mirror of
https://github.com/qurator-spk/modstool.git
synced 2025-06-07 19:05:06 +02:00
⚡ Include METS fileGrp counts
This commit is contained in:
parent
2399699990
commit
73333ea2e2
2 changed files with 67 additions and 5 deletions
|
@ -4,7 +4,7 @@ import logging
|
|||
import os
|
||||
import re
|
||||
import warnings
|
||||
import xml.etree.ElementTree as ET
|
||||
from lxml import etree as ET
|
||||
from itertools import groupby
|
||||
from operator import attrgetter
|
||||
from typing import List
|
||||
|
@ -66,7 +66,7 @@ class TagGroup:
|
|||
return {e.text for e in self.group}
|
||||
|
||||
def descend(self, raise_errors):
|
||||
return mods_to_dict(self.is_singleton().group[0], raise_errors)
|
||||
return _to_dict(self.is_singleton().group[0], raise_errors)
|
||||
|
||||
def filter(self, cond, warn=None):
|
||||
new_group = []
|
||||
|
@ -175,6 +175,15 @@ def sorted_groupby(iterable, key=None):
|
|||
"""
|
||||
return groupby(sorted(iterable, key=key), key=key)
|
||||
|
||||
def _to_dict(root, raise_errors):
|
||||
|
||||
root_name = ET.QName(root.tag)
|
||||
if root_name.namespace == "http://www.loc.gov/mods/v3":
|
||||
return mods_to_dict(root, raise_errors)
|
||||
elif root_name.namespace == "http://www.loc.gov/METS/":
|
||||
return mets_to_dict(root, raise_errors)
|
||||
else:
|
||||
raise ValueError(f"Unknown namespace {root_name.namespace}")
|
||||
|
||||
def mods_to_dict(mods, raise_errors=True):
|
||||
"""Convert MODS metadata to a nested dictionary"""
|
||||
|
@ -357,7 +366,6 @@ def mods_to_dict(mods, raise_errors=True):
|
|||
pass
|
||||
else:
|
||||
if raise_errors:
|
||||
print(value)
|
||||
raise ValueError('Unknown tag "{}"'.format(tag))
|
||||
else:
|
||||
pass
|
||||
|
@ -365,6 +373,49 @@ def mods_to_dict(mods, raise_errors=True):
|
|||
return value
|
||||
|
||||
|
||||
def mets_to_dict(mets, raise_errors=True):
|
||||
"""Convert METS metadata to a nested dictionary"""
|
||||
|
||||
# The approach taken here is to handle each element explicitly. This also means that ignored elements are ignored
|
||||
# explicitly.
|
||||
|
||||
value = {}
|
||||
|
||||
# Iterate through each group of tags
|
||||
for tag, group in sorted_groupby(mets, key=attrgetter('tag')):
|
||||
group = list(group)
|
||||
|
||||
# XXX Namespaces seem to use a trailing / sometimes, sometimes not.
|
||||
# (e.g. {http://www.loc.gov/METS/} vs {http://www.loc.gov/METS})
|
||||
if tag == '{http://www.loc.gov/METS/}amdSec':
|
||||
pass # TODO
|
||||
elif tag == '{http://www.loc.gov/METS/}dmdSec':
|
||||
pass # TODO
|
||||
elif tag == '{http://www.loc.gov/METS/}metsHdr':
|
||||
pass # TODO
|
||||
elif tag == '{http://www.loc.gov/METS/}structLink':
|
||||
pass # TODO
|
||||
elif tag == '{http://www.loc.gov/METS/}structMap':
|
||||
pass # TODO
|
||||
elif tag == '{http://www.loc.gov/METS/}fileSec':
|
||||
value['fileSec'] = TagGroup(tag, group) \
|
||||
.is_singleton().descend(raise_errors)
|
||||
elif tag == '{http://www.loc.gov/METS/}fileGrp':
|
||||
for e in group:
|
||||
use = e.attrib.get('USE')
|
||||
if not use:
|
||||
raise ValueError('No USE attribute for fileGrp {}'.format(e))
|
||||
value[f'fileGrp-{use}-count'] = len(e)
|
||||
else:
|
||||
if raise_errors:
|
||||
print(value)
|
||||
raise ValueError('Unknown tag "{}"'.format(tag))
|
||||
else:
|
||||
pass
|
||||
|
||||
return value
|
||||
|
||||
|
||||
def flatten(d: MutableMapping, parent='', separator='_'):
|
||||
"""
|
||||
Flatten the given nested dict.
|
||||
|
@ -418,13 +469,22 @@ def process(mets_files: List[str], output_file: str):
|
|||
logging.info('Processing METS files')
|
||||
for mets_file in tqdm(mets_files_real):
|
||||
try:
|
||||
dmd_sec = ET.parse(mets_file).getroot().find('mets:dmdSec', ns)
|
||||
mods = dmd_sec.find('.//mods:mods', ns)
|
||||
root = ET.parse(mets_file).getroot()
|
||||
mets = root # XXX .find('mets:mets', ns) does not work here
|
||||
mods = root.find('mets:dmdSec//mods:mods', ns)
|
||||
|
||||
with warnings.catch_warnings(record=True) as caught_warnings:
|
||||
warnings.simplefilter('always') # do NOT filter double occurrences
|
||||
|
||||
# MODS
|
||||
d = flatten(mods_to_dict(mods, raise_errors=True))
|
||||
# METS
|
||||
d_mets = flatten(mets_to_dict(mets, raise_errors=True))
|
||||
for k, v in d_mets.items():
|
||||
d[f"mets_{k}"] = v
|
||||
# "meta"
|
||||
d['mets_file'] = mets_file
|
||||
|
||||
mods_info.append(d)
|
||||
|
||||
if caught_warnings:
|
||||
|
@ -434,6 +494,7 @@ def process(mets_files: List[str], output_file: str):
|
|||
csvwriter.writerow([mets_file, caught_warning.message])
|
||||
except Exception as e:
|
||||
warnings.warn('Exception in {}:\n{}'.format(mets_file, e))
|
||||
import traceback; traceback.print_exc()
|
||||
|
||||
# Convert the mods_info List[Dict] to a pandas DataFrame
|
||||
columns = []
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
click
|
||||
pandas
|
||||
tqdm
|
||||
lxml
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue