mirror of
https://github.com/qurator-spk/eynollah.git
synced 2026-04-14 19:31:57 +02:00
run: sort parallel log messages by file name instead of prefixing…
(as follow-up to ec08004f:)
- create log queues and QueueListener separately for each job
- receive job logs sequentially
- drop log filter mechanism (prefixing log messages by file name)
- also count ratio of successful jobs
This commit is contained in:
parent
1756443605
commit
a8556f5210
1 changed files with 26 additions and 19 deletions
|
|
@ -99,21 +99,16 @@ KERNEL = np.ones((5, 5), np.uint8)
|
|||
|
||||
|
||||
_instance = None
|
||||
def _set_instance(instance, logq):
|
||||
def _set_instance(instance):
|
||||
global _instance
|
||||
_instance = instance
|
||||
def _run_single(*args, **kwargs):
|
||||
logq = kwargs.pop('logq')
|
||||
# replace all inherited handlers with queue handler
|
||||
logging.root.handlers.clear()
|
||||
instance.logger.handlers.clear()
|
||||
_instance.logger.handlers.clear()
|
||||
handler = logging.handlers.QueueHandler(logq)
|
||||
handler.setFormatter(logging.Formatter(fmt="[%(jobId)s] %(message)s"))
|
||||
logging.root.addHandler(handler)
|
||||
def _run_single(*args, **kwargs):
|
||||
# prefix log messages with img_filename
|
||||
def log_filter(record: logging.LogRecord):
|
||||
record.jobId = args[0]
|
||||
return record
|
||||
logging.root.handlers[0].filters = [log_filter]
|
||||
return _instance.run_single(*args, **kwargs)
|
||||
|
||||
class Eynollah:
|
||||
|
|
@ -2297,24 +2292,36 @@ class Eynollah:
|
|||
ls_imgs = [os.path.join(dir_in, image_filename)
|
||||
for image_filename in filter(is_image_filename,
|
||||
os.listdir(dir_in))]
|
||||
logq = mp.get_context('fork').Queue()
|
||||
with ProcessPoolExecutor(max_workers=num_jobs or None,
|
||||
mp_context=mp.get_context('fork'),
|
||||
initializer=_set_instance,
|
||||
initargs=(self, logq)
|
||||
initargs=(self,)
|
||||
) as exe:
|
||||
logging.handlers.QueueListener(logq, *self.logger.handlers,
|
||||
respect_handler_level=False).start()
|
||||
jobs = {exe.submit(_run_single, img_filename,
|
||||
dir_out=dir_out,
|
||||
overwrite=overwrite): img_filename
|
||||
for img_filename in ls_imgs}
|
||||
for job in as_completed(jobs):
|
||||
img_filename = jobs[job]
|
||||
jobs = {}
|
||||
mngr = mp.get_context('fork').Manager()
|
||||
for img_filename in ls_imgs:
|
||||
logq = mngr.Queue()
|
||||
jobs[exe.submit(_run_single, img_filename,
|
||||
dir_out=dir_out,
|
||||
overwrite=overwrite,
|
||||
logq=logq)] = img_filename, logq
|
||||
for job in as_completed(list(jobs)):
|
||||
img_filename, logq = jobs[job]
|
||||
loglistener = logging.handlers.QueueListener(
|
||||
logq, *self.logger.handlers, respect_handler_level=False)
|
||||
try:
|
||||
loglistener.start()
|
||||
job.result()
|
||||
jobs[job] = True
|
||||
except:
|
||||
self.logger.exception("Job %s failed", img_filename)
|
||||
jobs[job] = False
|
||||
finally:
|
||||
loglistener.stop()
|
||||
results = list(jobs.values())
|
||||
success = list(filter(None, results))
|
||||
# for img_filename, result in zip(ls_imgs, results) ...
|
||||
self.logger.info("%d of %d jobs successful", len(success), len(results))
|
||||
self.logger.info("All jobs done in %.1fs", time.time() - t0_tot)
|
||||
elif image_filename:
|
||||
try:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue