predictor: fix termination for pytests…

- rename `terminate` → `stopped`
- call `terminate()` from superclass during shutdown
- del `self.model_zoo` in the parent process after spawn,
  and in the child during shutdown
This commit is contained in:
Robert Sachunsky 2026-03-11 02:34:29 +01:00
parent bb468bf68f
commit cf5caa1eca

View file

@ -46,7 +46,7 @@ class Predictor(mp.context.SpawnProcess):
log_listener = logging.handlers.QueueListener( log_listener = logging.handlers.QueueListener(
self.logq, *self.logger.handlers, self.logq, *self.logger.handlers,
respect_handler_level=True).start() respect_handler_level=True).start()
self.terminate = ctxt.Event() self.stopped = ctxt.Event()
ctxt = mp.get_context('fork') # ocrd.Processor will fork workers ctxt = mp.get_context('fork') # ocrd.Processor will fork workers
self.results = ctxt.Manager().dict() self.results = ctxt.Manager().dict()
self.closable = ctxt.Manager().list() self.closable = ctxt.Manager().list()
@ -69,7 +69,7 @@ class Predictor(mp.context.SpawnProcess):
return self.result(jobid) return self.result(jobid)
def result(self, jobid): def result(self, jobid):
while not self.terminate.is_set(): while not self.stopped.is_set():
if jobid in self.results: if jobid in self.results:
#self.logger.debug("received result for '%d'", jobid) #self.logger.debug("received result for '%d'", jobid)
result = self.results.pop(jobid) result = self.results.pop(jobid)
@ -93,9 +93,9 @@ class Predictor(mp.context.SpawnProcess):
self.setup() # fill model_zoo etc self.setup() # fill model_zoo etc
except Exception as e: except Exception as e:
self.logger.exception("setup failed") self.logger.exception("setup failed")
self.terminate.set() self.stopped.set()
closing = {} closing = {}
while not self.terminate.is_set(): while not self.stopped.is_set():
for jobid in list(self.closable): for jobid in list(self.closable):
self.closable.remove(jobid) self.closable.remove(jobid)
closing.pop(jobid).close() closing.pop(jobid).close()
@ -124,13 +124,13 @@ class Predictor(mp.context.SpawnProcess):
result = e result = e
self.resultq.put((jobid, result)) self.resultq.put((jobid, result))
#self.logger.debug("sent result for '%d'", jobid) #self.logger.debug("sent result for '%d'", jobid)
self.resultq.close()
self.resultq.cancel_join_thread()
#self.logger.debug("predictor terminated") #self.logger.debug("predictor terminated")
def load_models(self, *loadable: List[str]): def load_models(self, *loadable: List[str]):
self.loadable = loadable self.loadable = loadable
self.start() self.start()
# parent context here
del self.model_zoo
def setup(self): def setup(self):
logging.root.handlers = [logging.handlers.QueueHandler(self.logq)] logging.root.handlers = [logging.handlers.QueueHandler(self.logq)]
@ -140,14 +140,15 @@ class Predictor(mp.context.SpawnProcess):
def shutdown(self): def shutdown(self):
# do not terminate from forked processor instances # do not terminate from forked processor instances
if mp.parent_process() is None: if mp.parent_process() is None:
self.terminate.set() self.stopped.set()
self.terminate()
self.logq.close()
self.taskq.close() self.taskq.close()
self.taskq.cancel_join_thread() self.taskq.cancel_join_thread()
#self.logger.debug(f"terminated {self} in {mp.current_process().name}") self.resultq.close()
self.resultq.cancel_join_thread()
else: else:
pass self.model_zoo.shutdown()
#self.logger.debug(f"not touching {self} in {mp.current_process().name}")
self.model_zoo.shutdown()
def __del__(self): def __del__(self):
#self.logger.debug(f"deinit of {self} in {mp.current_process().name}") #self.logger.debug(f"deinit of {self} in {mp.current_process().name}")