diff --git a/src/eynollah/predictor.py b/src/eynollah/predictor.py index b2a0dcf..1b0e970 100644 --- a/src/eynollah/predictor.py +++ b/src/eynollah/predictor.py @@ -46,7 +46,7 @@ class Predictor(mp.context.SpawnProcess): log_listener = logging.handlers.QueueListener( self.logq, *self.logger.handlers, respect_handler_level=True).start() - self.terminate = ctxt.Event() + self.stopped = ctxt.Event() ctxt = mp.get_context('fork') # ocrd.Processor will fork workers self.results = ctxt.Manager().dict() self.closable = ctxt.Manager().list() @@ -69,7 +69,7 @@ class Predictor(mp.context.SpawnProcess): return self.result(jobid) def result(self, jobid): - while not self.terminate.is_set(): + while not self.stopped.is_set(): if jobid in self.results: #self.logger.debug("received result for '%d'", jobid) result = self.results.pop(jobid) @@ -93,9 +93,9 @@ class Predictor(mp.context.SpawnProcess): self.setup() # fill model_zoo etc except Exception as e: self.logger.exception("setup failed") - self.terminate.set() + self.stopped.set() closing = {} - while not self.terminate.is_set(): + while not self.stopped.is_set(): for jobid in list(self.closable): self.closable.remove(jobid) closing.pop(jobid).close() @@ -124,13 +124,13 @@ class Predictor(mp.context.SpawnProcess): result = e self.resultq.put((jobid, result)) #self.logger.debug("sent result for '%d'", jobid) - self.resultq.close() - self.resultq.cancel_join_thread() #self.logger.debug("predictor terminated") def load_models(self, *loadable: List[str]): self.loadable = loadable self.start() + # parent context here + del self.model_zoo def setup(self): logging.root.handlers = [logging.handlers.QueueHandler(self.logq)] @@ -140,14 +140,15 @@ class Predictor(mp.context.SpawnProcess): def shutdown(self): # do not terminate from forked processor instances if mp.parent_process() is None: - self.terminate.set() + self.stopped.set() + self.terminate() + self.logq.close() self.taskq.close() self.taskq.cancel_join_thread() - #self.logger.debug(f"terminated {self} in {mp.current_process().name}") + self.resultq.close() + self.resultq.cancel_join_thread() else: - pass - #self.logger.debug(f"not touching {self} in {mp.current_process().name}") - self.model_zoo.shutdown() + self.model_zoo.shutdown() def __del__(self): #self.logger.debug(f"deinit of {self} in {mp.current_process().name}")