run: add QueueListener to pool / QueueHandler to workers…

- set up a Queue and QueueListener along with ProcessPoolExecutor,
  delegating messages from the queue to all handlers
- in forked subprocesses, instead of just inheriting handlers,
  replace them with a single QueueHandler, and make sure
  log messages get prefixed with the respective job id (img_filename)
  so concurrent messages will still be readable
- in the predictor, make sure to pass on the log level to the
  spawned subprocess, too
Robert Sachunsky 2026-03-14 00:43:58 +01:00
parent b7aa1d24cc
commit ec08004fb0
3 changed files with 19 additions and 4 deletions
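
In outline, the pattern works like this (a minimal, self-contained sketch; the names _init_worker, _work and the sample filenames are illustrative, not taken from the eynollah code): the parent process owns the queue and a QueueListener that forwards records to the ordinary handlers, while each fork()ed pool worker swaps its inherited handlers for a single QueueHandler.

import logging
import logging.handlers
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def _init_worker(logq):
    # in the forked child: drop the handlers inherited from the parent and
    # route every record through the shared queue instead
    logging.root.handlers = [logging.handlers.QueueHandler(logq)]
    logging.root.setLevel(logging.INFO)

def _work(img_filename):
    logging.getLogger("worker").info("processing %s", img_filename)
    return img_filename

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    logq = mp.get_context("fork").Queue()   # 'fork' start method is POSIX-only
    listener = logging.handlers.QueueListener(logq, *logging.root.handlers)
    listener.start()
    try:
        with ProcessPoolExecutor(max_workers=2,
                                 mp_context=mp.get_context("fork"),
                                 initializer=_init_worker,
                                 initargs=(logq,)) as exe:
            list(exe.map(_work, ["page_0001.tif", "page_0002.tif"]))
    finally:
        listener.stop()   # flush queued records, then join the listener thread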

@@ -17,6 +17,7 @@ document layout analysis (segmentation) with output in PAGE-XML
 # pyright: reportOptionalSubscript=false
 import logging
+import logging.handlers
 import sys
 from difflib import SequenceMatcher as sq
@@ -99,10 +100,21 @@ KERNEL = np.ones((5, 5), np.uint8)
 _instance = None

-def _set_instance(instance):
+def _set_instance(instance, logq):
     global _instance
     _instance = instance
+    # replace all inherited handlers with queue handler
+    logging.root.handlers.clear()
+    instance.logger.handlers.clear()
+    handler = logging.handlers.QueueHandler(logq)
+    handler.setFormatter(logging.Formatter(fmt="[%(jobId)s] %(message)s"))
+    logging.root.addHandler(handler)

 def _run_single(*args, **kwargs):
+    # prefix log messages with img_filename
+    def log_filter(record: logging.LogRecord):
+        record.jobId = args[0]
+        return record
+    logging.root.handlers[0].filters = [log_filter]
     return _instance.run_single(*args, **kwargs)

 class Eynollah:
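
The per-job prefix in this hunk comes from two cooperating pieces: a callable filter that stamps every LogRecord with a jobId attribute, and a formatter that interpolates it via %(jobId)s. A small single-process sketch of that interplay (tag_with_job and the sample filename are made up; the actual code installs the filter per job inside _run_single):

import logging
import logging.handlers
import queue

logq = queue.Queue()
handler = logging.handlers.QueueHandler(logq)
handler.setFormatter(logging.Formatter(fmt="[%(jobId)s] %(message)s"))

def tag_with_job(record: logging.LogRecord):
    # attach the job id so the formatter's %(jobId)s placeholder resolves
    record.jobId = "page_0001.tif"
    return record   # truthy, so the record is not filtered out

handler.addFilter(tag_with_job)
logging.root.handlers = [handler]
logging.root.setLevel(logging.INFO)

logging.getLogger("eynollah").info("detected 12 text regions")
print(logq.get().getMessage())   # -> [page_0001.tif] detected 12 text regions
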
@@ -2265,11 +2277,14 @@ class Eynollah:
         ls_imgs = [os.path.join(dir_in, image_filename)
                    for image_filename in filter(is_image_filename,
                                                 os.listdir(dir_in))]
+        logq = mp.get_context('fork').Queue()
         with ProcessPoolExecutor(max_workers=num_jobs or None,
                                  mp_context=mp.get_context('fork'),
                                  initializer=_set_instance,
-                                 initargs=(self,)
+                                 initargs=(self, logq)
                                  ) as exe:
+            logging.handlers.QueueListener(logq, *self.logger.handlers,
+                                           respect_handler_level=False).start()
             jobs = {exe.submit(_run_single, img_filename,
                                dir_out=dir_out,
                                overwrite=overwrite): img_filename
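
QueueListener.start() runs a background thread that takes each record off the queue and hands it to the handlers the listener was constructed with; respect_handler_level=False means every record is delivered regardless of a handler's own level threshold. A tiny standalone illustration of that behaviour, including the stop() call that drains the queue and joins the thread (names are illustrative):

import logging
import logging.handlers
import queue

logq = queue.Queue()
console = logging.StreamHandler()
console.setLevel(logging.WARNING)   # on its own this handler would drop INFO

listener = logging.handlers.QueueListener(logq, console,
                                          respect_handler_level=False)
listener.start()

logging.root.handlers = [logging.handlers.QueueHandler(logq)]
logging.root.setLevel(logging.INFO)
logging.getLogger("demo").info("reaches the WARNING-level handler anyway")

listener.stop()   # processes any pending records, then joins the thread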

@@ -36,6 +36,7 @@ class Predictor(mp.context.SpawnProcess):
     def __init__(self, logger, model_zoo):
         self.logger = logger
+        self.loglevel = logger.level
         self.model_zoo = model_zoo
         ctxt = mp.get_context('spawn')
         self.taskq = ctxt.Queue(maxsize=QSIZE)
@@ -147,6 +148,7 @@ class Predictor(mp.context.SpawnProcess):
     def setup(self):
         logging.root.handlers = [logging.handlers.QueueHandler(self.logq)]
+        self.logger.setLevel(self.loglevel)
         self.model_zoo.load_models(*self.loadable)

     def shutdown(self):
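
Capturing logger.level in __init__ matters because the predictor worker uses the 'spawn' start method: the child starts from a fresh interpreter, so logger levels configured in the parent are not inherited and have to travel as pickled attributes of the process object. A rough, self-contained sketch of that mechanism (the Worker class and its attributes are illustrative, not the actual Predictor):

import logging
import logging.handlers
import multiprocessing as mp

class Worker(mp.context.SpawnProcess):
    def __init__(self, logger, logq):
        super().__init__()
        self.logger_name = logger.name
        self.loglevel = logger.level   # captured in the parent, pickled to the child
        self.logq = logq

    def run(self):
        # in the spawned child: route records through the queue and restore the level
        logging.root.handlers = [logging.handlers.QueueHandler(self.logq)]
        logger = logging.getLogger(self.logger_name)
        logger.setLevel(self.loglevel)
        logger.debug("only emitted if the parent had DEBUG enabled")

if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    parent_logger = logging.getLogger("predictor")
    parent_logger.setLevel(logging.DEBUG)
    logq = mp.get_context('spawn').Queue()
    listener = logging.handlers.QueueListener(logq, *logging.root.handlers)
    listener.start()
    worker = Worker(parent_logger, logq)
    worker.start()
    worker.join()
    listener.stop()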

@@ -44,7 +44,6 @@ class EynollahXmlWriter:
         return Path(Path(self.image_filename).name).stem

     def calculate_points(self, contour, offset=None):
-        self.logger.debug('enter calculate_points')
         poly = contour2polygon(contour)
         if offset is not None:
             poly = affinity.translate(poly, *offset)
@@ -53,7 +52,6 @@ class EynollahXmlWriter:
         return points_from_polygon(poly.exterior.coords[:-1])

     def serialize_lines_in_region(self, text_region, all_found_textline_polygons, region_idx, page_coord, all_box_coord, slopes, counter, ocr_all_textlines_textregion):
-        self.logger.debug('enter serialize_lines_in_region')
         for j, polygon_textline in enumerate(all_found_textline_polygons[region_idx]):
             coords = CoordsType()
             textline = TextLineType(id=counter.next_line_id, Coords=coords)