From a8556f52108b5e96fcfc4eb0c8f81df68f49be36 Mon Sep 17 00:00:00 2001 From: Robert Sachunsky Date: Mon, 30 Mar 2026 13:18:40 +0200 Subject: [PATCH] =?UTF-8?q?run:=20sort=20parallel=20log=20messages=20by=20?= =?UTF-8?q?file=20name=20instead=20of=20prefixing=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit (as follow-up to ec08004f:) - create log queues and QueueListener separately for each job - receive job logs sequentially - drop log filter mechanism (prefixing log messages by file name) - also count ratio of successful jobs --- src/eynollah/eynollah.py | 45 +++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/src/eynollah/eynollah.py b/src/eynollah/eynollah.py index 67ee525..a84da84 100644 --- a/src/eynollah/eynollah.py +++ b/src/eynollah/eynollah.py @@ -99,21 +99,16 @@ KERNEL = np.ones((5, 5), np.uint8) _instance = None -def _set_instance(instance, logq): +def _set_instance(instance): global _instance _instance = instance +def _run_single(*args, **kwargs): + logq = kwargs.pop('logq') # replace all inherited handlers with queue handler logging.root.handlers.clear() - instance.logger.handlers.clear() + _instance.logger.handlers.clear() handler = logging.handlers.QueueHandler(logq) - handler.setFormatter(logging.Formatter(fmt="[%(jobId)s] %(message)s")) logging.root.addHandler(handler) -def _run_single(*args, **kwargs): - # prefix log messages with img_filename - def log_filter(record: logging.LogRecord): - record.jobId = args[0] - return record - logging.root.handlers[0].filters = [log_filter] return _instance.run_single(*args, **kwargs) class Eynollah: @@ -2297,24 +2292,36 @@ class Eynollah: ls_imgs = [os.path.join(dir_in, image_filename) for image_filename in filter(is_image_filename, os.listdir(dir_in))] - logq = mp.get_context('fork').Queue() with ProcessPoolExecutor(max_workers=num_jobs or None, mp_context=mp.get_context('fork'), initializer=_set_instance, - initargs=(self, logq) + initargs=(self,) ) as exe: - logging.handlers.QueueListener(logq, *self.logger.handlers, - respect_handler_level=False).start() - jobs = {exe.submit(_run_single, img_filename, - dir_out=dir_out, - overwrite=overwrite): img_filename - for img_filename in ls_imgs} - for job in as_completed(jobs): - img_filename = jobs[job] + jobs = {} + mngr = mp.get_context('fork').Manager() + for img_filename in ls_imgs: + logq = mngr.Queue() + jobs[exe.submit(_run_single, img_filename, + dir_out=dir_out, + overwrite=overwrite, + logq=logq)] = img_filename, logq + for job in as_completed(list(jobs)): + img_filename, logq = jobs[job] + loglistener = logging.handlers.QueueListener( + logq, *self.logger.handlers, respect_handler_level=False) try: + loglistener.start() job.result() + jobs[job] = True except: self.logger.exception("Job %s failed", img_filename) + jobs[job] = False + finally: + loglistener.stop() + results = list(jobs.values()) + success = list(filter(None, results)) + # for img_filename, result in zip(ls_imgs, results) ... + self.logger.info("%d of %d jobs successful", len(success), len(results)) self.logger.info("All jobs done in %.1fs", time.time() - t0_tot) elif image_filename: try: