From 4cd398bd0d5778099fc4d002f4d708285e464251 Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Sat, 9 May 2026 04:12:02 +0200 Subject: [PATCH] =?UTF-8?q?standalone=20binarization:=20update,=20simplify?= =?UTF-8?q?=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - re-use Eynollah base class, drop copied code - simplify `run()` and `run_single()` - delegate to `do_prediction()` instead of custom (old) tiling loop - drop `predict()` - add `--device` option to CLI as well --- src/eynollah/cli/cli_binarize.py | 16 +- src/eynollah/image_enhancer.py | 4 +- src/eynollah/ocrd_cli_binarization.py | 19 +- src/eynollah/sbb_binarize.py | 391 ++++---------------------- 4 files changed, 82 insertions(+), 348 deletions(-) diff --git a/src/eynollah/cli/cli_binarize.py b/src/eynollah/cli/cli_binarize.py index aa6cefc..f0e56f5 100644 --- a/src/eynollah/cli/cli_binarize.py +++ b/src/eynollah/cli/cli_binarize.py @@ -1,7 +1,11 @@ import click @click.command() -@click.option('--patches/--no-patches', default=True, help='by enabling this parameter you let the model to see the image in patches.') +@click.option( + '--patches/--no-patches', + default=True, + help='let the model see the image in patches (tiling) instead of total (full).' +) @click.option( "--input-image", "--image", "-i", @@ -27,6 +31,11 @@ import click help="overwrite (instead of skipping) if output xml exists", is_flag=True, ) +@click.option( + "--device", + "-D", + help="placement of computations in predictors for each model type; if none (by default), will try to use first available GPU or fall back to CPU; set string to force using a device (e.g. 'GPU0', 'GPU1' or 'CPU'). Can also be a comma-separated list of model category to device mappings (e.g. 'col_classifier:CPU,page:GPU0,*:GPU1')", +) @click.pass_context def binarize_cli( ctx, @@ -35,15 +44,16 @@ def binarize_cli( dir_in, output, overwrite, + device, ): """ Binarize images with a ML model """ from ..sbb_binarize import SbbBinarizer assert bool(input_image) != bool(dir_in), "Either -i (single input) or -di (directory) must be provided, but not both." - binarizer = SbbBinarizer(model_zoo=ctx.obj.model_zoo) + binarizer = SbbBinarizer(model_zoo=ctx.obj.model_zoo, device=device) binarizer.run( - image_path=input_image, + image_filename=input_image, use_patches=patches, output=output, dir_in=dir_in, diff --git a/src/eynollah/image_enhancer.py b/src/eynollah/image_enhancer.py index 90b980a..fe1e16d 100644 --- a/src/eynollah/image_enhancer.py +++ b/src/eynollah/image_enhancer.py @@ -33,7 +33,7 @@ class Enhancer(Eynollah): self.logger = logging.getLogger('eynollah.enhance') self.model_zoo = model_zoo - self.setup_models() + self.setup_models(device=device) def setup_models(self, device=''): loadable = ['enhancement', 'col_classifier', 'page'] @@ -50,7 +50,7 @@ class Enhancer(Eynollah): ) -> None: image = self.cache_images(image_filename=img_filename, image_pil=img_pil) - output_filename = os.path.join(dir_out or "", image['name'] +'.png') + output_filename = os.path.join(dir_out or "", image['name'] + '.png') if os.path.exists(output_filename): if overwrite: diff --git a/src/eynollah/ocrd_cli_binarization.py b/src/eynollah/ocrd_cli_binarization.py index e9059df..a0667c5 100644 --- a/src/eynollah/ocrd_cli_binarization.py +++ b/src/eynollah/ocrd_cli_binarization.py @@ -14,17 +14,9 @@ from ocrd.decorators import ocrd_cli_options, ocrd_cli_wrap_processor from eynollah.model_zoo.model_zoo import EynollahModelZoo from .sbb_binarize import SbbBinarizer +from .utils.pil_cv2 import cv2pil -def cv2pil(img): - return Image.fromarray(img.astype('uint8')) - -def pil2cv(img): - # from ocrd/workspace.py - color_conversion = cv2.COLOR_GRAY2BGR if img.mode in ('1', 'L') else cv2.COLOR_RGB2BGR - pil_as_np_array = np.array(img).astype('uint8') if img.mode == '1' else np.array(img) - return cv2.cvtColor(pil_as_np_array, color_conversion) - class SbbBinarizeProcessor(Processor): # already employs GPU (without singleton process atm) max_workers = 1 @@ -75,7 +67,8 @@ class SbbBinarizeProcessor(Processor): if oplevel == 'page': self.logger.info("Binarizing on 'page' level in page '%s'", page_id) - page_image_bin = cv2pil(self.binarizer.run_single(image=pil2cv(page_image), use_patches=True)) + page_image_bin = cv2pil(self.binarizer.run_single("", img_pil=page_image, + use_patches=True)) # update PAGE (reference the image file): page_image_ref = AlternativeImageType(comments=page_xywh['features'] + ',binarized,clipped') page.add_AlternativeImage(page_image_ref) @@ -88,7 +81,8 @@ class SbbBinarizeProcessor(Processor): for region in regions: region_image, region_xywh = self.workspace.image_from_segment( region, page_image, page_xywh, feature_filter='binarized') - region_image_bin = cv2pil(self.binarizer.run_single(image=pil2cv(region_image), use_patches=True)) + region_image_bin = cv2pil(self.binarizer.run_single("", img_pil=region_image, + use_patches=True)) # update PAGE (reference the image file): region_image_ref = AlternativeImageType(comments=region_xywh['features'] + ',binarized') region.add_AlternativeImage(region_image_ref) @@ -100,7 +94,8 @@ class SbbBinarizeProcessor(Processor): self.logger.warning("Page '%s' contains no text lines", page_id) for line in lines: line_image, line_xywh = self.workspace.image_from_segment(line, page_image, page_xywh, feature_filter='binarized') - line_image_bin = cv2pil(self.binarizer.run_single(image=pil2cv(line_image), use_patches=True)) + line_image_bin = cv2pil(self.binarizer.run_single("", img_pil=line_image, + use_patches=True)) # update PAGE (reference the image file): line_image_ref = AlternativeImageType(comments=line_xywh['features'] + ',binarized') line.add_AlternativeImage(line_image_ref) diff --git a/src/eynollah/sbb_binarize.py b/src/eynollah/sbb_binarize.py index fe044c9..9b154a8 100644 --- a/src/eynollah/sbb_binarize.py +++ b/src/eynollah/sbb_binarize.py @@ -15,348 +15,77 @@ from typing import Optional import numpy as np import cv2 -os.environ['TF_USE_LEGACY_KERAS'] = '1' # avoid Keras 3 after TF 2.15 -from ocrd_utils import tf_disable_interactive_logs -tf_disable_interactive_logs() -import tensorflow as tf - +from .eynollah import Eynollah from .model_zoo import EynollahModelZoo +from .utils.resize import resize_image from .utils import is_image_filename -def resize_image(img_in, input_height, input_width): - return cv2.resize(img_in, (input_width, input_height), interpolation=cv2.INTER_NEAREST) - -class SbbBinarizer: +class SbbBinarizer(Eynollah): def __init__( - self, - *, - model_zoo: EynollahModelZoo, - logger: Optional[logging.Logger] = None, + self, + *, + model_zoo: EynollahModelZoo, + logger: Optional[logging.Logger] = None, + device: str = '', ): self.logger = logger if logger else logging.getLogger('eynollah.binarization') - try: - for device in tf.config.list_physical_devices('GPU'): - tf.config.experimental.set_memory_growth(device, True) - except: - self.logger.warning("no GPU device available") - self.models = (model_zoo.model_path('binarization'), model_zoo.load_model('binarization')) - self.logger.info('Loaded model %s [%s]', self.models[1], self.models[0]) + self.model_zoo = model_zoo + self.setup_models(device=device) - def predict(self, model, img, use_patches, n_batch_inference=5): - model_height = model.layers[len(model.layers)-1].output_shape[1] - model_width = model.layers[len(model.layers)-1].output_shape[2] - - img_org_h = img.shape[0] - img_org_w = img.shape[1] - - if img.shape[0] < model_height and img.shape[1] >= model_width: - img_padded = np.zeros(( model_height, img.shape[1], img.shape[2] )) - - index_start_h = int( abs( img.shape[0] - model_height) /2.) - index_start_w = 0 - - img_padded [ index_start_h: index_start_h+img.shape[0], :, : ] = img[:,:,:] - - elif img.shape[0] >= model_height and img.shape[1] < model_width: - img_padded = np.zeros(( img.shape[0], model_width, img.shape[2] )) - - index_start_h = 0 - index_start_w = int( abs( img.shape[1] - model_width) /2.) - - img_padded [ :, index_start_w: index_start_w+img.shape[1], : ] = img[:,:,:] - - - elif img.shape[0] < model_height and img.shape[1] < model_width: - img_padded = np.zeros(( model_height, model_width, img.shape[2] )) - - index_start_h = int( abs( img.shape[0] - model_height) /2.) - index_start_w = int( abs( img.shape[1] - model_width) /2.) - - img_padded [ index_start_h: index_start_h+img.shape[0], index_start_w: index_start_w+img.shape[1], : ] = img[:,:,:] - + def setup_models(self, device=''): + loadable = ['binarization'] + self.model_zoo.load_models(*loadable, device=device) + for model in loadable: + self.logger.debug("model %s has input shape %s", model, + self.model_zoo.get(model).input_shape) + + def run(self, + image=None, + image_filename=None, + output=None, + use_patches=False, + dir_in=None, + overwrite=False + ): + """ + Binarize the scanned images + """ + if dir_in: + ls_imgs = [(os.path.join(dir_in, image_filename), + os.path.join(output, Path(image_filename).stem + '.png')) + for image_filename in filter(is_image_filename, + os.listdir(dir_in))] + elif image_filename: + ls_imgs = [(image_filename, output)] else: - index_start_h = 0 - index_start_w = 0 - img_padded = np.copy(img) - - - img = np.copy(img_padded) - - + raise ValueError("run requires either a single image filename or a directory") - if use_patches: + for img_filename, output_filename in ls_imgs: + self.logger.info(img_filename) - margin = int(0.1 * model_width) + if os.path.exists(output_filename): + if overwrite: + self.logger.warning("will overwrite existing output file '%s'", output_filename) + else: + self.logger.warning("will skip input for existing output file '%s'", output_filename) + continue - width_mid = model_width - 2 * margin - height_mid = model_height - 2 * margin + img_res = self.run_single(img_filename, + use_patches=use_patches) + cv2.imwrite(output_filename, img_res) + self.logger.info("output filename: '%s'", output_filename) - img = img / float(255.0) - - img_h = img.shape[0] - img_w = img.shape[1] - - prediction_true = np.zeros((img_h, img_w, 3)) - mask_true = np.zeros((img_h, img_w)) - nxf = img_w / float(width_mid) - nyf = img_h / float(height_mid) - - if nxf > int(nxf): - nxf = int(nxf) + 1 - else: - nxf = int(nxf) - - if nyf > int(nyf): - nyf = int(nyf) + 1 - else: - nyf = int(nyf) - - - list_i_s = [] - list_j_s = [] - list_x_u = [] - list_x_d = [] - list_y_u = [] - list_y_d = [] - - batch_indexer = 0 - - img_patch = np.zeros((n_batch_inference, model_height, model_width,3)) - - for i in range(nxf): - for j in range(nyf): - - if i == 0: - index_x_d = i * width_mid - index_x_u = index_x_d + model_width - elif i > 0: - index_x_d = i * width_mid - index_x_u = index_x_d + model_width - - if j == 0: - index_y_d = j * height_mid - index_y_u = index_y_d + model_height - elif j > 0: - index_y_d = j * height_mid - index_y_u = index_y_d + model_height - - if index_x_u > img_w: - index_x_u = img_w - index_x_d = img_w - model_width - if index_y_u > img_h: - index_y_u = img_h - index_y_d = img_h - model_height - - - list_i_s.append(i) - list_j_s.append(j) - list_x_u.append(index_x_u) - list_x_d.append(index_x_d) - list_y_d.append(index_y_d) - list_y_u.append(index_y_u) - - - img_patch[batch_indexer,:,:,:] = img[index_y_d:index_y_u, index_x_d:index_x_u, :] - - batch_indexer = batch_indexer + 1 - - - - if batch_indexer == n_batch_inference: - - label_p_pred = model.predict(img_patch,verbose=0) - - seg = np.argmax(label_p_pred, axis=3) - - #print(seg.shape, len(seg), len(list_i_s)) - - indexer_inside_batch = 0 - for i_batch, j_batch in zip(list_i_s, list_j_s): - seg_in = seg[indexer_inside_batch,:,:] - seg_color = np.repeat(seg_in[:, :, np.newaxis], 3, axis=2) - - index_y_u_in = list_y_u[indexer_inside_batch] - index_y_d_in = list_y_d[indexer_inside_batch] - - index_x_u_in = list_x_u[indexer_inside_batch] - index_x_d_in = list_x_d[indexer_inside_batch] - - if i_batch == 0 and j_batch == 0: - seg_color = seg_color[0 : seg_color.shape[0] - margin, 0 : seg_color.shape[1] - margin, :] - prediction_true[index_y_d_in + 0 : index_y_u_in - margin, index_x_d_in + 0 : index_x_u_in - margin, :] = seg_color - elif i_batch == nxf - 1 and j_batch == nyf - 1: - seg_color = seg_color[margin : seg_color.shape[0] - 0, margin : seg_color.shape[1] - 0, :] - prediction_true[index_y_d_in + margin : index_y_u_in - 0, index_x_d_in + margin : index_x_u_in - 0, :] = seg_color - elif i_batch == 0 and j_batch == nyf - 1: - seg_color = seg_color[margin : seg_color.shape[0] - 0, 0 : seg_color.shape[1] - margin, :] - prediction_true[index_y_d_in + margin : index_y_u_in - 0, index_x_d_in + 0 : index_x_u_in - margin, :] = seg_color - elif i_batch == nxf - 1 and j_batch == 0: - seg_color = seg_color[0 : seg_color.shape[0] - margin, margin : seg_color.shape[1] - 0, :] - prediction_true[index_y_d_in + 0 : index_y_u_in - margin, index_x_d_in + margin : index_x_u_in - 0, :] = seg_color - elif i_batch == 0 and j_batch != 0 and j_batch != nyf - 1: - seg_color = seg_color[margin : seg_color.shape[0] - margin, 0 : seg_color.shape[1] - margin, :] - prediction_true[index_y_d_in + margin : index_y_u_in - margin, index_x_d_in + 0 : index_x_u_in - margin, :] = seg_color - elif i_batch == nxf - 1 and j_batch != 0 and j_batch != nyf - 1: - seg_color = seg_color[margin : seg_color.shape[0] - margin, margin : seg_color.shape[1] - 0, :] - prediction_true[index_y_d_in + margin : index_y_u_in - margin, index_x_d_in + margin : index_x_u_in - 0, :] = seg_color - elif i_batch != 0 and i_batch != nxf - 1 and j_batch == 0: - seg_color = seg_color[0 : seg_color.shape[0] - margin, margin : seg_color.shape[1] - margin, :] - prediction_true[index_y_d_in + 0 : index_y_u_in - margin, index_x_d_in + margin : index_x_u_in - margin, :] = seg_color - elif i_batch != 0 and i_batch != nxf - 1 and j_batch == nyf - 1: - seg_color = seg_color[margin : seg_color.shape[0] - 0, margin : seg_color.shape[1] - margin, :] - prediction_true[index_y_d_in + margin : index_y_u_in - 0, index_x_d_in + margin : index_x_u_in - margin, :] = seg_color - else: - seg_color = seg_color[margin : seg_color.shape[0] - margin, margin : seg_color.shape[1] - margin, :] - prediction_true[index_y_d_in + margin : index_y_u_in - margin, index_x_d_in + margin : index_x_u_in - margin, :] = seg_color - - indexer_inside_batch = indexer_inside_batch +1 - - - list_i_s = [] - list_j_s = [] - list_x_u = [] - list_x_d = [] - list_y_u = [] - list_y_d = [] - - batch_indexer = 0 - - img_patch = np.zeros((n_batch_inference, model_height, model_width,3)) - - elif i==(nxf-1) and j==(nyf-1): - label_p_pred = model.predict(img_patch,verbose=0) - - seg = np.argmax(label_p_pred, axis=3) - - #print(seg.shape, len(seg), len(list_i_s)) - - indexer_inside_batch = 0 - for i_batch, j_batch in zip(list_i_s, list_j_s): - seg_in = seg[indexer_inside_batch,:,:] - seg_color = np.repeat(seg_in[:, :, np.newaxis], 3, axis=2) - - index_y_u_in = list_y_u[indexer_inside_batch] - index_y_d_in = list_y_d[indexer_inside_batch] - - index_x_u_in = list_x_u[indexer_inside_batch] - index_x_d_in = list_x_d[indexer_inside_batch] - - if i_batch == 0 and j_batch == 0: - seg_color = seg_color[0 : seg_color.shape[0] - margin, 0 : seg_color.shape[1] - margin, :] - prediction_true[index_y_d_in + 0 : index_y_u_in - margin, index_x_d_in + 0 : index_x_u_in - margin, :] = seg_color - elif i_batch == nxf - 1 and j_batch == nyf - 1: - seg_color = seg_color[margin : seg_color.shape[0] - 0, margin : seg_color.shape[1] - 0, :] - prediction_true[index_y_d_in + margin : index_y_u_in - 0, index_x_d_in + margin : index_x_u_in - 0, :] = seg_color - elif i_batch == 0 and j_batch == nyf - 1: - seg_color = seg_color[margin : seg_color.shape[0] - 0, 0 : seg_color.shape[1] - margin, :] - prediction_true[index_y_d_in + margin : index_y_u_in - 0, index_x_d_in + 0 : index_x_u_in - margin, :] = seg_color - elif i_batch == nxf - 1 and j_batch == 0: - seg_color = seg_color[0 : seg_color.shape[0] - margin, margin : seg_color.shape[1] - 0, :] - prediction_true[index_y_d_in + 0 : index_y_u_in - margin, index_x_d_in + margin : index_x_u_in - 0, :] = seg_color - elif i_batch == 0 and j_batch != 0 and j_batch != nyf - 1: - seg_color = seg_color[margin : seg_color.shape[0] - margin, 0 : seg_color.shape[1] - margin, :] - prediction_true[index_y_d_in + margin : index_y_u_in - margin, index_x_d_in + 0 : index_x_u_in - margin, :] = seg_color - elif i_batch == nxf - 1 and j_batch != 0 and j_batch != nyf - 1: - seg_color = seg_color[margin : seg_color.shape[0] - margin, margin : seg_color.shape[1] - 0, :] - prediction_true[index_y_d_in + margin : index_y_u_in - margin, index_x_d_in + margin : index_x_u_in - 0, :] = seg_color - elif i_batch != 0 and i_batch != nxf - 1 and j_batch == 0: - seg_color = seg_color[0 : seg_color.shape[0] - margin, margin : seg_color.shape[1] - margin, :] - prediction_true[index_y_d_in + 0 : index_y_u_in - margin, index_x_d_in + margin : index_x_u_in - margin, :] = seg_color - elif i_batch != 0 and i_batch != nxf - 1 and j_batch == nyf - 1: - seg_color = seg_color[margin : seg_color.shape[0] - 0, margin : seg_color.shape[1] - margin, :] - prediction_true[index_y_d_in + margin : index_y_u_in - 0, index_x_d_in + margin : index_x_u_in - margin, :] = seg_color - else: - seg_color = seg_color[margin : seg_color.shape[0] - margin, margin : seg_color.shape[1] - margin, :] - prediction_true[index_y_d_in + margin : index_y_u_in - margin, index_x_d_in + margin : index_x_u_in - margin, :] = seg_color - - indexer_inside_batch = indexer_inside_batch +1 - - - list_i_s = [] - list_j_s = [] - list_x_u = [] - list_x_d = [] - list_y_u = [] - list_y_d = [] - - batch_indexer = 0 - - img_patch = np.zeros((n_batch_inference, model_height, model_width,3)) - - - - prediction_true = prediction_true[index_start_h: index_start_h+img_org_h, index_start_w: index_start_w+img_org_w,:] - prediction_true = prediction_true.astype(np.uint8) - - else: - img_h_page = img.shape[0] - img_w_page = img.shape[1] - img = img / float(255.0) - img = resize_image(img, model_height, model_width) - - label_p_pred = model.predict(img.reshape(1, img.shape[0], img.shape[1], img.shape[2])) - - seg = np.argmax(label_p_pred, axis=3)[0] - seg_color = np.repeat(seg[:, :, np.newaxis], 3, axis=2) - prediction_true = resize_image(seg_color, img_h_page, img_w_page) - prediction_true = prediction_true.astype(np.uint8) - return prediction_true[:,:,0] - - def run(self, image=None, image_path=None, output=None, use_patches=False, dir_in=None, overwrite=False): - if not dir_in: - if (image is None) == (image_path is None): - raise ValueError("Must pass either a opencv2 image or an image_path") - if image_path is not None: - image = cv2.imread(image_path) - img_last = self.run_single(image, use_patches) - if output: - if os.path.exists(output): - if overwrite: - self.logger.warning("will overwrite existing output file '%s'", output) - else: - self.logger.warning("output file already exists '%s'", output) - return img_last - self.logger.info('Writing binarized image to %s', output) - cv2.imwrite(output, img_last) - return img_last - else: - ls_imgs = list(filter(is_image_filename, os.listdir(dir_in))) - self.logger.info("Found %d image files to binarize in %s", len(ls_imgs), dir_in) - for i, image_path in enumerate(ls_imgs): - image_stem = os.path.splitext(image_path)[0] - output_path = os.path.join(output, image_stem + '.png') - if os.path.exists(output_path): - if overwrite: - self.logger.warning("will overwrite existing output file '%s'", output_path) - else: - self.logger.warning("will skip input for existing output file '%s'", output_path) - continue - self.logger.info('Binarizing [%3d/%d] %s', i + 1, len(ls_imgs), image_path) - image = cv2.imread(os.path.join(dir_in, image_path)) - img_last = self.run_single(image, use_patches) - self.logger.info('Writing binarized image to %s', output_path) - cv2.imwrite(output_path, img_last) - - def run_single(self, image: np.ndarray, use_patches=False): - img_last = 0 - model_file, model = self.models - res = self.predict(model, image, use_patches) - - img_fin = np.zeros((res.shape[0], res.shape[1], 3)) - res[:, :][res[:, :] == 0] = 2 - res = res - 1 - res = res * 255 - img_fin[:, :, 0] = res - img_fin[:, :, 1] = res - img_fin[:, :, 2] = res - - img_fin = img_fin.astype(np.uint8) - img_fin = (res[:, :] == 0) * 255 - img_last = img_last + img_fin - - kernel = np.ones((5, 5), np.uint8) - img_last[:, :][img_last[:, :] > 0] = 255 - img_last = (img_last[:, :] == 0) * 255 - return img_last + def run_single(self, + img_filename: str, + img_pil=None, + use_patches: bool = False, + ): + image = self.cache_images(image_filename=img_filename, image_pil=img_pil) + img = self.imread(image) + img_bin = self.do_prediction(use_patches, img, self.model_zoo.get("binarization"), + n_batch_inference=5) + img_bin = 255 * (img_bin == 0).astype(np.uint8) + #img_bin = np.repeat(img_bin[:, :, np.newaxis], 3, axis=2).astype(np.uint8) + return img_bin