mirror of
https://github.com/qurator-spk/eynollah.git
synced 2026-04-14 19:31:57 +02:00
run: add QueueListener to pool / QueueHandler to workers…
- set up a Queue and QueueListener along with the ProcessPoolExecutor, delegating messages from the queue to all handlers
- in forked subprocesses, instead of just inheriting handlers, replace them with a single QueueHandler, and make sure log messages are prefixed with the respective job id (img_filename), so concurrent messages will still be readable
- in the predictor, make sure to pass on the log level to the spawned subprocess, too
This commit is contained in:
parent
b7aa1d24cc
commit
ec08004fb0
3 changed files with 19 additions and 4 deletions
|
|
@@ -17,6 +17,7 @@ document layout analysis (segmentation) with output in PAGE-XML
|
||||||
# pyright: reportOptionalSubscript=false
|
# pyright: reportOptionalSubscript=false
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import logging.handlers
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
from difflib import SequenceMatcher as sq
|
from difflib import SequenceMatcher as sq
|
||||||
|
|
@@ -99,10 +100,21 @@ KERNEL = np.ones((5, 5), np.uint8)
|
||||||
|
|
||||||
|
|
||||||
_instance = None
|
_instance = None
|
||||||
def _set_instance(instance):
|
def _set_instance(instance, logq):
|
||||||
global _instance
|
global _instance
|
||||||
_instance = instance
|
_instance = instance
|
||||||
|
# replace all inherited handlers with queue handler
|
||||||
|
logging.root.handlers.clear()
|
||||||
|
instance.logger.handlers.clear()
|
||||||
|
handler = logging.handlers.QueueHandler(logq)
|
||||||
|
handler.setFormatter(logging.Formatter(fmt="[%(jobId)s] %(message)s"))
|
||||||
|
logging.root.addHandler(handler)
|
||||||
def _run_single(*args, **kwargs):
|
def _run_single(*args, **kwargs):
|
||||||
|
# prefix log messages with img_filename
|
||||||
|
def log_filter(record: logging.LogRecord):
|
||||||
|
record.jobId = args[0]
|
||||||
|
return record
|
||||||
|
logging.root.handlers[0].filters = [log_filter]
|
||||||
return _instance.run_single(*args, **kwargs)
|
return _instance.run_single(*args, **kwargs)
|
||||||
|
|
||||||
class Eynollah:
|
class Eynollah:
|
||||||
|
|
@@ -2265,11 +2277,14 @@ class Eynollah:
|
||||||
ls_imgs = [os.path.join(dir_in, image_filename)
|
ls_imgs = [os.path.join(dir_in, image_filename)
|
||||||
for image_filename in filter(is_image_filename,
|
for image_filename in filter(is_image_filename,
|
||||||
os.listdir(dir_in))]
|
os.listdir(dir_in))]
|
||||||
|
logq = mp.get_context('fork').Queue()
|
||||||
with ProcessPoolExecutor(max_workers=num_jobs or None,
|
with ProcessPoolExecutor(max_workers=num_jobs or None,
|
||||||
mp_context=mp.get_context('fork'),
|
mp_context=mp.get_context('fork'),
|
||||||
initializer=_set_instance,
|
initializer=_set_instance,
|
||||||
initargs=(self,)
|
initargs=(self, logq)
|
||||||
) as exe:
|
) as exe:
|
||||||
|
logging.handlers.QueueListener(logq, *self.logger.handlers,
|
||||||
|
respect_handler_level=False).start()
|
||||||
jobs = {exe.submit(_run_single, img_filename,
|
jobs = {exe.submit(_run_single, img_filename,
|
||||||
dir_out=dir_out,
|
dir_out=dir_out,
|
||||||
overwrite=overwrite): img_filename
|
overwrite=overwrite): img_filename
|
||||||
|
|
|
||||||
|
|
@@ -36,6 +36,7 @@ class Predictor(mp.context.SpawnProcess):
|
||||||
|
|
||||||
def __init__(self, logger, model_zoo):
|
def __init__(self, logger, model_zoo):
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
|
self.loglevel = logger.level
|
||||||
self.model_zoo = model_zoo
|
self.model_zoo = model_zoo
|
||||||
ctxt = mp.get_context('spawn')
|
ctxt = mp.get_context('spawn')
|
||||||
self.taskq = ctxt.Queue(maxsize=QSIZE)
|
self.taskq = ctxt.Queue(maxsize=QSIZE)
|
||||||
|
|
@@ -147,6 +148,7 @@ class Predictor(mp.context.SpawnProcess):
|
||||||
|
|
||||||
def setup(self):
|
def setup(self):
|
||||||
logging.root.handlers = [logging.handlers.QueueHandler(self.logq)]
|
logging.root.handlers = [logging.handlers.QueueHandler(self.logq)]
|
||||||
|
self.logger.setLevel(self.loglevel)
|
||||||
self.model_zoo.load_models(*self.loadable)
|
self.model_zoo.load_models(*self.loadable)
|
||||||
|
|
||||||
def shutdown(self):
|
def shutdown(self):
|
||||||
|
|
|
||||||
|
|
@@ -44,7 +44,6 @@ class EynollahXmlWriter:
|
||||||
return Path(Path(self.image_filename).name).stem
|
return Path(Path(self.image_filename).name).stem
|
||||||
|
|
||||||
def calculate_points(self, contour, offset=None):
|
def calculate_points(self, contour, offset=None):
|
||||||
self.logger.debug('enter calculate_points')
|
|
||||||
poly = contour2polygon(contour)
|
poly = contour2polygon(contour)
|
||||||
if offset is not None:
|
if offset is not None:
|
||||||
poly = affinity.translate(poly, *offset)
|
poly = affinity.translate(poly, *offset)
|
||||||
|
|
@@ -53,7 +52,6 @@ class EynollahXmlWriter:
|
||||||
return points_from_polygon(poly.exterior.coords[:-1])
|
return points_from_polygon(poly.exterior.coords[:-1])
|
||||||
|
|
||||||
def serialize_lines_in_region(self, text_region, all_found_textline_polygons, region_idx, page_coord, all_box_coord, slopes, counter, ocr_all_textlines_textregion):
|
def serialize_lines_in_region(self, text_region, all_found_textline_polygons, region_idx, page_coord, all_box_coord, slopes, counter, ocr_all_textlines_textregion):
|
||||||
self.logger.debug('enter serialize_lines_in_region')
|
|
||||||
for j, polygon_textline in enumerate(all_found_textline_polygons[region_idx]):
|
for j, polygon_textline in enumerate(all_found_textline_polygons[region_idx]):
|
||||||
coords = CoordsType()
|
coords = CoordsType()
|
||||||
textline = TextLineType(id=counter.next_line_id, Coords=coords)
|
textline = TextLineType(id=counter.next_line_id, Coords=coords)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue