diff --git a/src/eynollah/eynollah.py b/src/eynollah/eynollah.py index 67ee525..a84da84 100644 --- a/src/eynollah/eynollah.py +++ b/src/eynollah/eynollah.py @@ -99,21 +99,16 @@ KERNEL = np.ones((5, 5), np.uint8) _instance = None -def _set_instance(instance, logq): +def _set_instance(instance): global _instance _instance = instance +def _run_single(*args, **kwargs): + logq = kwargs.pop('logq') # replace all inherited handlers with queue handler logging.root.handlers.clear() - instance.logger.handlers.clear() + _instance.logger.handlers.clear() handler = logging.handlers.QueueHandler(logq) - handler.setFormatter(logging.Formatter(fmt="[%(jobId)s] %(message)s")) logging.root.addHandler(handler) -def _run_single(*args, **kwargs): - # prefix log messages with img_filename - def log_filter(record: logging.LogRecord): - record.jobId = args[0] - return record - logging.root.handlers[0].filters = [log_filter] return _instance.run_single(*args, **kwargs) class Eynollah: @@ -2297,24 +2292,36 @@ class Eynollah: ls_imgs = [os.path.join(dir_in, image_filename) for image_filename in filter(is_image_filename, os.listdir(dir_in))] - logq = mp.get_context('fork').Queue() with ProcessPoolExecutor(max_workers=num_jobs or None, mp_context=mp.get_context('fork'), initializer=_set_instance, - initargs=(self, logq) + initargs=(self,) ) as exe: - logging.handlers.QueueListener(logq, *self.logger.handlers, - respect_handler_level=False).start() - jobs = {exe.submit(_run_single, img_filename, - dir_out=dir_out, - overwrite=overwrite): img_filename - for img_filename in ls_imgs} - for job in as_completed(jobs): - img_filename = jobs[job] + jobs = {} + mngr = mp.get_context('fork').Manager() + for img_filename in ls_imgs: + logq = mngr.Queue() + jobs[exe.submit(_run_single, img_filename, + dir_out=dir_out, + overwrite=overwrite, + logq=logq)] = img_filename, logq + for job in as_completed(list(jobs)): + img_filename, logq = jobs[job] + loglistener = logging.handlers.QueueListener( + logq, *self.logger.handlers, respect_handler_level=False) try: + loglistener.start() job.result() + jobs[job] = True except: self.logger.exception("Job %s failed", img_filename) + jobs[job] = False + finally: + loglistener.stop() + results = list(jobs.values()) + success = list(filter(None, results)) + # for img_filename, result in zip(ls_imgs, results) ... + self.logger.info("%d of %d jobs successful", len(success), len(results)) self.logger.info("All jobs done in %.1fs", time.time() - t0_tot) elif image_filename: try: