Skip to content
Snippets Groups Projects
Commit f942643d authored by Christoph Schmidt's avatar Christoph Schmidt
Browse files

Added option to register custom signals

parent 76c09fd6
No related branches found
No related tags found
No related merge requests found
import os
import time
import cmp
class ChildProcess3(cmp.CProcess):
def __init__(self, state_queue, cmd_queue, enable_internal_logging):
super().__init__(state_queue, cmd_queue, enable_internal_logging=enable_internal_logging)
self.logger = None
def postrun_init(self):
self.logger, self.logger_h = self.create_new_logger(f"{self.__class__.__name__}-({os.getpid()})")
@cmp.CProcess.register_for_signal()
def test_call(self, a):
self.logger.info(f"{os.getpid()} -> test_call!")
time.sleep(1)
return a
\ No newline at end of file
from PySide6.QtCore import Signal
import cmp
from ChildProcess3 import ChildProcess3
class ChildProcessControl3(cmp.CProcessControl):
mp_finished = Signal(int, name='mp_finished')
def __init__(self, parent, enable_internal_logging):
super().__init__(parent, enable_internal_logging=enable_internal_logging)
self.register_child_process(ChildProcess3(
self.state_queue,
self.cmd_queue,
enable_internal_logging=enable_internal_logging))
@cmp.CProcessControl.register_function(signal=mp_finished)
def test_call(self, a):
pass
#print(f"{os.getpid()} -> call_without_mp with {a}, {b}, {c}!")
# -*- coding: utf-8 -*-
"""
Author(s): Christoph Schmidt <christoph.schmidt@tugraz.at>
Created: 2023-10-19 12:35
Package Version:
"""
import signal
import sys
from multiprocessing import Process, Queue, Pipe
from threading import Thread
from PySide6.QtCore import QObject, Signal, SIGNAL
from PySide6.QtWidgets import QDialog, QApplication, QTextBrowser, QLineEdit, QVBoxLayout, QMainWindow, QMessageBox
sys.path.append('./src')
from ChildProcessControl3 import ChildProcessControl3
class Form(QDialog):
on_text_converted = Signal(str)
def __init__(self, parent=None):
super().__init__(parent)
child_con = ChildProcessControl3(self, enable_internal_logging=True)
child_con.mp_finished.connect(self.updateUI)
self.browser = QTextBrowser()
self.lineedit = QLineEdit('Type text and press <Enter>')
self.lineedit.selectAll()
layout = QVBoxLayout()
layout.addWidget(self.browser)
layout.addWidget(self.lineedit)
self.setLayout(layout)
self.lineedit.setFocus()
self.setWindowTitle('Upper')
# self.lineedit.returnPressed.connect(lambda: child_con.call_without_mp(1, 2, c=3))
self.lineedit.returnPressed.connect(lambda: child_con.test_call(1))
def updateUI(self, text):
print("updateUI: ", text)
self.browser.append("->" + str(text))
def closeEvent(self, arg__1):
self.destroyed.emit()
print("Form destroyed.")
if __name__ == '__main__':
try:
app = QApplication(sys.argv)
form = Form()
form.show()
app.exec()
except KeyboardInterrupt:
print("KeyboardInterrupt")
sys.exit(0)
......@@ -9,12 +9,13 @@ class CCommandRecord:
self.signal_name: str = None
def _register_signal(self, signal_name: str):
def register_signal(self, signal_name: str):
self.signal_name: str = signal_name
def execute(self, class_object: CProcess):
class_object._internal_logger.info(f"Executing {self} in {class_object.name}.\n")
getattr(class_object, self.func_name)(*self.args, **self.kwargs)
if hasattr(class_object, '_internal_logger'):
class_object._internal_logger.info(f"Executing {self} in {class_object.name}.")
getattr(class_object, self.func_name)(signal_name=self.signal_name, *self.args, **self.kwargs)
def __repr__(self):
args_str = ', '.join(map(repr, self.args))
......
......@@ -70,23 +70,13 @@ class CProcess(Process):
continue
if isinstance(cmd, cmp.CCommandRecord):
self._internal_logger.info(f"Received cmd: {cmd}")
# print(f"Received cmd with args: {cmd_with_args}")
# cmd = cmd_with_args['func']
# args = cmd_with_args['args']
# kwargs = cmd_with_args['kwargs']
# if 'sig_name' is not None:
# #sig_name = cmd_with_args['sig_name']
self._internal_logger.debug(
f"cmd: {cmd}, args: {cmd.args}, kwargs: {cmd.kwargs}, sig_name: {cmd.signal_name}")
f"cmd: {cmd}, args: {cmd.args}, kwargs: {cmd.kwargs}, Signal to emit: {cmd.signal_name}")
try:
cmd.execute(self)
except Exception as e:
traceback_str = ''.join(traceback.format_tb(e.__traceback__))
self._internal_logger.error(f"Exception '{e}' occurred in {cmd}!. Traceback:\n{traceback_str}")
# else:
# self._internal_logger.debug(f"cmd: {cmd}, args: {cmd.args}, kwargs: {cmd.kwargs}")
# self._internal_logger.info(f"Executing {cmd} in Process Class.\n")
# getattr(self, cmd.func_name)(*cmd.args, **cmd.kwargs)
self._internal_logger.error(f"Control Process exited. Terminating Process {os.getpid()}")
if self._kill_flag.value == 0:
self._internal_logger.error(f"Process {os.getpid()} received kill signal!")
......@@ -106,21 +96,21 @@ class CProcess(Process):
@staticmethod
def register_for_signal(postfix='_finished'):
_postfix = postfix.strip() if postfix is not None else None
def register(func):
def get_signature(self, *args, **kwargs):
res = func(self, *args, **kwargs)
# arguments = locals().copy()
# print(f"arguments: {arguments}")
# if 'sig_name' in arguments:
if 'sig_name' in kwargs and kwargs['sig_name'] is None:
sign = kwargs['sig_name']
if 'signal_name' in kwargs and kwargs['signal_name'] is not None:
sign = kwargs.pop('signal_name')
elif _postfix is not None:
sign = f"{func.__name__}{_postfix}"
self._internal_logger.debug(f"Constructing signal name for function '{func.__name__}': {sign}")
else:
sign = f"{func.__name__}{postfix}"
raise ValueError(f"Cannot register function '{func.__name__}' for signal. No signal name provided!")
res = func(self, *args, **kwargs)
self._internal_logger.debug(f"{func.__name__} finished. Emitting signal {sign} in control class.")
result = cmp.CResultRecord(sign, res)
self.state_queue.put(result)
return res
return get_signature
return register
import logging
import logging.handlers
import os
import re
import signal
import time
from multiprocessing import Queue, Process, Value
from PySide6.QtCore import QObject, QThreadPool
from PySide6.QtCore import QObject, QThreadPool, Signal
from PySide6.QtGui import QWindow
from PySide6.QtWidgets import QWidget
from rich.logging import RichHandler
......@@ -31,7 +32,6 @@ class CProcessControl(QObject):
else:
self.register_signal_class(self)
# The child process
self._child: Process = None
# Queues for data exchange
......@@ -45,7 +45,8 @@ class CProcessControl(QObject):
self._child_process_pid = None
self._child_kill_flag = Value('i', 1)
self._internal_logger, self._il_handler = self.create_new_logger(f"{self.__class__.__name__}-Int({os.getpid()})")
self._internal_logger, self._il_handler = self.create_new_logger(
f"{self.__class__.__name__}-Int({os.getpid()})")
self.logger, self.logger_handler = self.create_new_logger(f"{self.__class__.__name__}({os.getpid()})")
self.enable_internal_logging(enable_internal_logging)
......@@ -109,25 +110,51 @@ class CProcessControl(QObject):
self.cmd_queue.close()
@staticmethod
def register_function(signal_name: str = None):
def register_function(signal: Signal = None):
"""
Decorator for registering functions in the command queue.
This automatically puts the command into the queue and executes the function.
If a signal_name is specified, the given Signal name will be emitted after the function has been executed.
:param signal:
:return:
"""
def register(func):
def match_signal_name(_signal: Signal):
pattern = re.compile(r'(\w+)\(([^)]*)\)')
match = pattern.match(str(_signal))
name = match.group(1).strip()
args = match.group(2).split(',')
return name, args
def get_signature(self: CProcessControl, *args, **kwargs):
arguments = locals().copy()
arguments.pop("func")
name = getattr(func, '__name__', 'unknown')
args = arguments.pop("args")
kwargs = arguments.pop("kwargs")
name = getattr(func, '__name__', 'unknown')
cmd = cmp.CCommandRecord(name, *args, **kwargs)
if signal is not None:
sig_name, args = match_signal_name(signal)
cmd.register_signal(sig_name)
else:
func(self, *args, **kwargs)
try:
self._internal_logger.debug(f"New function registered: {cmd} -> "
f"{cmd.signal_name if cmd.signal_name is not None else 'None'}("
f"{', '.join(a for a in args) if args is not None else 'None'})")
self.cmd_queue.put(cmd)
except Exception as e:
self._internal_logger.error(f"Error while putting {cmd} into cmd_queue: {e}")
raise e
return get_signature
return register
def safe_exit(self, reason: str = ""):
self._internal_logger.warning(f"Shutting down ProcessControl {os.getpid()}. Reason: {reason}")
self._child_kill_flag.value = 0
......
import os
import time
from PySide6.QtCore import Signal
import cmp
import unittest
class ChildProcessCustomSignals(cmp.CProcess):
def __init__(self, state_queue, cmd_queue, enable_internal_logging):
super().__init__(state_queue, cmd_queue, enable_internal_logging=enable_internal_logging)
self.logger = None
def postrun_init(self):
self.logger, self.logger_h = self.create_new_logger(f"{self.__class__.__name__}-({os.getpid()})")
@cmp.CProcess.register_for_signal()
def call_without_mp(self, a, b, c=None, **kwargs):
self.logger.info(f"{os.getpid()} -> call_without_mp with {a}, {b}, {c} and {kwargs}!")
time.sleep(1)
return c
class ChildProcessControlCustomSignals(cmp.CProcessControl):
testSignals = Signal(int)
# call_without_mp2_changed = Signal(int, int, int)
def __init__(self, parent, signal_class, enable_internal_logging):
super().__init__(parent, signal_class, enable_internal_logging=enable_internal_logging)
self.register_child_process(ChildProcessCustomSignals(self.state_queue, self.cmd_queue,
enable_internal_logging=enable_internal_logging))
@cmp.CProcessControl.register_function()
def call_without_mp(self, a, b, c=None):
pass
class TestInterchangeCommands(unittest.TestCase):
def test_custom_signals(self):
process_control = ChildProcessControlCustomSignals()
if __name__ == '__main__':
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment