Skip to content
Snippets Groups Projects
Commit 3388be6f authored by Christoph Schmidt's avatar Christoph Schmidt
Browse files

Initial Commit

parent ab616dea
Branches
Tags
No related merge requests found
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# AWS User-specific
.idea/**/aws.xml
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
# Gradle
.idea/**/gradle.xml
.idea/**/libraries
# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/artifacts
# .idea/compiler.xml
# .idea/jarRepositories.xml
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
# CMake
cmake-build-*/
# Mongo Explorer plugin
.idea/**/mongoSettings.xml
# File-based project format
*.iws
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Cursive Clojure plugin
.idea/replstate.xml
# SonarLint plugin
.idea/sonarlint/
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# Editor-based Rest Client
.idea/httpRequests
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser
#======================================
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
\ No newline at end of file
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# AWS User-specific
.idea/**/aws.xml
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
# Gradle
.idea/**/gradle.xml
.idea/**/libraries
# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/artifacts
# .idea/compiler.xml
# .idea/jarRepositories.xml
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
# CMake
cmake-build-*/
# Mongo Explorer plugin
.idea/**/mongoSettings.xml
# File-based project format
*.iws
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Cursive Clojure plugin
.idea/replstate.xml
# SonarLint plugin
.idea/sonarlint/
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# Editor-based Rest Client
.idea/httpRequests
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser
LICENSE 0 → 100644
This diff is collapsed.
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
\ No newline at end of file
# cmp # PySide Multiprocessing
...@@ -15,14 +15,14 @@ Already a pro? Just edit this README.md and make it your own. Want to make it ea ...@@ -15,14 +15,14 @@ Already a pro? Just edit this README.md and make it your own. Want to make it ea
``` ```
cd existing_repo cd existing_repo
git remote add origin https://gitlab.tugraz.at/flexsensor-public/modules/cmp.git git remote add origin https://gitlab.tugraz.at/flexsensor/pyside-multiprocessing.git
git branch -M main git branch -M main
git push -uf origin main git push -uf origin main
``` ```
## Integrate with your tools ## Integrate with your tools
- [ ] [Set up project integrations](https://gitlab.tugraz.at/flexsensor-public/modules/cmp/-/settings/integrations) - [ ] [Set up project integrations](https://gitlab.tugraz.at/flexsensor/pyside-multiprocessing/-/settings/integrations)
## Collaborate with your team ## Collaborate with your team
......
# -*- coding: utf-8 -*-
"""
Author(s): Christoph Schmidt <christoph.schmidt@tugraz.at>
Created: 2023-10-19 12:35
Package Version:
"""
import signal
import sys
from multiprocessing import Process, Queue, Pipe
from threading import Thread
from PySide6.QtCore import QObject, Signal, SIGNAL
from PySide6.QtWidgets import QDialog, QApplication, QTextBrowser, QLineEdit, QVBoxLayout, QMainWindow, QMessageBox
from mp_process import ChildProc, ChildControl
class Form(QDialog):
on_text_converted = Signal(str)
def __init__(self, parent=None):
super().__init__(parent)
child_con = ChildControl(self, enable_internal_logging=True)
child_con.call_without_mp_finished.connect(self.updateUI)
child_con.call_without_mp2_changed.connect(self.updateUI2)
self.browser = QTextBrowser()
self.lineedit = QLineEdit('Type text and press <Enter>')
self.lineedit.selectAll()
layout = QVBoxLayout()
layout.addWidget(self.browser)
layout.addWidget(self.lineedit)
self.setLayout(layout)
self.lineedit.setFocus()
self.setWindowTitle('Upper')
#self.lineedit.returnPressed.connect(lambda: child_con.call_without_mp(1, 2, c=3))
self.lineedit.returnPressed.connect(lambda: child_con.call_all())
#self.emitter.register(self.on_text_converted, self.updateUI)
def test(self):
#Signal(str)
self.data_to_child.put(self.to_child.__name__)
#self.lineedit.clear()
def updateUI(self, text):
print("updateUI: ", text)
self.browser.append(str(text))
def updateUI2(self, text, text2, text3):
print("updateUI2: ", text)
self.browser.append("->" + str(text) + "+" + str(text2) + "=" + str(text3))
def closeEvent(self, event):
print("closeEvent")
#try:
self.destroyed.emit()
#except KeyboardInterrupt:
# print("KeyboardInterrupt")
#event.ignore()
if __name__ == '__main__':
try:
app = QApplication(sys.argv)
form = Form()
form.show()
app.exec()
except KeyboardInterrupt:
print("KeyboardInterrupt")
sys.exit(0)
# -*- coding: utf-8 -*-
"""
Author(s): Christoph Schmidt <christoph.schmidt@tugraz.at>
Created: 2023-10-19 12:35
Package Version:
"""
import os
import sys
import time
from PySide6.QtCore import Signal
sys.path.append('./src')
import cmp
class Sceleton:
def call_without_mp(self, a, b, c=None, **kwargs):
raise NotImplementedError()
class ChildProc(cmp.CProcess, Sceleton):
def __init__(self, state_queue, cmd_queue, enable_interal_logging):
super().__init__(state_queue, cmd_queue, enable_interal_logging=enable_interal_logging)
@cmp.CProcess.register_for_signal()
def call_without_mp(self, a, b, c=None, **kwargs):
print(f"{os.getpid()} -> call_without_mp with {a}, {b}, {c} and {kwargs}!")
time.sleep(1)
return c
@cmp.CProcess.register_for_signal('_changed')
def call_without_mp2(self, a, b, c=None, **kwargs):
print(f"{os.getpid()} -> call_without_mp2 with {a}, {b}, {c} and {kwargs}!")
time.sleep(1)
return b, c, b+c
#@CProccess.register_function
def call_all(self, *args, **kwargs):
self.call_without_mp(1, 2, c=3)
self.call_without_mp2(4, 7, c=5)
class ChildControl(cmp.CProcessControl, Sceleton):
call_without_mp_finished = Signal(int)
call_without_mp2_changed = Signal(int, int, int)
def __init__(self, parent, enable_internal_logging):
super().__init__(parent, enable_internal_logging=enable_internal_logging)
self.register_child_process(ChildProc(self.state_queue, self.cmd_queue, enable_interal_logging=enable_internal_logging))
@cmp.CProcessControl.register_function()
def call_without_mp(self, a, b, c=None):
pass
#print(f"{os.getpid()} -> call_without_mp with {a}, {b}, {c}!")
@cmp.CProcessControl.register_function()
def call_without_mp2(self, a, b, c=None, **kwargs):
pass
@cmp.CProcessControl.register_function()
def call_all(self):
pass
#print(f"{os.getpid()} -> Executing call_all in Control Class.")
import os
import time
import cmp
class ChildProcess2(cmp.CProcess):
def __init__(self, state_queue, cmd_queue, enable_interal_logging):
super().__init__(state_queue, cmd_queue, enable_interal_logging=enable_interal_logging)
self.logger = None
def postrun_init(self):
self.logger, self.logger_h = self.create_new_logger(f"{self.__class__.__name__}-({os.getpid()})")
@cmp.CProcess.register_for_signal()
def call_without_mp(self, a, b, c=None, **kwargs):
self.logger.info(f"{os.getpid()} -> call_without_mp with {a}, {b}, {c} and {kwargs}!")
time.sleep(1)
return c
@cmp.CProcess.register_for_signal('_changed')
def call_without_mp2(self, a, b, c=None, **kwargs):
print(f"{os.getpid()} -> call_without_mp2 with {a}, {b}, {c} and {kwargs}!")
time.sleep(1)
return b, c, b+c
#@CProccess.register_function
def call_all(self, *args, **kwargs):
self.call_without_mp(1, 2, c=3)
self.call_without_mp2(4, 7, c=5)
from PySide6.QtCore import Signal
import cmp
from ChildProcess2 import ChildProcess2
class ChildProcessControl2(cmp.CProcessControl):
call_without_mp_finished = Signal(int)
#call_without_mp2_changed = Signal(int, int, int)
def __init__(self, parent, signal_class, enable_internal_logging):
super().__init__(parent, signal_class, enable_internal_logging=enable_internal_logging)
self.register_child_process(ChildProcess2(self.state_queue, self.cmd_queue, enable_interal_logging=enable_internal_logging))
@cmp.CProcessControl.register_function()
def call_without_mp(self, a, b, c=None):
pass
#print(f"{os.getpid()} -> call_without_mp with {a}, {b}, {c}!")
@cmp.CProcessControl.register_function()
def call_without_mp2(self, a, b, c=None, **kwargs):
pass
@cmp.CProcessControl.register_function()
def call_all(self):
pass
#print(f"{os.getpid()} -> Executing call_all in Control Class.")
# -*- coding: utf-8 -*-
"""
Author(s): Christoph Schmidt <christoph.schmidt@tugraz.at>
Created: 2023-10-19 12:35
Package Version:
"""
import signal
import sys
from multiprocessing import Process, Queue, Pipe
from threading import Thread
from PySide6.QtCore import QObject, Signal, SIGNAL
from PySide6.QtWidgets import QDialog, QApplication, QTextBrowser, QLineEdit, QVBoxLayout, QMainWindow, QMessageBox
sys.path.append('./src')
from ChildProcessControl2 import ChildProcessControl2
class MySignals(QObject):
def __init__(self, parent=None):
super().__init__(parent)
call_without_mp2_changed = Signal(int, int, int)
class Form(QDialog):
on_text_converted = Signal(str)
def __init__(self, parent=None):
super().__init__(parent)
mysignals = MySignals()
child_con = ChildProcessControl2(self, mysignals, enable_internal_logging=False)
mysignals.call_without_mp2_changed.connect(self.updateUI)
self.browser = QTextBrowser()
self.lineedit = QLineEdit('Type text and press <Enter>')
self.lineedit.selectAll()
layout = QVBoxLayout()
layout.addWidget(self.browser)
layout.addWidget(self.lineedit)
self.setLayout(layout)
self.lineedit.setFocus()
self.setWindowTitle('Upper')
# self.lineedit.returnPressed.connect(lambda: child_con.call_without_mp(1, 2, c=3))
self.lineedit.returnPressed.connect(lambda: child_con.call_all())
def updateUI(self, text, text2, text3):
print("updateUI: ", text)
self.browser.append("->" + str(text) + "+" + str(text2) + "=" + str(text3))
if __name__ == '__main__':
try:
app = QApplication(sys.argv)
form = Form()
form.show()
app.exec()
except KeyboardInterrupt:
print("KeyboardInterrupt")
sys.exit(0)
[project]
name = "cmp"
version = "0.0.1"
authors = [
{ name="Christoph Schmidt", email="cschmidt.fs@gmail.com" },
]
description = "A Pyside-to-Process Controll class"
readme = "README.md"
requires-python = ">=3.7"
dependencies = [
'pyside6', 'pyyaml'
]
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: GNU General Public License v3.0",
"Operating System :: OS Independent",
]
[project.urls]
"Homepage" = "https://gitlab.tugraz.at/flexsensor-public/---"
\ No newline at end of file
from cmp import CProcess as CProcess
class CCommandRecord:
def __init__(self, func_name: str, *args: (), **kwargs: {}):
self.func_name: str = func_name
self.args: () = args
self.kwargs: {} = kwargs
self.signal_name: str = None
def _register_signal(self, signal_name: str):
self.signal_name: str = signal_name
def execute(self, class_object: CProcess):
class_object._internal_logger.info(f"Executing {self} in {class_object.name}.\n")
getattr(class_object, self.func_name)(*self.args, **self.kwargs)
def __repr__(self):
args_str = ', '.join(map(repr, self.args))
kwargs_str = ', '.join(f"{key}={repr(value)}" for key, value in self.kwargs.items())
all_args = ', '.join(filter(None, [args_str, kwargs_str]))
return f"Function {self.func_name}({all_args})"
\ No newline at end of file
import logging.handlers
import logging
import os
import time
import traceback
from multiprocessing import Process, Queue, Value
import cmp
class CProcess(Process):
def __init__(self, state_queue: Queue, cmd_queue: Queue, enable_internal_logging=True):
Process.__init__(self)
self._enable_internal_logging = enable_internal_logging
self.logger = None
self.logger_handler = None
self._internal_logger = None
self._il_handler = None
self.cmd_queue = cmd_queue
self.state_queue = state_queue
self._kill_flag = Value('i', 1)
def register_kill_flag(self, kill_flag: Value):
self._kill_flag = kill_flag
def create_new_logger(self, name: str) -> (logging.Logger, logging.Handler):
_handler = logging.handlers.QueueHandler(self.state_queue)
_logger = logging.getLogger(name)
_logger.handlers.clear()
_logger.handlers = [_handler]
_logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(message)s')
_handler.setFormatter(formatter)
return _logger, _handler
def enable_internal_logging(self, enable: bool):
self._enable_internal_logging = enable
if self._internal_logger is not None and enable is False:
self._internal_logger.disabled = True
#self._internal_logger.handlers = []
elif self._internal_logger is not None and enable is True:
self._internal_logger.disabled = False
#self._internal_logger.handlers = [self._il_handler]
def postrun_init(self):
"""
Dummy function for initializing e.g. loggers (some handlers are not pickable)
or pther non-pickable objects
"""
pass
def run(self):
self.name = f"{self.name}/{os.getpid()}"
self.postrun_init()
self._internal_logger, self._il_handler = self.create_new_logger(f"{self.__class__.__name__}-Int({os.getpid()})")
self.logger, self.logger_handler = self.create_new_logger(f"{self.__class__.__name__}({os.getpid()})")
self.enable_internal_logging(self._enable_internal_logging)
try:
while self._kill_flag.value:
try:
cmd = self.cmd_queue.get(block=True, timeout=1)
except:
continue
if isinstance(cmd, cmp.CCommandRecord):
self._internal_logger.info(f"Received cmd: {cmd}")
# print(f"Received cmd with args: {cmd_with_args}")
# cmd = cmd_with_args['func']
# args = cmd_with_args['args']
# kwargs = cmd_with_args['kwargs']
# if 'sig_name' is not None:
# #sig_name = cmd_with_args['sig_name']
self._internal_logger.debug(
f"cmd: {cmd}, args: {cmd.args}, kwargs: {cmd.kwargs}, sig_name: {cmd.signal_name}")
try:
cmd.execute(self)
except Exception as e:
traceback_str = ''.join(traceback.format_tb(e.__traceback__))
self._internal_logger.error(f"Exception '{e}' occurred in {cmd}!. Traceback:\n{traceback_str}")
# else:
# self._internal_logger.debug(f"cmd: {cmd}, args: {cmd.args}, kwargs: {cmd.kwargs}")
# self._internal_logger.info(f"Executing {cmd} in Process Class.\n")
# getattr(self, cmd.func_name)(*cmd.args, **cmd.kwargs)
self._internal_logger.error(f"Control Process exited. Terminating Process {os.getpid()}")
except KeyboardInterrupt:
self._internal_logger.warning(f"Received KeyboardInterrupt! Exiting Process {os.getpid()}")
time.sleep(1)
self.close()
except Exception as e:
self._internal_logger.warning(f"Received Exception {e}! Exiting Process {os.getpid()}")
def __del__(self):
# if self._internal_logger is not None:
# self._internal_logger.warning(f"Exiting Process")
self.cmd_queue.close()
self.state_queue.close()
@staticmethod
def register_for_signal(postfix='_finished'):
def register(func):
def get_signature(self, *args, **kwargs):
res = func(self, *args, **kwargs)
# arguments = locals().copy()
# print(f"arguments: {arguments}")
# if 'sig_name' in arguments:
if 'sig_name' in kwargs and kwargs['sig_name'] is None:
sign = kwargs['sig_name']
else:
sign = f"{func.__name__}{postfix}"
result = cmp.CResultRecord(sign, res)
self.state_queue.put(result)
return res
return get_signature
return register
import logging
import logging.handlers
import os
import signal
import time
from multiprocessing import Queue, Process, Value
from PySide6.QtCore import QObject, QThreadPool
from PySide6.QtGui import QWindow
from PySide6.QtWidgets import QWidget
from rich.logging import RichHandler
import cmp
class CProcessControl(QObject):
def __init__(self, kill_child_process_flag: Value,
parent: QObject = None,
signal_class: QObject = None,
enable_internal_logging: bool = False):
super().__init__(parent)
self._kill_child_process_flag = kill_child_process_flag
self._enable_internal_logging = enable_internal_logging
if parent is not None:
parent.destroyed.connect(self.safe_exit)
# Register this class as signal class (all signals will be implemented and emitted from this class)
if signal_class is not None:
self.register_signal_class(signal_class)
else:
self.register_signal_class(self)
# The child process
self._child: Process = None
# Queues for data exchange
self.cmd_queue = Queue()
self.state_queue = Queue()
# Thread manager for monitoring the state queue
self.thread_manager = QThreadPool()
# The child process pid
self._child_process_pid = None
self._child_kill_flag = Value('i', 1)
self._internal_logger, self._il_handler = self.create_new_logger(f"{self.__class__.__name__}-Int({os.getpid()})")
self.logger, self.logger_handler = self.create_new_logger(f"{self.__class__.__name__}({os.getpid()})")
self.enable_internal_logging(enable_internal_logging)
def create_new_logger(self, name: str) -> (logging.Logger, logging.Handler):
qh = RichHandler(rich_tracebacks=True)
_internal_logger = logging.getLogger(name)
_internal_logger.handlers = [qh]
_internal_logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
'%(name)s %(message)s')
qh.setFormatter(formatter)
return _internal_logger, qh
def enable_internal_logging(self, enable: bool):
self._enable_internal_logging = enable
if self._internal_logger is not None:
self._internal_logger.disabled = not enable
def register_signal_class(self, signal_class: QObject):
self._signal_class = signal_class
@property
def child_process_pid(self):
return self._child_process_pid
def register_child_process(self, child: cmp.CProcess):
self._child = child
self._child.register_kill_flag(self._child_kill_flag)
self._child_process_pid = child.pid
self._child.enable_internal_logging(self._enable_internal_logging)
self._child.start()
self.thread_manager.start(self._monitor_result_state)
def _monitor_result_state(self):
self._internal_logger.info("Starting monitor thread.")
try:
while self._child.is_alive():
try:
res = self.state_queue.get(block=True, timeout=1)
except:
continue
if isinstance(res, logging.LogRecord):
try:
self.logger.handle(res)
except Exception as e:
self.logger.warning(f"Error cannot handle log record: {e}")
elif isinstance(res, cmp.CResultRecord):
try:
res.emit_signal(self._signal_class)
self._internal_logger.debug(f"Emitted {res} in {self._signal_class.__class__.__name__}.")
except Exception as e:
self._internal_logger.error(f"Error while emitting {res} in {self.__class__.__name__}: {e}")
except:
print(f"Error in monitor thread")
time.sleep(1)
self._internal_logger.info("Ended monitor thread.")
self.state_queue.close()
self.cmd_queue.close()
@staticmethod
def register_function(signal_name: str = None):
def register(func):
def get_signature(self: CProcessControl, *args, **kwargs):
arguments = locals().copy()
arguments.pop("func")
args = arguments.pop("args")
kwargs = arguments.pop("kwargs")
name = getattr(func, '__name__', 'unknown')
cmd = cmp.CCommandRecord(name, *args, **kwargs)
func(self, *args, **kwargs)
self.cmd_queue.put(cmd)
return get_signature
return register
def safe_exit(self):
self._internal_logger.warning(f"Shutting down ProcessControl {os.getpid()}")
self._child_kill_flag.value = 0
def __del__(self):
self._internal_logger.warning(f"Closing ProcessControl {self.__class__.__name__} with pid {os.getpid()}")
from inspect import Signature, Parameter
from cmp import CProcessControl as CProcessControl
class CResultRecord:
def __init__(self, signal_name: str, result):
self.signal_name: str = signal_name
self.result = result
def emit_signal(self, class_object: CProcessControl):
if hasattr(class_object, '_internal_logger'):
class_object._internal_logger.info(f"Emitting {self} in {class_object.__class__.__name__}.")
emitter = getattr(class_object, self.signal_name).emit
if isinstance(self.result, tuple):
emitter(*self.result)
elif self.result is None:
emitter()
else:
emitter(self.result)
def __repr__(self):
if isinstance(self.result, tuple):
args_str = ', '.join(map(repr, self.result))
else:
args_str = repr(self.result)
# shorten arg_str if too long
if len(args_str) > 100:
args_str = args_str[0:10] + '...' + args_str[-10:]
return f"Signal {self.signal_name}({args_str})"
import logging
import os
import sys
from rich.logging import RichHandler
from .CCommandRecord import CCommandRecord
from .CResultRecord import CResultRecord
from .CProcess import CProcess
from .CProcessControl import CProcessControl
sys.path.append(os.path.join(os.path.dirname(__file__), '../'))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment