You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
page2tsv/qurator/tsvtools/cli.py

499 lines
21 KiB
Python

import glob
import re
import os
from io import StringIO
from pathlib import Path
import numpy as np
import click
import pandas as pd
import requests
from lxml import etree as ET
import xml.etree.ElementTree as ElementTree
import unicodedata
from ocrd_models.ocrd_page import parse
from ocrd_utils import bbox_from_points
from qurator.utils.tsv import read_tsv, write_tsv, extract_doc_links
from .ocr import get_conf_color
from qurator.utils.ner import ner
from qurator.utils.ned import ned
@click.command()
@click.argument('tsv-file', type=click.Path(exists=True), required=True, nargs=1)
@click.argument('url-file', type=click.Path(exists=False), required=True, nargs=1)
def extract_document_links(tsv_file, url_file):
parts = extract_doc_links(tsv_file)
urls = [part['url'] for part in parts]
urls = pd.DataFrame(urls, columns=['url'])
urls.to_csv(url_file, sep="\t", quoting=3, index=False)
@click.command()
@click.argument('tsv-file', type=click.Path(exists=True), required=True, nargs=1)
@click.argument('annotated-tsv-file', type=click.Path(exists=False), required=True, nargs=1)
def annotate_tsv(tsv_file, annotated_tsv_file):
parts = extract_doc_links(tsv_file)
annotated_parts = []
for part in parts:
part_data = StringIO(part['header'] + part['text'])
df = pd.read_csv(part_data, sep="\t", comment='#', quoting=3)
df['url_id'] = len(annotated_parts)
annotated_parts.append(df)
df = pd.concat(annotated_parts)
df.to_csv(annotated_tsv_file, sep="\t", quoting=3, index=False)
def alto_iterate_textblocks(xml_file=None, root=None):
if root is None:
tree = ElementTree.parse(xml_file)
root = tree.getroot()
for idx, block_elem in enumerate(root.iter('{http://www.loc.gov/standards/alto/ns-v2#}TextBlock')):
id = str(idx)
if 'ID' in block_elem.attrib:
id = block_elem.attrib['ID']
yield id, block_elem
def alto_iterate_lines(root):
for idx, line_elem in enumerate(root.iter('{http://www.loc.gov/standards/alto/ns-v2#}TextLine')):
left, top, right, bottom = -1, -1, -1, -1
if 'HPOS' in line_elem.attrib:
left = int(line_elem.attrib['HPOS'])
if 'VPOS' in line_elem.attrib:
top = int(line_elem.attrib['VPOS'])
if 'HPOS' in line_elem.attrib and 'WIDTH' in line_elem.attrib:
right = int(line_elem.attrib['HPOS']) + int(line_elem.attrib['WIDTH'])
if 'VPOS' in line_elem.attrib and 'HEIGHT' in line_elem.attrib:
bottom = int(line_elem.attrib['VPOS']) + int(line_elem.attrib['HEIGHT'])
yield line_elem, str(idx), left, right, top, bottom
def alto_iterate_string_elements(root):
for string_elem in root.iter('{http://www.loc.gov/standards/alto/ns-v2#}String'):
if 'CONTENT' in string_elem.attrib:
content = string_elem.attrib['CONTENT']
else:
content = str(np.NAN)
left, top, right, bottom = -1, -1, -1, -1
if 'HPOS' in string_elem.attrib:
left = int(string_elem.attrib['HPOS'])
if 'VPOS' in string_elem.attrib:
top = int(string_elem.attrib['VPOS'])
if 'HPOS' in string_elem.attrib and 'WIDTH' in string_elem.attrib:
right = int(string_elem.attrib['HPOS']) + int(string_elem.attrib['WIDTH'])
if 'VPOS' in string_elem.attrib and 'HEIGHT' in string_elem.attrib:
bottom = int(string_elem.attrib['VPOS']) + int(string_elem.attrib['HEIGHT'])
yield unicodedata.normalize('NFC', content), left, top, right, bottom
def alto2tsv(alto_xml_file, tsv_out_file, purpose, image_url, ner_rest_endpoint, ned_rest_endpoint,
noproxy, scale_factor, ned_threshold, ned_priority):
if purpose == "NERD":
out_columns = ['No.', 'TOKEN', 'NE-TAG', 'NE-EMB', 'ID', 'url_id', 'left', 'right', 'top', 'bottom', 'conf']
elif purpose == "OCR":
out_columns = ['TEXT', 'url_id', 'left', 'right', 'top', 'bottom', 'conf', 'line_id']
else:
raise RuntimeError("Unknown purpose.")
if noproxy:
os.environ['no_proxy'] = '*'
urls = []
if os.path.exists(tsv_out_file):
parts = extract_doc_links(tsv_out_file)
urls = [part['url'] for part in parts]
else:
pd.DataFrame([], columns=out_columns).to_csv(tsv_out_file, sep="\t", quoting=3, index=False)
tsv = []
line_info = []
for region_idx, region in alto_iterate_textblocks(alto_xml_file):
for line, _, l_left, l_right, l_top, l_bottom in alto_iterate_lines(region):
line_id = len(line_info)
line_info.append((len(urls), l_left, l_right, l_top, l_bottom, line_id))
for word_num, (word, left, top, right, bottom) in enumerate(alto_iterate_string_elements(line)):
word = word.strip()
if len(word) == 0:
continue
if len(word.split()) > 1:
print(word)
continue
left, top, right, bottom = [int(scale_factor * x) for x in [left, top, right, bottom]]
tsv.append((region_idx, left + (right - left) / 2.0,
word, len(urls), left, right, top, bottom, line_id))
line_info = pd.DataFrame(line_info, columns=['url_id', 'left', 'right', 'top', 'bottom', 'line_id'])
tsv = pd.DataFrame(tsv, columns=['rid', 'hcenter'] +
['TEXT', 'url_id', 'left', 'right', 'top', 'bottom', 'line_id'])
with open(tsv_out_file, 'a') as f:
f.write('# ' + image_url + '\n')
if len(tsv) == 0:
return
vlinecenter = pd.DataFrame(tsv[['line_id', 'top']].groupby('line_id', sort=False).mean().top +
(tsv[['line_id', 'bottom']].groupby('line_id', sort=False).mean().bottom -
tsv[['line_id', 'top']].groupby('line_id', sort=False).mean().top) / 2,
columns=['vlinecenter'])
tsv = tsv.merge(vlinecenter, left_on='line_id', right_index=True)
regions = [region.sort_values(['vlinecenter', 'hcenter']) for rid, region in tsv.groupby('rid', sort=False)]
tsv = pd.concat(regions)
if purpose == 'NERD':
tsv['No.'] = 0
tsv['NE-TAG'] = 'O'
tsv['NE-EMB'] = 'O'
tsv['ID'] = '-'
tsv['conf'] = '-'
tsv = tsv.rename(columns={'TEXT': 'TOKEN'})
elif purpose == 'OCR':
tsv = pd.DataFrame([(line_id, " ".join(part.TEXT.to_list())) for line_id, part in tsv.groupby('line_id')],
columns=['line_id', 'TEXT'])
tsv = tsv.merge(line_info, left_on='line_id', right_index=True)
tsv = tsv[out_columns].reset_index(drop=True)
try:
if purpose == 'NERD' and ner_rest_endpoint is not None:
tsv, ner_result = ner(tsv, ner_rest_endpoint)
if ned_rest_endpoint is not None:
tsv, _ = ned(tsv, ner_result, ned_rest_endpoint, threshold=ned_threshold, priority=ned_priority)
tsv.to_csv(tsv_out_file, sep="\t", quoting=3, index=False, mode='a', header=False)
except requests.HTTPError as e:
print(e)
def unicode_normalize(text, normalization_map=None, use_combining_characters=True):
if normalization_map is None:
ret = "".join([c if unicodedata.category(c) != "Co" else '' for c in text])
if ret != text:
print("Warning: Due to unicode normalization possible loss of information: "
"{} => {} (normalization file missing?)".format(text, ret))
elif use_combining_characters:
ret = "".join([c if unicodedata.category(c) != "Co" else
"{}{}".format(normalization_map.loc[ord(c)].base,
chr(int(normalization_map.loc[ord(c)].combining_character, base=16))
if normalization_map.loc[ord(c)].combining_character != '' else '')
if ord(c) in normalization_map.index else '' for c in text])
# do it again since the normalization map may again contain unicode private use chars
ret = "".join([c if unicodedata.category(c) != "Co" else '' for c in ret])
if ret != text:
print("Warning: Due to unicode normalization possible loss of information: "
"{} => {}".format(text, ret))
else:
ret = "".join([c if unicodedata.category(c) != "Co" else
normalization_map.loc[ord(c)].base
if ord(c) in normalization_map.index else ''
for c in text])
# do it again since the normalization map may again contain unicode private use chars
ret = "".join([c if unicodedata.category(c) != "Co" else '' for c in ret])
if ret != text:
print("Warning: Due to unicode normalization possible loss of information: "
"{} => {}".format(text, ret))
return unicodedata.normalize('NFC', ret)
def page2tsv(page_xml_file, tsv_out_file, purpose, image_url, ner_rest_endpoint, ned_rest_endpoint,
noproxy, scale_factor, ned_threshold, min_confidence, max_confidence, ned_priority, normalization_file):
print("page2tsv - processing file: {}".format(page_xml_file))
if purpose == "NERD":
out_columns = ['No.', 'TOKEN', 'NE-TAG', 'NE-EMB', 'ID', 'url_id', 'left', 'right', 'top', 'bottom', 'conf']
elif purpose == "OCR":
out_columns = ['TEXT', 'url_id', 'left', 'right', 'top', 'bottom', 'conf', 'line_id']
if min_confidence is not None and max_confidence is not None:
out_columns += ['ocrconf']
else:
raise RuntimeError("Unknown purpose.")
if noproxy:
os.environ['no_proxy'] = '*'
urls = []
if os.path.exists(tsv_out_file):
parts = extract_doc_links(tsv_out_file)
urls = [part['url'] for part in parts]
else:
pd.DataFrame([], columns=out_columns).to_csv(tsv_out_file, sep="\t", quoting=3, index=False)
pcgts = parse(page_xml_file)
tsv = []
line_info = []
_unicode_normalize = unicode_normalize
if normalization_file is not None:
normalization_map = pd.read_pickle(normalization_file)
normalization_map = normalization_map.set_index('decimal')
# import ipdb;ipdb.set_trace()
_unicode_normalize = lambda s: unicode_normalize(s, normalization_map=normalization_map)
for region_idx, region in enumerate(pcgts.get_Page().get_AllRegions(classes=['Text'], order='reading-order')):
for text_line in region.get_TextLine():
left, top, right, bottom = [int(scale_factor * x) for x in bbox_from_points(text_line.get_Coords().points)]
if min_confidence is not None and max_confidence is not None:
conf = np.max([textequiv.conf for textequiv in text_line.get_TextEquiv()])
else:
conf = np.nan
line_info.append((len(urls), left, right, top, bottom, conf, text_line.id))
words = [word for word in text_line.get_Word()]
if len(words) <= 0:
for text_equiv in text_line.get_TextEquiv():
# transform OCR coordinates using `scale_factor` to derive
# correct coordinates for the web presentation image
left, top, right, bottom = [int(scale_factor * x) for x in bbox_from_points(text_line.get_Coords().points)]
text = text_equiv.get_Unicode()
for text_part in text.split(" "):
tsv.append((region_idx, len(line_info) - 1, left + (right - left) / 2.0,
_unicode_normalize(text_part), len(urls), left, right, top, bottom, text_line.id))
else:
for word in words:
# XXX TODO make this configurable
textequiv = ''
list_textequivs = word.get_TextEquiv()
if list_textequivs:
textequiv = list_textequivs[0].get_Unicode()
# transform OCR coordinates using `scale_factor` to derive
# correct coordinates for the web presentation image
left, top, right, bottom = [int(scale_factor * x) for x in bbox_from_points(word.get_Coords().points)]
tsv.append((region_idx, len(line_info) - 1, left + (right - left) / 2.0,
_unicode_normalize(textequiv), len(urls), left, right, top, bottom, text_line.id))
line_info = pd.DataFrame(line_info, columns=['url_id', 'left', 'right', 'top', 'bottom', 'conf', 'line_id'])
if min_confidence is not None and max_confidence is not None:
line_info['ocrconf'] = line_info.conf.map(lambda x: get_conf_color(x, min_confidence, max_confidence))
tsv = pd.DataFrame(tsv, columns=['rid', 'line', 'hcenter'] +
['TEXT', 'url_id', 'left', 'right', 'top', 'bottom', 'line_id'])
# print(tsv)
with open(tsv_out_file, 'a') as f:
f.write('# ' + image_url + '\n')
if len(tsv) == 0:
return
vlinecenter = pd.DataFrame(tsv[['line', 'top']].groupby('line', sort=False).mean().top +
(tsv[['line', 'bottom']].groupby('line', sort=False).mean().bottom -
tsv[['line', 'top']].groupby('line', sort=False).mean().top) / 2,
columns=['vlinecenter'])
tsv = tsv.merge(vlinecenter, left_on='line', right_index=True)
regions = [region.sort_values(['vlinecenter', 'hcenter']) for rid, region in tsv.groupby('rid', sort=False)]
tsv = pd.concat(regions)
if purpose == 'NERD':
tsv['No.'] = 0
tsv['NE-TAG'] = 'O'
tsv['NE-EMB'] = 'O'
tsv['ID'] = '-'
tsv['conf'] = '-'
tsv = tsv.rename(columns={'TEXT': 'TOKEN'})
elif purpose == 'OCR':
tsv = pd.DataFrame([(line, " ".join(part.TEXT.to_list())) for line, part in tsv.groupby('line')],
columns=['line', 'TEXT'])
tsv = tsv.merge(line_info, left_on='line', right_index=True)
tsv = tsv[out_columns].reset_index(drop=True)
# import ipdb;ipdb.set_trace()
try:
if purpose == 'NERD' and ner_rest_endpoint is not None:
tsv, ner_result = ner(tsv, ner_rest_endpoint)
if ned_rest_endpoint is not None:
tsv, _ = ned(tsv, ner_result, ned_rest_endpoint, threshold=ned_threshold, priority=ned_priority)
# import ipdb;ipdb.set_trace()
tsv.to_csv(tsv_out_file, sep="\t", quoting=3, index=False, mode='a', header=False, encoding='utf-8')
except requests.HTTPError as e:
print(e)
def tsv2page(output_filename, keep_words, page_file, tsv_file):
if not output_filename:
output_filename = Path(page_file).stem + '.corrected.xml'
ns = {'pc': 'http://schema.primaresearch.org/PAGE/gts/pagecontent/2019-07-15'}
tsv = pd.read_csv(tsv_file, sep='\t', comment='#', quoting=3)
tree = ET.parse(page_file)
for _, row in tsv.iterrows():
el_textline = tree.find(f'//pc:TextLine[@id="{row.line_id}"]', namespaces=ns)
el_textline.find('pc:TextEquiv/pc:Unicode', namespaces=ns).text = row.TEXT
if not keep_words:
for el_word in el_textline.findall('pc:Word', namespaces=ns):
el_textline.remove(el_word)
with open(output_filename, 'w', encoding='utf-8') as f:
f.write(ET.tostring(tree, pretty_print=True).decode('utf-8'))
@click.command()
@click.option('--output-filename', '-o', help="Output filename. "
"If omitted, PAGE-XML filename with .corrected.xml extension")
@click.option('--keep-words', '-k', is_flag=True, help="Keep (out-of-date) Words of TextLines")
@click.argument('page-file')
@click.argument('tsv-file')
def tsv2page_cli(output_filename, keep_words, page_file, tsv_file):
return tsv2page_cli(output_filename, keep_words, page_file, tsv_file)
@click.command()
@click.option('--xls-file', type=click.Path(exists=True), default=None,
help="Read parameters from xls-file. Expected columns: Filename, iiif_url, scale_factor.")
@click.option('--directory', type=click.Path(exists=True), default=None,
help="Search directory for PPN**/*.xml files. Extract PPN and file number into image-url.")
@click.option('--purpose', type=click.Choice(['NERD', 'OCR'], case_sensitive=False), default="NERD",
help="Purpose of output tsv file. "
"\n\nNERD: NER/NED application/ground-truth creation. "
"\n\nOCR: OCR application/ground-truth creation. "
"\n\ndefault: NERD.")
def make_page2tsv_commands(xls_file, directory, purpose):
if xls_file is not None:
if xls_file.endswith(".xls"):
df = pd.read_excel(xls_file)
else:
df = pd.read_excel(xls_file, engine='openpyxl')
df = df.dropna(how='all')
for _, row in df.iterrows():
print('page2tsv $(OPTIONS) {}.xml {}.tsv --image-url={} --scale-factor={} --purpose={}'.
format(row.Filename, row.Filename, row.iiif_url.replace('/full/full', '/left,top,width,height/full'),
row.scale_factor, purpose))
elif directory is not None:
for file in glob.glob('{}/**/*.xml'.format(directory), recursive=True):
ma = re.match('(.*/(PPN[0-9X]+)/.*?([0-9]+).*?).xml', file)
if ma:
print('page2tsv {} {}.tsv '
'--image-url=https://content.staatsbibliothek-berlin.de/dc/'
'{}-{:08d}/left,top,width,height/full/0/default.jpg --scale-factor=1.0 --purpose={}'.
format(file, ma.group(1), ma.group(2), int(ma.group(3)), purpose))
@click.command()
@click.argument('page-xml-file', type=click.Path(exists=True), required=True, nargs=1)
@click.argument('tsv-out-file', type=click.Path(), required=True, nargs=1)
@click.option('--purpose', type=click.Choice(['NERD', 'OCR'], case_sensitive=False), default="NERD",
help="Purpose of output tsv file. "
"\n\nNERD: NER/NED application/ground-truth creation. "
"\n\nOCR: OCR application/ground-truth creation. "
"\n\ndefault: NERD.")
@click.option('--image-url', type=str, default='http://empty')
@click.option('--ner-rest-endpoint', type=str, default=None,
help="REST endpoint of sbb_ner service. See https://github.com/qurator-spk/sbb_ner for details. "
"Only applicable in case of NERD.")
@click.option('--ned-rest-endpoint', type=str, default=None,
help="REST endpoint of sbb_ned service. See https://github.com/qurator-spk/sbb_ned for details. "
"Only applicable in case of NERD.")
@click.option('--noproxy', type=bool, is_flag=True, help='disable proxy. default: enabled.')
@click.option('--scale-factor', type=float, default=1.0, help='default: 1.0')
@click.option('--ned-threshold', type=float, default=None)
@click.option('--min-confidence', type=float, default=None)
@click.option('--max-confidence', type=float, default=None)
@click.option('--ned-priority', type=int, default=1)
@click.option('--normalization-file', type=click.Path(exists=True), default=None)
def page2tsv_cli(page_xml_file, tsv_out_file, purpose, image_url, ner_rest_endpoint, ned_rest_endpoint,
noproxy, scale_factor, ned_threshold, min_confidence, max_confidence, ned_priority, normalization_file):
return page2tsv(page_xml_file, tsv_out_file, purpose, image_url, ner_rest_endpoint, ned_rest_endpoint,
noproxy, scale_factor, ned_threshold, min_confidence, max_confidence, ned_priority, normalization_file)
@click.command()
@click.argument('alto-xml-file', type=click.Path(exists=True), required=True, nargs=1)
@click.argument('tsv-out-file', type=click.Path(), required=True, nargs=1)
@click.option('--purpose', type=click.Choice(['NERD', 'OCR'], case_sensitive=False), default="NERD",
help="Purpose of output tsv file. "
"\n\nNERD: NER/NED application/ground-truth creation. "
"\n\nOCR: OCR application/ground-truth creation. "
"\n\ndefault: NERD.")
@click.option('--image-url', type=str, default='http://empty')
@click.option('--ner-rest-endpoint', type=str, default=None,
help="REST endpoint of sbb_ner service. See https://github.com/qurator-spk/sbb_ner for details. "
"Only applicable in case of NERD.")
@click.option('--ned-rest-endpoint', type=str, default=None,
help="REST endpoint of sbb_ned service. See https://github.com/qurator-spk/sbb_ned for details. "
"Only applicable in case of NERD.")
@click.option('--noproxy', type=bool, is_flag=True, help='disable proxy. default: enabled.')
@click.option('--scale-factor', type=float, default=1.0, help='default: 1.0')
@click.option('--ned-threshold', type=float, default=None)
@click.option('--ned-priority', type=int, default=1)
def alto2tsv_cli(alto_xml_file, tsv_out_file, purpose, image_url, ner_rest_endpoint, ned_rest_endpoint,
noproxy, scale_factor, ned_threshold, ned_priority):
return alto2tsv(alto_xml_file, tsv_out_file, purpose, image_url, ner_rest_endpoint, ned_rest_endpoint,
noproxy, scale_factor, ned_threshold, ned_priority)