pull/48/merge
Alexander Pacha 2 years ago committed by GitHub
commit bf954a9319
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -30,7 +30,7 @@ https://huggingface.co/SBB/sbb_binarization
```sh ```sh
sbb_binarize \ sbb_binarize \
-m <path to directory containing model files \ -m <path to directory containing model files> \
<input image> \ <input image> \
<output image> <output image>
``` ```
@ -40,7 +40,7 @@ Images containing a lot of border noise (black pixels) should be cropped beforeh
### Example ### Example
```sh ```sh
sbb_binarize -m /path/to/model/ myimage.tif myimage-bin.tif sbb_binarize -m /path/to/models/ myimage.tif myimage-bin.tif
``` ```
To use the [OCR-D](https://ocr-d.de/) interface: To use the [OCR-D](https://ocr-d.de/) interface:

@ -3,3 +3,4 @@ setuptools >= 41
opencv-python-headless opencv-python-headless
ocrd >= 2.38.0 ocrd >= 2.38.0
tensorflow >= 2.4.0 tensorflow >= 2.4.0
mpire

@ -3,12 +3,16 @@ sbb_binarize CLI
""" """
from click import command, option, argument, version_option, types from click import command, option, argument, version_option, types
from .sbb_binarize import SbbBinarizer from .sbb_binarize import SbbBinarizer
@command() @command()
@version_option() @version_option()
@option('--model-dir', '-m', type=types.Path(exists=True, file_okay=False), required=True, help='directory containing models for prediction') @option('--model-dir', '-m', type=types.Path(exists=True, file_okay=False), required=True, help='directory containing models for prediction')
@argument('input_image') @argument('input_image')
@argument('output_image') @argument('output_image')
def main(model_dir, input_image, output_image): def main(model_dir, input_image, output_image):
SbbBinarizer(model_dir).run(image_path=input_image, save=output_image) binarizer = SbbBinarizer()
binarizer.load_model(model_dir)
binarizer.binarize_image_file(image_path=input_image, save_path=output_image)

@ -69,7 +69,8 @@ class SbbBinarizeProcessor(Processor):
raise FileNotFoundError("Does not exist or is not a directory: %s" % model_path) raise FileNotFoundError("Does not exist or is not a directory: %s" % model_path)
# resolve relative path via OCR-D ResourceManager # resolve relative path via OCR-D ResourceManager
model_path = self.resolve_resource(str(model_path)) model_path = self.resolve_resource(str(model_path))
self.binarizer = SbbBinarizer(model_dir=model_path, logger=LOG) self.binarizer = SbbBinarizer()
self.binarizer.load_model(model_path)
def process(self): def process(self):
""" """
@ -110,7 +111,7 @@ class SbbBinarizeProcessor(Processor):
if oplevel == 'page': if oplevel == 'page':
LOG.info("Binarizing on 'page' level in page '%s'", page_id) LOG.info("Binarizing on 'page' level in page '%s'", page_id)
bin_image = cv2pil(self.binarizer.run(image=pil2cv(page_image))) bin_image = cv2pil(self.binarizer.binarize_image(pil2cv(page_image)))
# update METS (add the image file): # update METS (add the image file):
bin_image_path = self.workspace.save_image_file(bin_image, bin_image_path = self.workspace.save_image_file(bin_image,
file_id + '.IMG-BIN', file_id + '.IMG-BIN',
@ -124,7 +125,7 @@ class SbbBinarizeProcessor(Processor):
LOG.warning("Page '%s' contains no text/table regions", page_id) LOG.warning("Page '%s' contains no text/table regions", page_id)
for region in regions: for region in regions:
region_image, region_xywh = self.workspace.image_from_segment(region, page_image, page_xywh, feature_filter='binarized') region_image, region_xywh = self.workspace.image_from_segment(region, page_image, page_xywh, feature_filter='binarized')
region_image_bin = cv2pil(binarizer.run(image=pil2cv(region_image))) region_image_bin = cv2pil(self.binarizer.binarize_image(image=pil2cv(region_image)))
region_image_bin_path = self.workspace.save_image_file( region_image_bin_path = self.workspace.save_image_file(
region_image_bin, region_image_bin,
"%s_%s.IMG-BIN" % (file_id, region.id), "%s_%s.IMG-BIN" % (file_id, region.id),
@ -139,7 +140,7 @@ class SbbBinarizeProcessor(Processor):
LOG.warning("Page '%s' contains no text lines", page_id) LOG.warning("Page '%s' contains no text lines", page_id)
for region_id, line in region_line_tuples: for region_id, line in region_line_tuples:
line_image, line_xywh = self.workspace.image_from_segment(line, page_image, page_xywh, feature_filter='binarized') line_image, line_xywh = self.workspace.image_from_segment(line, page_image, page_xywh, feature_filter='binarized')
line_image_bin = cv2pil(binarizer.run(image=pil2cv(line_image))) line_image_bin = cv2pil(self.binarizer.binarize_image(image=pil2cv(line_image)))
line_image_bin_path = self.workspace.save_image_file( line_image_bin_path = self.workspace.save_image_file(
line_image_bin, line_image_bin,
"%s_%s_%s.IMG-BIN" % (file_id, region_id, line.id), "%s_%s_%s.IMG-BIN" % (file_id, region_id, line.id),

@ -1,262 +1,187 @@
""" import argparse
Tool to load model and binarize a given image. import gc
""" import itertools
import math
import os
import sys import sys
from glob import glob from pathlib import Path
from os import environ, devnull from typing import Union, List, Tuple, Any
from os.path import join
from warnings import catch_warnings, simplefilter
import numpy as np
from PIL import Image
import cv2 import cv2
environ['TF_CPP_MIN_LOG_LEVEL'] = '3' import numpy as np
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
stderr = sys.stderr stderr = sys.stderr
sys.stderr = open(devnull, 'w') sys.stderr = open(os.devnull, 'w')
import tensorflow as tf import tensorflow as tf
from tensorflow.keras.models import load_model from tensorflow.python.keras.saving.save import load_model
from tensorflow.python.keras import backend as tensorflow_backend
sys.stderr = stderr sys.stderr = stderr
from mpire import WorkerPool
from mpire.utils import make_single_arguments
import logging
def resize_image(img_in, input_height, input_width):
return cv2.resize(img_in, (input_width, input_height), interpolation=cv2.INTER_NEAREST)
class SbbBinarizer: class SbbBinarizer:
def __init__(self) -> None:
super().__init__()
self.models: List[Tuple[Any, int, int, int]] = []
def load_model(self, model_dir: Union[str, Path]):
model_dir = Path(model_dir)
model_paths = list(model_dir.glob('*.h5')) or list(model_dir.glob('*/'))
for path in model_paths:
model = load_model(str(path.absolute()), compile=False)
height = model.layers[len(model.layers) - 1].output_shape[1]
width = model.layers[len(model.layers) - 1].output_shape[2]
classes = model.layers[len(model.layers) - 1].output_shape[3]
self.models.append((model, height, width, classes))
def binarize_image_file(self, image_path: Path, save_path: Path):
if not image_path.exists():
raise ValueError(f"Image not found: {str(image_path)}")
# noinspection PyUnresolvedReferences
img = cv2.imread(str(image_path))
full_image = self.binarize_image(img)
Path(save_path).parent.mkdir(parents=True, exist_ok=True)
# noinspection PyUnresolvedReferences
cv2.imwrite(str(save_path), full_image)
def binarize_image(self, img: np.ndarray) -> np.ndarray:
img_last = False
for model, model_height, model_width, _ in self.models:
img_res = self.binarize_image_by_model(img, model, model_height, model_width)
img_last = img_last + (img_res == 0)
img_last = (~img_last).astype(np.uint8) * 255
return img_last
def __init__(self, model_dir, logger=None): def binarize_image_by_model(self, img: np.ndarray, model: Any, model_height: int, model_width: int) -> np.ndarray:
self.model_dir = model_dir # Padded images must be multiples of model size
self.log = logger if logger else logging.getLogger('SbbBinarizer') original_image_height, original_image_width, image_channels = img.shape
self.start_new_session() padded_image_height = math.ceil(original_image_height / model_height) * model_height
padded_image_width = math.ceil(original_image_width / model_width) * model_width
self.model_files = glob('%s/*.h5' % self.model_dir) padded_image = np.zeros((padded_image_height, padded_image_width, image_channels))
if not self.model_files: padded_image[0:original_image_height, 0:original_image_width, :] = img[:, :, :]
self.model_files = glob('%s/*/' % self.model_dir)
if not self.model_files: image_batch = np.expand_dims(padded_image, 0) # Create the batch dimension
raise ValueError(f"No models found in {self.model_dir}") patches = tf.image.extract_patches(
images=image_batch,
self.models = [] sizes=[1, model_height, model_width, 1],
for model_file in self.model_files: strides=[1, model_height, model_width, 1],
self.models.append(self.load_model(model_file)) rates=[1, 1, 1, 1],
padding='SAME'
def start_new_session(self): )
config = tf.compat.v1.ConfigProto()
config.gpu_options.allow_growth = True number_of_horizontal_patches = patches.shape[1]
number_of_vertical_patches = patches.shape[2]
self.session = tf.compat.v1.Session(config=config) # tf.InteractiveSession() total_number_of_patches = number_of_horizontal_patches * number_of_vertical_patches
tensorflow_backend.set_session(self.session) target_shape = (total_number_of_patches, model_height, model_width, image_channels)
# Squeeze all image patches (n, m, width, height, channels) into a single big batch (b, width, height, channels)
def end_session(self): image_patches = tf.reshape(patches, target_shape)
tensorflow_backend.clear_session() # Normalize the image to values between 0.0 - 1.0
self.session.close() image_patches = image_patches / float(255.0)
del self.session
predicted_patches = model.predict(image_patches, verbose=0)
def load_model(self, model_name): # We have to manually call garbage collection and clear_session here to avoid memory leaks.
model = load_model(model_name, compile=False) # Taken from https://medium.com/dive-into-ml-ai/dealing-with-memory-leak-issue-in-keras-model-training-e703907a6501
model_height = model.layers[len(model.layers)-1].output_shape[1] gc.collect()
model_width = model.layers[len(model.layers)-1].output_shape[2] tf.keras.backend.clear_session()
n_classes = model.layers[len(model.layers)-1].output_shape[3]
return model, model_height, model_width, n_classes # The result is a white-on-black image that needs to be inverted to be displayed as black-on-white image
# We do this by converting the binary values to a boolean numpy-array and then inverting the values
def predict(self, model_in, img): black_on_white_patches = np.invert(np.argmax(predicted_patches, axis=3).astype(bool))
tensorflow_backend.set_session(self.session) # cv2 can't export a boolean numpy array into a black-and-white PNG image, so we have to convert it to uint8 (grayscale) values
model, model_height, model_width, n_classes = model_in grayscale_patches = black_on_white_patches.astype(np.uint8) * 255
full_image_with_padding = self._patches_to_image(
img_org_h = img.shape[0] grayscale_patches,
img_org_w = img.shape[1] padded_image_height,
padded_image_width,
if img.shape[0] < model_height and img.shape[1] >= model_width: model_height,
img_padded = np.zeros(( model_height, img.shape[1], img.shape[2] )) model_width
)
index_start_h = int( abs( img.shape[0] - model_height) /2.) full_image = full_image_with_padding[0:original_image_height, 0:original_image_width]
index_start_w = 0 return full_image
img_padded [ index_start_h: index_start_h+img.shape[0], :, : ] = img[:,:,:] def _patches_to_image(self, patches: np.ndarray, image_height: int, image_width: int, patch_height: int, patch_width: int):
height = math.ceil(image_height / patch_height) * patch_height
elif img.shape[0] >= model_height and img.shape[1] < model_width: width = math.ceil(image_width / patch_width) * patch_width
img_padded = np.zeros(( img.shape[0], model_width, img.shape[2] ))
image_reshaped = np.reshape(
index_start_h = 0 np.squeeze(patches),
index_start_w = int( abs( img.shape[1] - model_width) /2.) [height // patch_height, width // patch_width, patch_height, patch_width]
)
img_padded [ :, index_start_w: index_start_w+img.shape[1], : ] = img[:,:,:] image_transposed = np.transpose(a=image_reshaped, axes=[0, 2, 1, 3])
image_resized = np.reshape(image_transposed, [height, width])
return image_resized
elif img.shape[0] < model_height and img.shape[1] < model_width:
img_padded = np.zeros(( model_height, model_width, img.shape[2] ))
def split_list_into_worker_batches(files: List[Any], number_of_workers: int) -> List[List[Any]]:
index_start_h = int( abs( img.shape[0] - model_height) /2.) """ Splits any given list into batches for the specified number of workers and returns a list of lists. """
index_start_w = int( abs( img.shape[1] - model_width) /2.) batches = []
batch_size = math.ceil(len(files) / number_of_workers)
img_padded [ index_start_h: index_start_h+img.shape[0], index_start_w: index_start_w+img.shape[1], : ] = img[:,:,:] batch_start = 0
for i in range(1, number_of_workers + 1):
else: batch_end = i * batch_size
index_start_h = 0 file_batch_to_delete = files[batch_start: batch_end]
index_start_w = 0 batches.append(file_batch_to_delete)
img_padded = np.copy(img) batch_start = batch_end
return batches
img = np.copy(img_padded)
def batch_predict(input_data):
margin = int(0.1 * model_width) model_dir, input_images, output_images, worker_number = input_data
print(f"Setting visible cuda devices to {str(worker_number)}")
width_mid = model_width - 2 * margin # Each worker thread will be assigned only one of the available GPUs to allow multiprocessing across GPUs
height_mid = model_height - 2 * margin os.environ["CUDA_VISIBLE_DEVICES"] = str(worker_number)
binarizer = SbbBinarizer()
img = img / float(255.0) binarizer.load_model(model_dir)
img_h = img.shape[0] for image_path, output_path in zip(input_images, output_images):
img_w = img.shape[1] binarizer.binarize_image(image_path=image_path, save_path=output_path)
print(f"Binarized {image_path}")
prediction_true = np.zeros((img_h, img_w, 3))
mask_true = np.zeros((img_h, img_w))
nxf = img_w / float(width_mid) if __name__ == '__main__':
nyf = img_h / float(height_mid) parser = argparse.ArgumentParser()
parser.add_argument('-m', '--model_dir', default="model_2021_03_09", help="Path to the directory where the TF model resides or path to an h5 file.")
if nxf > int(nxf): parser.add_argument('-i', '--input-path', required=True)
nxf = int(nxf) + 1 parser.add_argument('-o', '--output-path', required=True)
else: args = parser.parse_args()
nxf = int(nxf)
input_path = Path(args.input_path)
if nyf > int(nyf): output_path = Path(args.output_path)
nyf = int(nyf) + 1 model_directory = args.model_dir
else:
nyf = int(nyf) if input_path.is_dir():
print(f"Enumerating all PNG files in {str(input_path)}")
for i in range(nxf): all_input_images = list(input_path.rglob("*.png"))
for j in range(nyf): print(f"Filtering images that have already been binarized in {str(output_path)}")
input_images = [i for i in all_input_images if not (output_path / (i.relative_to(input_path))).exists()]
if i == 0: output_images = [output_path / (i.relative_to(input_path)) for i in input_images]
index_x_d = i * width_mid input_images = [i for i in input_images]
index_x_u = index_x_d + model_width
elif i > 0: print(f"Starting batch-binarization of {len(input_images)} images")
index_x_d = i * width_mid
index_x_u = index_x_d + model_width number_of_gpus = len(tf.config.list_physical_devices('GPU'))
number_of_workers = max(1, number_of_gpus)
if j == 0: image_batches = split_list_into_worker_batches(input_images, number_of_workers)
index_y_d = j * height_mid output_batches = split_list_into_worker_batches(output_images, number_of_workers)
index_y_u = index_y_d + model_height
elif j > 0: # Must use spawn to create completely new process that has its own resources to properly multiprocess across GPUs
index_y_d = j * height_mid with WorkerPool(n_jobs=number_of_workers, start_method='spawn') as pool:
index_y_u = index_y_d + model_height model_dirs = itertools.repeat(model_directory, len(image_batches))
input_data = zip(model_dirs, image_batches, output_batches, range(number_of_workers))
if index_x_u > img_w: contents = pool.map_unordered(
index_x_u = img_w batch_predict,
index_x_d = img_w - model_width make_single_arguments(input_data),
if index_y_u > img_h: iterable_len=number_of_workers,
index_y_u = img_h progress_bar=False
index_y_d = img_h - model_height )
img_patch = img[index_y_d:index_y_u, index_x_d:index_x_u, :]
label_p_pred = model.predict(img_patch.reshape(1, img_patch.shape[0], img_patch.shape[1], img_patch.shape[2]))
seg = np.argmax(label_p_pred, axis=3)[0]
seg_color = np.repeat(seg[:, :, np.newaxis], 3, axis=2)
if i == 0 and j == 0:
seg_color = seg_color[0:seg_color.shape[0] - margin, 0:seg_color.shape[1] - margin, :]
seg = seg[0:seg.shape[0] - margin, 0:seg.shape[1] - margin]
mask_true[index_y_d + 0:index_y_u - margin, index_x_d + 0:index_x_u - margin] = seg
prediction_true[index_y_d + 0:index_y_u - margin, index_x_d + 0:index_x_u - margin, :] = seg_color
elif i == nxf-1 and j == nyf-1:
seg_color = seg_color[margin:seg_color.shape[0] - 0, margin:seg_color.shape[1] - 0, :]
seg = seg[margin:seg.shape[0] - 0, margin:seg.shape[1] - 0]
mask_true[index_y_d + margin:index_y_u - 0, index_x_d + margin:index_x_u - 0] = seg
prediction_true[index_y_d + margin:index_y_u - 0, index_x_d + margin:index_x_u - 0, :] = seg_color
elif i == 0 and j == nyf-1:
seg_color = seg_color[margin:seg_color.shape[0] - 0, 0:seg_color.shape[1] - margin, :]
seg = seg[margin:seg.shape[0] - 0, 0:seg.shape[1] - margin]
mask_true[index_y_d + margin:index_y_u - 0, index_x_d + 0:index_x_u - margin] = seg
prediction_true[index_y_d + margin:index_y_u - 0, index_x_d + 0:index_x_u - margin, :] = seg_color
elif i == nxf-1 and j == 0:
seg_color = seg_color[0:seg_color.shape[0] - margin, margin:seg_color.shape[1] - 0, :]
seg = seg[0:seg.shape[0] - margin, margin:seg.shape[1] - 0]
mask_true[index_y_d + 0:index_y_u - margin, index_x_d + margin:index_x_u - 0] = seg
prediction_true[index_y_d + 0:index_y_u - margin, index_x_d + margin:index_x_u - 0, :] = seg_color
elif i == 0 and j != 0 and j != nyf-1:
seg_color = seg_color[margin:seg_color.shape[0] - margin, 0:seg_color.shape[1] - margin, :]
seg = seg[margin:seg.shape[0] - margin, 0:seg.shape[1] - margin]
mask_true[index_y_d + margin:index_y_u - margin, index_x_d + 0:index_x_u - margin] = seg
prediction_true[index_y_d + margin:index_y_u - margin, index_x_d + 0:index_x_u - margin, :] = seg_color
elif i == nxf-1 and j != 0 and j != nyf-1:
seg_color = seg_color[margin:seg_color.shape[0] - margin, margin:seg_color.shape[1] - 0, :]
seg = seg[margin:seg.shape[0] - margin, margin:seg.shape[1] - 0]
mask_true[index_y_d + margin:index_y_u - margin, index_x_d + margin:index_x_u - 0] = seg
prediction_true[index_y_d + margin:index_y_u - margin, index_x_d + margin:index_x_u - 0, :] = seg_color
elif i != 0 and i != nxf-1 and j == 0:
seg_color = seg_color[0:seg_color.shape[0] - margin, margin:seg_color.shape[1] - margin, :]
seg = seg[0:seg.shape[0] - margin, margin:seg.shape[1] - margin]
mask_true[index_y_d + 0:index_y_u - margin, index_x_d + margin:index_x_u - margin] = seg
prediction_true[index_y_d + 0:index_y_u - margin, index_x_d + margin:index_x_u - margin, :] = seg_color
elif i != 0 and i != nxf-1 and j == nyf-1:
seg_color = seg_color[margin:seg_color.shape[0] - 0, margin:seg_color.shape[1] - margin, :]
seg = seg[margin:seg.shape[0] - 0, margin:seg.shape[1] - margin]
mask_true[index_y_d + margin:index_y_u - 0, index_x_d + margin:index_x_u - margin] = seg
prediction_true[index_y_d + margin:index_y_u - 0, index_x_d + margin:index_x_u - margin, :] = seg_color
else: else:
seg_color = seg_color[margin:seg_color.shape[0] - margin, margin:seg_color.shape[1] - margin, :] binarizer = SbbBinarizer()
seg = seg[margin:seg.shape[0] - margin, margin:seg.shape[1] - margin] binarizer.load_model(model_directory)
binarizer.binarize_image(image_path=input_path, save_path=output_path)
mask_true[index_y_d + margin:index_y_u - margin, index_x_d + margin:index_x_u - margin] = seg
prediction_true[index_y_d + margin:index_y_u - margin, index_x_d + margin:index_x_u - margin, :] = seg_color
prediction_true = prediction_true[index_start_h: index_start_h+img_org_h, index_start_w: index_start_w+img_org_w,:]
prediction_true = prediction_true.astype(np.uint8)
return prediction_true[:,:,0]
def run(self, image=None, image_path=None, save=None):
if (image is not None and image_path is not None) or \
(image is None and image_path is None):
raise ValueError("Must pass either a opencv2 image or an image_path")
if image_path is not None:
image = cv2.imread(image_path)
img_last = 0
for n, (model, model_file) in enumerate(zip(self.models, self.model_files)):
self.log.info('Predicting with model %s [%s/%s]' % (model_file, n + 1, len(self.model_files)))
res = self.predict(model, image)
img_fin = np.zeros((res.shape[0], res.shape[1], 3))
res[:, :][res[:, :] == 0] = 2
res = res - 1
res = res * 255
img_fin[:, :, 0] = res
img_fin[:, :, 1] = res
img_fin[:, :, 2] = res
img_fin = img_fin.astype(np.uint8)
img_fin = (res[:, :] == 0) * 255
img_last = img_last + img_fin
kernel = np.ones((5, 5), np.uint8)
img_last[:, :][img_last[:, :] > 0] = 255
img_last = (img_last[:, :] == 0) * 255
if save:
cv2.imwrite(save, img_last)
return img_last

Loading…
Cancel
Save