Pro.MP
— Multi-Processing API¶
Available since Cerbero Suite 5.2 and Cerbero Engine 2.2.
Overview¶
Our products make use of parallel processing in terms of multi-threading whenever possible, but there are limitations to the capabilities of multi-threading.
Some of the advantages offered by multi-processing are:
Possible process isolation
Increased stability for 3rd party components
Overcoming the Global Interpreter Lock (GIL) in Python
When designing our multi-processing technology, we briefly took into consideration Python’s multiprocessing library, but we discarded the idea, because it wasn’t flexible enough for our intended purposes and we wanted to have an API not limited to Python.
We wanted our API not only to be flexible but also easy to use: when dealing with multi-processing there are challenges which we wanted to solve upfront, so that our users wouldn’t have to worry about them when using our API.
Additionally, since we wanted our multi-processing technology to be fast and stable as well, we built it on top of ZeroMQ, an established, very fast messaging library which can also be used for clustered solutions.
Core Concepts¶
In our API there are managers and workers. The manager (ProManager) is the object assigning tasks to workers (ProWorker). A worker is a separate process launched in the background which awaits instructions from the manager.
We can create as many managers as we want from our process and a manager can have as many workers as permitted by the resources of the system.
The manager can be created from within any thread, but must be accessed from within a single thread. The ProManager.processMessages() method should be called periodically to process certain messages.
The worker processes messages from a dedicated thread and every task assigned to it is guaranteed to be executed in the main thread. That’s very important, because it allows workers to access the user-interface API if needed.
The manager and the worker communicate at regular intervals. When a worker exits, the manager is informed about it. When the manager stops responding to a worker, the worker exits. This behavior guarantees that workers don’t become zombie processes.
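The exit rule for workers can be pictured as a simple time-out check. The following is only a sketch of the idea in plain Python, not the implementation used by the API: the Watchdog class, its timeout value and the injected clock are all hypothetical names introduced for illustration.

```python
import time

class Watchdog:
    """Hypothetical helper: tracks when the peer was last heard from."""

    def __init__(self, timeout, clock=time.monotonic):
        self.timeout = timeout
        self.clock = clock
        self.last_seen = clock()

    def beat(self):
        # Call whenever any message arrives from the peer.
        self.last_seen = self.clock()

    def expired(self):
        # True once the peer has been silent for longer than the time-out.
        return self.clock() - self.last_seen > self.timeout

# Drive the watchdog with a fake clock so the example is deterministic.
now = [0.0]
wd = Watchdog(timeout=3.0, clock=lambda: now[0])
now[0] = 2.0
wd.beat()                       # the manager answered at t = 2
now[0] = 4.0
still_alive = not wd.expired()  # only 2 seconds of silence
now[0] = 6.0
should_exit = wd.expired()      # 4 seconds of silence
```

A worker following this scheme would exit as soon as expired() returns True, which is what prevents orphaned processes.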
The following is a basic code example.
from Pro.MP import *
import time

def main():
    m = ProManager()
    m.startWorker()
    for i in range(3):
        m.processMessages()
        time.sleep(1)
    print("finished!")

main()
This code creates a manager, starts a worker and processes messages for three seconds. It doesn’t do anything apart from keeping the worker alive.
We can build upon the previous code by launching a test message box.
from Pro.MP import *
import time

def main():
    m = ProManager()
    worker_id = m.startWorker()
    m.testMessageBox(worker_id)
    while m.isBusy():
        m.processMessages()
        time.sleep(0.5)
    print("finished!")

main()
The main code finishes as soon as the message box is closed.
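The polling loop used here recurs in most of the examples on this page, so it can be convenient to wrap it in a helper. The sketch below is our own convenience function, not part of Pro.MP: pump_until_idle and its timeout parameter are hypothetical, and it relies only on the isBusy() and processMessages() methods shown in this document. FakeManager exists solely to exercise the helper outside of Cerbero.

```python
import time

def pump_until_idle(manager, interval=0.1, timeout=None):
    """Call processMessages() until isBusy() is False or the time-out elapses.

    Returns True if the manager went idle, False if the time-out was hit.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while manager.isBusy():
        manager.processMessages()
        if deadline is not None and time.monotonic() >= deadline:
            return False
        time.sleep(interval)
    return True

class FakeManager:
    """Stand-in object: reports busy for a fixed number of polls."""

    def __init__(self, busy_polls):
        self.remaining = busy_polls
        self.processed = 0

    def isBusy(self):
        return self.remaining > 0

    def processMessages(self):
        self.processed += 1
        self.remaining -= 1

fake = FakeManager(busy_polls=3)
went_idle = pump_until_idle(fake, interval=0)

# A manager that stays busy is abandoned once the time-out elapses.
stuck = pump_until_idle(FakeManager(busy_polls=1000), interval=0, timeout=0)
```

With a real manager the call would simply be pump_until_idle(m, 0.5) in place of the while loop.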
It is also possible to create multiple workers which all do the same task by using the special id ProWorker_All.
from Pro.MP import *
import time

def main():
    m = ProManager()
    for i in range(3):
        m.startWorker()
    m.testMessageBox(ProWorker_All)
    while m.isBusy():
        m.processMessages()
        time.sleep(0.5)
    print("finished!")

main()
This time the main code finishes when all three message boxes are closed.
Output Redirection¶
Let’s now print something out from one of the workers.
from Pro.MP import *
import time

def main():
    m = ProManager()
    # we must specify this option in order to obtain the output from the workers
    m.setOptions(ProMPOpt_RedirectOutput)
    m.testMessage(m.startWorker())
    for i in range(3):
        m.processMessages()
        time.sleep(1)
    print("finished!")

main()
The output of the code is:
Test message.
finished!
As explained in the code, the ProMPOpt_RedirectOutput option must be set to obtain the output from the workers.
This option takes care of one of the challenges of multi-processing automatically.
Let’s now launch multiple workers with a snippet of Python code to evaluate.
from Pro.MP import *
import time

def main():
    m = ProManager()
    m.setOptions(ProMPOpt_RedirectOutput)
    for i in range(5):
        m.startWorker()
    m.evalPythonCode(ProWorker_All, "print('remote script')")
    while m.isBusy():
        m.processMessages()
        time.sleep(0.5)
    print("finished!")

main()
The output is a bit confusing:
remote scriptremote script
remote scriptremote scriptremote script
finished!
The reason for this is that the print function of Python internally writes the string and the new-line separately. Since in our case the execution is parallel, the strings and new-lines get mixed up.
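The two separate writes can be observed in plain Python, without any multi-processing involved. The RecordingStream class below is a hypothetical helper written for this illustration; it only records what print passes to write().

```python
class RecordingStream:
    """Minimal file-like object that records every write() call."""

    def __init__(self):
        self.writes = []

    def write(self, s):
        self.writes.append(s)
        return len(s)

stream = RecordingStream()
print("remote script", file=stream)
# stream.writes now holds the text and the new-line as two separate items
```

When several processes write to the same destination, nothing keeps the two writes of one print call adjacent to each other.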
To remedy this problem we can set the ProMPOpt_AtomicOutput option. This option does nothing more than discard writes of standalone new-lines and append a new-line to every incoming string which doesn’t already end with one.
from Pro.MP import *
import time

def main():
    m = ProManager()
    m.setOptions(ProMPOpt_RedirectOutput | ProMPOpt_AtomicOutput)
    for i in range(5):
        m.startWorker()
    m.evalPythonCode(ProWorker_All, "print('remote script')")
    while m.isBusy():
        m.processMessages()
        time.sleep(0.5)
    print("finished!")

main()
Now the output is what would be expected:
remote script
remote script
remote script
remote script
remote script
finished!
ProMPOpt_AtomicOutput can be used in conjunction with ProMPOpt_RedirectOutput or by itself, since it makes ProMPOpt_RedirectOutput implicit.
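The effect of ProMPOpt_AtomicOutput, as described in this section, can be sketched in a few lines of plain Python. This is only a model of the two stated rules, not the actual implementation; the function name atomic_lines is our own.

```python
def atomic_lines(writes):
    """Apply the two rules to a sequence of incoming writes."""
    out = []
    for s in writes:
        if s == "\n":
            continue      # rule 1: discard standalone new-lines
        if not s.endswith("\n"):
            s += "\n"     # rule 2: terminate every string with a new-line
        out.append(s)
    return out

# Writes from two workers arriving interleaved, as in the confusing output.
interleaved = ["remote script", "remote script", "\n", "\n"]
merged = "".join(atomic_lines(interleaved))
```

Whatever the arrival order of the individual writes, each string ends up on a line of its own.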
We can also execute a Python script on disk:
from Pro.MP import *
import time

def main():
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)
    for i in range(5):
        m.startWorker()
    m.executePythonScript(ProWorker_All, r"path/to/remote.py")
    while m.isBusy():
        m.processMessages()
        time.sleep(0.2)
    print("finished!")

main()
Calling Python Functions¶
If we need to call a function, we can use ProManager.evalPythonFunction() and ProManager.executePythonFunction(). These two methods are the counterparts of ProManager.evalPythonCode() and ProManager.executePythonScript().
from Pro.Core import NTVariantList
from Pro.MP import *
import time

def main():
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)
    m.startWorker()
    code = """
def sum(a, b):
    print(a + b)
"""
    args = NTVariantList()
    args.append(4)
    args.append(5)
    m.evalPythonFunction(ProWorker_All, code, "sum", args)
    for i in range(10):
        m.processMessages()
        time.sleep(0.2)
    print("finished!")

main()
The result of the call is printed, but what if we want to retrieve the result of the remote call in our code?
In that case we can set the last argument of ProManager.evalPythonFunction() to True, which will cause the result of the call to be sent to the manager.
from Pro.Core import NTVariantList
from Pro.MP import *
import time

def main():
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)
    worker_id = m.startWorker()
    code = """
def sum(a, b):
    return a + b
"""
    args = NTVariantList()
    args.append(4)
    args.append(5)
    m.evalPythonFunction(worker_id, code, "sum", args, True)
    while m.isBusy():
        m.processMessages()
        time.sleep(0.1)
    res = m.takeResult(worker_id)
    print("result:", res)
    print("finished!")

main()
Similarly, we can launch multiple workers and collect the results from all of them:
from Pro.Core import NTVariantList
from Pro.MP import *
import time

def main():
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)
    for i in range(10):
        m.startWorker()
    code = """
import random
def genRandom():
    return random.randint(0, 1000)
"""
    m.evalPythonFunction(ProWorker_All, code, "genRandom", NTVariantList(), True)
    while m.isBusy():
        m.processMessages()
        time.sleep(0.1)
    while m.hasResults():
        res = m.takeResult(ProWorker_Any)
        print("result:", res)
    print("finished!")

main()
An example of the (random) output:
result: 4
result: 619
result: 277
result: 141
result: 542
result: 670
result: 541
result: 506
result: 248
result: 803
finished!
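When the results are needed as a list rather than printed one by one, the hasResults()/takeResult() loop can be wrapped in a small helper. The following is a sketch under assumptions: drain_results is our own name, FakeManager is a stand-in used to run the example outside of Cerbero, and the value -1 is only a placeholder for ProWorker_Any, whose real value is not given here.

```python
def drain_results(manager, any_worker):
    """Collect every queued result into a list, emptying the queue."""
    results = []
    while manager.hasResults():
        results.append(manager.takeResult(any_worker))
    return results

class FakeManager:
    """Stand-in object holding a queue of results."""

    def __init__(self, queued):
        self.queued = list(queued)

    def hasResults(self):
        return bool(self.queued)

    def takeResult(self, worker_id, deflt=None):
        # Like the documented method, a taken result is removed from the list.
        return self.queued.pop(0) if self.queued else deflt

fake = FakeManager([4, 619, 277])
collected = drain_results(fake, any_worker=-1)  # -1: placeholder id
```

With a real manager the call would be drain_results(m, ProWorker_Any), made after isBusy() has returned False.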
Custom Messaging¶
We will often want to establish custom communication between the manager and a worker. For this purpose, we can define our own messages and send them.
A ProMPMessage consists of an id and optional data. We can define our own message ids in the range 0 to 0x7FFFFFFF (higher values are reserved for internal purposes).
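Since higher ids are reserved, it can be worth keeping custom ids in named constants and checking them once. The helper below is a hypothetical convenience, not part of Pro.MP; the constant names are ours and the only fact taken from the API is the stated upper bound.

```python
CUSTOM_ID_MAX = 0x7FFFFFFF  # upper bound for user-defined message ids

def is_custom_message_id(msg_id):
    """Return True if msg_id lies in the range available for custom messages."""
    if isinstance(msg_id, bool) or not isinstance(msg_id, int):
        return False
    return 0 <= msg_id <= CUSTOM_ID_MAX

# Hypothetical ids for a request/response pair.
MSG_REQUEST = 1
MSG_RESPONSE = 2
ids_ok = all(is_custom_message_id(i) for i in (MSG_REQUEST, MSG_RESPONSE))
```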
The following snippet of code launches a worker with a snippet of Python code which waits for a request and sends a response. The manager sends a request and waits for a response. If the response is received, it prints out the content as a string.
from Pro.MP import *

def main():
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)
    worker_id = m.startWorker()
    code = """
from Pro.MP import *

w = proWorkerObject()
if w.waitForMessage(1000):
    msg = w.getMessage()
    if msg.id == 1:
        resp = ProMPMessage(2)
        resp.data = b'remote message'
        w.sendMessage(resp)
"""
    m.evalPythonCode(worker_id, code)
    req = ProMPMessage(1)
    m.sendMessage(worker_id, req)
    if m.waitForMessage(worker_id, 1000):
        msg = m.getMessage(worker_id)
        if msg.id == 2:
            print(msg.data.decode("utf-8"))
        else:
            print("unknown message:", msg.id)
    else:
        print("no message")
    print("finished!")

main()
The output is:
remote message
finished!
Multi-level Workers¶
As already mentioned, a single process can create multiple managers. That’s true even for worker processes.
Let’s take into consideration the following snippet which must be launched from the command-line using the ‘-r’ argument:
from Pro.MP import *
import time

if proWorkerProcessLevel() < 5:
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)
    m.startWorker()
    m.executePythonScript(ProWorker_Any, __file__)
    while m.isBusy():
        m.processMessages()
        time.sleep(0.2)
    m = None
else:
    # last worker
    print("Hello, world!")
proWorkerProcessLevel() returns the level of the worker process. The first process we launch has a level of 0, which means it’s the manager or, in this case, the root manager.
As long as proWorkerProcessLevel() is less than 5, the code creates a manager, starts a worker and tells the worker to run the same script. The last worker (level 5) prints out a message.
The output of the root process is:
Hello, world!
The reason is that the output is forwarded from each worker to its manager until it reaches the root manager.
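The chain of processes can be modelled with ordinary recursion, where each call stands in for one process. This is purely an illustration in plain Python: run_level is a hypothetical function, and no processes are actually started.

```python
def run_level(level, max_level, visited):
    """Return the output lines this level hands to its parent."""
    visited.append(level)
    if level < max_level:
        # A manager at this level starts one worker at level + 1 and
        # relays whatever that worker printed.
        return run_level(level + 1, max_level, visited)
    # Last worker: the only one producing output.
    return ["Hello, world!"]

visited = []
root_output = run_level(0, 5, visited)
```

Six levels are visited (0 through 5), but only one line of output reaches the root.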
Also important to notice is the following line in the script:
m = None
Since the code is not in a function, we don’t want to leave a reference to the manager: otherwise the root process may not terminate, and neither would its workers.
Remote Containers¶
There might be occasions in which a manager wants to share a container with a worker. The API to accomplish this is very simple: all the manager has to do is to share the container using ProManager.shareContainer() or ProManager.shareWritableContainer() and the worker can access the container using ProWorker.getSharedContainer().
Hint
Sharing containers with workers is a very inexpensive operation in terms of resources. Therefore, sharing many containers is not an issue.
In the following example a 10 MB container is created with a signature appended at the end. The signature is then searched for both locally and remotely, and both operations are timed.
from Pro.Core import NTContainer, MB_SIZE, NTTime
from Pro.MP import *
import time

remote_code = r"""
from Pro.Core import NTTime
from Pro.MP import *

def main():
    c = proWorkerObject().getSharedContainer('NAME')
    # remote search
    magic = b'\xAA\xBB\xCC\xDD'
    t = NTTime()
    t.start()
    match = c.findFirst(magic)
    print('remote search (ms): ' + str(t.elapsed()))

main()
"""

def main():
    magic = b"\xAA\xBB\xCC\xDD"
    buf = b"\xFF" * 10 * MB_SIZE + magic
    c = NTContainer()
    c.setData(buf)
    # local search
    t = NTTime()
    t.start()
    match = c.findFirst(magic)
    print("local search (ms):", t.elapsed())
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)
    m.shareContainer("NAME", c)
    worker_id = m.startWorker()
    m.evalPythonCode(worker_id, remote_code)
    while m.isBusy():
        m.processMessages()
        time.sleep(0.1)

main()
The output is:
local search (ms): 17
remote search (ms): 351
The reason for the time difference is, of course, that accessing the remote data is comparatively slower. This factor needs to be taken into consideration when working with remote containers.
Important
Remote containers shared using ProManager.shareContainer() are read-only. This is for security reasons, as it wouldn’t be safe to allow other processes to change the original container. To allow workers to write into a remote container, it must have been shared using ProManager.shareWritableContainer().
In the next example the code asks the user to choose a Windows executable (PE), opens it and shares the container. The import table of the PE is then parsed from the worker process.
from Pro.Core import *
from Pro.UI import *
from Pro.MP import *
import time

remote_code = r'''
from Pro.Core import *
from Pro.MP import *
from Pro.PE import *

def main():
    c = proWorkerObject().getSharedContainer("PE")
    # print imported modules
    obj = PEObject()
    if not obj.Load(c):
        print("error: couldn't load file")
        return
    imp = obj.ImportDirectory()
    it = CFFStructIt(imp)
    while it.hasNext():
        cur = it.next()
        name_rva = cur.Uns("Name")
        name_offs = obj.RvaToOffset(name_rva)
        if name_offs != INVALID_STREAM_OFFSET:
            name = obj.ReadUInt8String(name_offs, 1000)[0]
            name = name.decode("utf-8", errors="ignore")
            print("imported module: " + name)

main()
'''

def main():
    fname = proContext().getOpenFileName("Select Windows executable...", str(), "Executable files (*.exe)")
    if not fname:
        return
    c = createContainerFromFile(fname)
    if c.isNull():
        return
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)
    m.shareContainer("PE", c)
    worker_id = m.startWorker()
    m.evalPythonCode(worker_id, remote_code)
    while m.isBusy():
        m.processMessages()
        time.sleep(0.1)

main()
An example of output is:
imported module: KERNEL32.dll
imported module: SHLWAPI.dll
In the following example the shared container is shown in a hex view from the worker process.
from Pro.Core import *
from Pro.UI import *
from Pro.MP import *
import time

remote_code = r'''
from Pro.Core import *
from Pro.UI import *
from Pro.MP import *

def main():
    c = proWorkerObject().getSharedContainer("DATA")
    ctx = proContext()
    hv = ctx.createView(ProView.Type_Hex, "Remote Container Data")
    hv.setData(c)
    dlg = ctx.createDialog(hv)
    dlg.show()

main()
'''

def main():
    fname = proContext().getOpenFileName("Select a file...")
    if not fname:
        return
    c = createContainerFromFile(fname)
    if c.isNull():
        return
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)
    m.shareContainer("DATA", c)
    worker_id = m.startWorker()
    m.evalPythonCode(worker_id, remote_code)
    while m.isBusy():
        m.processMessages()
        time.sleep(0.1)

main()
In this case, the code blocks the manager application. This can be avoided by displaying a wait dialog and by processing UI events, which brings us to our next section.
Wait Objects¶
Managers and workers support wait objects. A wait object can be a wait dialog box or any other type of wait object.
Let’s take this basic code snippet which runs in a single process. The function doSomething performs a task until it finishes or until the user aborts the operation from the wait dialog.
from Pro.UI import *

def doSomething(wo):
    import time
    i = 1
    while not wo.wasAborted() and i < 101:
        time.sleep(0.05)
        wo.msg("Completed: " + str(i) + "%")
        wo.progress(i)
        wo.processEvents()
        i += 1

def main():
    wait = proContext().startWait("Doing something...")
    doSomething(wait)
    wait.stop()

main()
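Because doSomething only relies on four methods of the wait object, it can be exercised without a user interface by passing in a stand-in object. The sketch below is an assumption-laden illustration: ScriptedWait is a hypothetical class of our own, not a Pro.UI or Pro.Core type, and doSomething is repeated here with an added delay parameter so that the example runs instantly.

```python
import time

class ScriptedWait:
    """Headless stand-in exposing the methods doSomething() calls."""

    def __init__(self, abort_after=None):
        # Report an abort once this many progress updates were received.
        self.abort_after = abort_after
        self.values = []
        self.messages = []

    def wasAborted(self):
        return self.abort_after is not None and len(self.values) >= self.abort_after

    def msg(self, text):
        self.messages.append(text)

    def progress(self, value):
        self.values.append(value)

    def processEvents(self):
        pass  # no UI events to service

def doSomething(wo, delay=0.0):
    i = 1
    while not wo.wasAborted() and i < 101:
        time.sleep(delay)
        wo.msg("Completed: " + str(i) + "%")
        wo.progress(i)
        wo.processEvents()
        i += 1

full = ScriptedWait()
doSomething(full)            # runs to completion

cancelled = ScriptedWait(abort_after=10)
doSomething(cancelled)       # stops after ten updates
```

The same function body works with a real wait dialog, since the loop only talks to the object through wasAborted(), msg(), progress() and processEvents().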
Let’s now write the same sample using multi-processing. This time doSomething is executed in a different process.
from Pro.Core import NTVariantList
from Pro.MP import *
from Pro.UI import *
import time

remote_code = """
def doSomething(wo):
    import time
    i = 1
    while not wo.wasAborted() and i < 101:
        time.sleep(0.05)
        wo.msg('Completed: ' + str(i) + '%')
        wo.progress(i)
        i += 1

def stub():
    from Pro.MP import proWorkerObject
    doSomething(proWorkerObject().waitObject())
"""

def main():
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)
    worker_id = m.startWorker()
    ui_wait = proContext().startWait("Doing something...")
    wait = m.createWaitObject(worker_id, ui_wait)
    m.evalPythonFunction(worker_id, remote_code, "stub", NTVariantList())
    while m.isBusy():
        m.processMessages()
        time.sleep(0.02)
        wait.processEvents()
    wait.stop()

main()
The code of doSomething remained the same. We only removed the call to Pro.Core.NTIWait.processEvents(), as we no longer needed to process UI events, but we could have left it there, since it wouldn’t have had any effect.
The important thing to remember is that both wait objects must remain referenced as long as we need them, since ProManager.createWaitObject() doesn’t add a reference to ui_wait.
User Interface¶
We can now further expand our use of managers and workers to the context of a user interface. Let’s say we want to keep an interface responsive while also performing some CPU-intensive operation.
An obvious solution is to launch a worker to do the heavy lifting for us and just wait for a response. The way we process messages in a UI context is to start idle processing using Pro.UI.ProCustomView.startIdleNotifications() on a custom view. This will enable the custom view to receive Pro.UI.pvnIdle notifications, which in turn can be used to call ProManager.processMessages() at fixed intervals.
The following code sample creates a custom view with a text control and inserts the text of every incoming message from the worker into the text control.
from Pro.UI import *
from Pro.MP import *

def MPViewCallback(cv, m, code, view, data):
    if code == pvnInit:
        cv.startIdleNotifications()
        return 1
    elif code == pvnIdle:
        m.processMessages()
        if m.hasMessage(ProWorker_Any):
            text_view = cv.getView(1)
            while True:
                msg = m.getMessage(ProWorker_Any)
                if msg.id == 1:
                    text_view.setSelectedText(msg.data.decode("utf-8"))
                if not m.hasMessage(ProWorker_Any):
                    break
    return 0

def main():
    ctx = proContext()
    v = ctx.createView(ProView.Type_Custom, "MP View")
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)
    worker_id = m.startWorker()
    code = """
from Pro.MP import *
import time

w = proWorkerObject()
wo = w.waitObject()
msg = ProMPMessage(1)
i = 0
while not wo.wasAborted():
    msg.data = b'remote message ' + str(i).encode('utf-8') + b'\\n'
    w.sendMessage(msg)
    time.sleep(1)
    i += 1
"""
    m.evalPythonCode(worker_id, code)
    v.setup("<ui><hs><text id='1'/></hs></ui>", MPViewCallback, m)
    ctx.addView(v)

main()
For the final code example we not only work with the UI, but also with wait objects.
We launch 10 workers. Each worker has a custom wait object which updates a progress bar in our view. The user can abort each worker by clicking on the ‘Stop’ button next to its progress bar.
from Pro.Core import NTSimpleWait
from Pro.UI import *
from Pro.MP import *

remote_code = """
def doSomething(wo):
    import time
    i = 1
    while not wo.wasAborted() and i < 101:
        time.sleep(0.05)
        wo.progress(i)
        i += 1

from Pro.MP import proWorkerObject
doSomething(proWorkerObject().waitObject())
"""

class ProgressWait(NTSimpleWait):

    def __init__(self, ctrl):
        super(ProgressWait, self).__init__()
        self.ctrl = ctrl

    def progress(self, i):
        self.ctrl.setValue(i)

class MPView(object):

    def __init__(self):
        pass

    @staticmethod
    def callback(cv, self, code, view, data):
        if code == pvnInit:
            self.worker_ids = []
            # note: we must keep references to all wait objects
            self.ui_wait_objects = []
            self.mp_wait_object = []
            # create workers
            for i in range(self.worker_count):
                worker_id = self.manager.startWorker()
                self.worker_ids.append(worker_id)
                ui_wo = ProgressWait(self.view.getView(i))
                self.ui_wait_objects.append(ui_wo)
                mp_wo = self.manager.createWaitObject(worker_id, ui_wo)
                self.mp_wait_object.append(mp_wo)
                self.manager.evalPythonCode(worker_id, remote_code)
            cv.startIdleNotifications()
            return 1
        elif code == pvnIdle:
            # process messages
            self.manager.processMessages()
        elif code == pvnButtonClicked:
            view.setEnabled(False)
            worker_id = self.worker_ids[view.id() - 1000]
            self.manager.abortOperation(worker_id, 1000)
        return 0

    @staticmethod
    def create():
        ctx = proContext()
        self = MPView()
        self.worker_count = 10
        # create manager
        self.manager = ProManager()
        self.manager.setOptions(ProMPOpt_AtomicOutput)
        # create view
        self.view = ctx.createView(ProView.Type_Custom, "MP View")
        ui = "<ui><gl margin='20' spacing='20' align='top'>"
        for i in range(self.worker_count):
            ui += "<progbar id='%d'/><btn id='%d' text='Stop'/><nl/>" % (i, i + 1000)
        ui += "</gl></ui>"
        self.view.setup(ui, MPView.callback, self)
        ctx.addView(self.view)

MPView.create()
Module API¶
Pro.MP module API.
Classes:
ProMPMessage(id) – This class represents a multi-processing message.
ProManager – This class represents a manager.
ProManagerWait – This class represents a multi-process wait object.
ProWorker – This class represents a worker.
Attributes:
ProMPMessage_Invalid – Id for an invalid message.
ProMPOpt_AtomicOutput – Improves the redirected output of workers by guaranteeing that each write results in a separate line.
ProMPOpt_RedirectOutput – Redirects the output of the worker to the process of the manager.
ProWorker_All – A special worker id which causes an operation to be performed by all the workers handled by the manager.
ProWorker_Any – A special worker id which causes an operation to be performed by the first available worker.
ProWorker_Invalid – Invalid worker id.
Functions:
Checks whether the current process is the root manager process.
Checks whether the current process is a worker process.
proWorkerObject() – Returns the global worker object if the current process is a worker process; otherwise returns None.
proWorkerProcessLevel() – Returns the worker level of the current process.
Returns the global ZeroMQ context.
- class ProMPMessage(id: Optional[int] = None)¶
This class represents a multi-processing message.
- Parameters
id (Optional[int]) – The id of the message.
See also id.
Methods:
clear() – Clears the fields of the message.
Attributes:
data – The data of the message.
id – The id of the message.
- clear() → None¶
Clears the fields of the message.
- ProMPMessage_Invalid: Final[int]¶
Id for an invalid message.
See also ProMPMessage.id.
- ProMPOpt_AtomicOutput: Final[int]¶
Improves the redirected output of workers by guaranteeing that each write results in a separate line.
This option makes the ProMPOpt_RedirectOutput option implicit.
See also ProMPOpt_RedirectOutput and ProManager.setOptions().
- ProMPOpt_RedirectOutput: Final[int]¶
Redirects the output of the worker to the process of the manager.
See also ProMPOpt_AtomicOutput and ProManager.setOptions().
- class ProManager¶
This class represents a manager.
See also ProWorker.
Methods:
abortOperation(worker_id[, timeout]) – Aborts an operation given a maximum time-out.
createWaitObject(worker_id, wo) – Wraps a wait object into a wait object which operates on the worker.
evalPythonCode(worker_id, script) – Executes Python code.
evalPythonFunction(worker_id, script, …[, …]) – Calls a Python function.
executePythonFunction(worker_id, file_name, …) – Calls a Python function from a file.
executePythonScript(worker_id, file_name) – Executes Python code from a file.
getManagerTimeout() – Returns the manager time-out interval.
getMessage(worker_id) – Retrieves a pending message without blocking.
getOptions() – Returns the options for the workers.
getWorkerFromIncomingMessage(msg) – Retrieves the worker id for an incoming message.
hasMessage(worker_id) – Checks the presence of a message without blocking.
hasResults() – Returns True if results are available; otherwise returns False.
hasSharedContainer(name) – Checks the availability of a shared container.
isBusy() – Returns True if any worker is busy or if there are messages to be processed; otherwise returns False.
isWorkerAlive(worker_id) – Checks whether a worker is alive.
isWorkerBusy(worker_id) – Checks whether a worker is busy.
isWorkerReady(worker_id) – Checks whether a worker is ready to process messages.
processMessages() – Processes pending operations which must be processed from within the thread that created the manager.
sendMessage(worker_id, msg) – Sends a message to a worker.
setManagerTimeout(timeout) – Sets the manager time-out interval.
setOptions(options) – Sets the options for the worker.
shareContainer(name, c) – Shares a container with the workers.
shareWritableContainer(name, c[, max_size]) – Shares a writable container with the workers.
softAbort(worker_id) – Sets the abort status for the wait object of the specified worker.
startWorker() – Starts a worker.
stopAllWorkers() – Stops all the workers.
stopWorker(worker_id) – Stops a worker.
takeResult(worker_id[, deflt]) – Takes a result from the list of available results.
testMessage(worker_id) – Instructs the worker to print a test message.
testMessageBox(worker_id) – Instructs the worker to display a test message box.
unshareContainer(name) – Removes a shared container.
waitForMessage(worker_id, timeout) – Waits for a message.
workersAlive() – Returns the number of workers which are alive.
workersBusy() – Returns the number of workers which are busy.
- abortOperation(worker_id: int, timeout: int = 1000) → None¶
Aborts an operation given a maximum time-out.
If the operation couldn’t be aborted within timeout, the worker is terminated.
- Parameters
worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.
timeout (int) – The time-out in milliseconds.
See also createWaitObject().
- createWaitObject(worker_id: int, wo: Pro.Core.NTIWait) → Pro.MP.ProManagerWait¶
Wraps a wait object into a wait object which operates on the worker.
Important
This method doesn’t increase the reference count of the input wait object. Therefore, a reference must be maintained for both the input wait object and for the returned wait object.
- Parameters
worker_id (int) – The specific worker to which the input wait object is assigned.
wo (NTIWait) – The input wait object.
- Returns
Returns the multi-processing wait object.
- Return type
ProManagerWait
See also Pro.Core.NTIWait and abortOperation().
- evalPythonCode(worker_id: int, script: str) → None¶
Executes Python code.
- Parameters
worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.
script (str) – The Python code.
See also executePythonScript(), evalPythonFunction() and executePythonFunction().
- evalPythonFunction(worker_id: int, script: str, func_name: str, args: Pro.Core.NTVariantList, enqueue_result: bool = False) → None¶
Calls a Python function.
- Parameters
worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.
script (str) – The Python code.
func_name (str) – The name of the function to call.
args (NTVariantList) – The function argument list.
enqueue_result (bool) – If True, transmits the value returned by the function.
See also executePythonFunction(), evalPythonCode() and executePythonScript().
- executePythonFunction(worker_id: int, file_name: str, func_name: str, args: Pro.Core.NTVariantList, enqueue_result: bool = False) → None¶
Calls a Python function from a file.
- Parameters
worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.
file_name (str) – The path of the Python script.
func_name (str) – The name of the function to call.
args (NTVariantList) – The function argument list.
enqueue_result (bool) – If True, transmits the value returned by the function.
See also evalPythonFunction(), executePythonScript() and evalPythonCode().
- executePythonScript(worker_id: int, file_name: str) → None¶
Executes Python code from a file.
- Parameters
worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.
file_name (str) – The path of the Python script.
See also evalPythonCode(), executePythonFunction() and evalPythonFunction().
- getManagerTimeout() → int¶
- Returns
Returns the manager time-out interval.
- Return type
int
See also setManagerTimeout() and processMessages().
- getMessage(worker_id: int) → Pro.MP.ProMPMessage¶
Retrieves a pending message without blocking.
- Parameters
worker_id (int) – The worker for which to retrieve the message. ProWorker_All or ProWorker_Any can be specified as id.
- Returns
Returns a message with a valid id if a message is available; otherwise returns a message with an invalid id.
- Return type
ProMPMessage
See also ProMPMessage, hasMessage(), waitForMessage() and sendMessage().
- getOptions() → int¶
- Returns
Returns the options for the workers.
- Return type
int
See also setOptions().
- getWorkerFromIncomingMessage(msg: Pro.MP.ProMPMessage) → int¶
Retrieves the worker id for an incoming message.
- Parameters
msg (ProMPMessage) – The incoming message.
- Returns
Returns the worker id if successful; otherwise returns ProWorker_Invalid.
- Return type
int
See also getMessage() and ProMPMessage.
- hasMessage(worker_id: int) → bool¶
Checks the presence of a message without blocking.
- Parameters
worker_id (int) – The worker for which to check the presence of a message. ProWorker_All or ProWorker_Any can be specified as id.
- Returns
Returns True if a message is available; otherwise returns False.
- Return type
bool
See also getMessage(), waitForMessage() and sendMessage().
- hasResults() → bool¶
- Returns
Returns True if results are available; otherwise returns False.
- Return type
bool
See also takeResult(), evalPythonFunction() and executePythonFunction().
- hasSharedContainer(name)¶
Checks the availability of a shared container.
- Parameters
name (str) – The public name of the container.
- Returns
Returns True if the shared container is available; otherwise returns False.
- Return type
bool
See also shareContainer() and unshareContainer().
- isBusy() → bool¶
- Returns
Returns True if any worker is busy or if there are messages to be processed; otherwise returns False.
- Return type
bool
See also processMessages() and workersBusy().
- isWorkerAlive(worker_id: int) → bool¶
Checks whether a worker is alive.
- Parameters
worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.
- Returns
Returns True if the worker is alive; otherwise returns False.
- Return type
bool
See also workersAlive() and isWorkerBusy().
- isWorkerBusy(worker_id: int) → bool¶
Checks whether a worker is busy.
- Parameters
worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.
- Returns
Returns True if the worker is busy; otherwise returns False.
- Return type
bool
See also workersBusy() and isWorkerAlive().
- isWorkerReady(worker_id: int) → bool¶
Checks whether a worker is ready to process messages.
- Parameters
worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.
- Returns
Returns True if the worker is ready; otherwise returns False.
- Return type
bool
See also isWorkerAlive() and isWorkerBusy().
- processMessages() → None¶
Processes pending operations which must be processed from within the thread that created the manager.
See also hasMessage(), waitForMessage(), getMessage() and sendMessage().
- sendMessage(worker_id: int, msg: Pro.MP.ProMPMessage) → bool¶
Sends a message to a worker.
- Parameters
worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.
msg (ProMPMessage) – The message to send.
- Returns
Returns True if successful; otherwise returns False.
- Return type
bool
See also ProMPMessage, getMessage(), hasMessage() and waitForMessage().
- setManagerTimeout(timeout: int) → None¶
Sets the manager time-out interval.
Note
The manager time-out interval must be set before starting workers.
- Parameters
timeout (int) – The time-out in milliseconds.
See also getManagerTimeout().
- setOptions(options: int) → None¶
Sets the options for the worker.
Note
The options must be set before starting workers.
- Parameters
options (int) – The options to set.
See also getOptions(), ProMPOpt_RedirectOutput and ProMPOpt_AtomicOutput.
- shareContainer(name, c)¶
Shares a container with the workers.
Hint
Sharing containers with workers is a very inexpensive operation in terms of resources. Therefore, sharing many containers is not an issue.
- Parameters
name (str) – The public name of the container.
c (NTContainer) – The container to share.
See also shareWritableContainer(), unshareContainer(), hasSharedContainer(), ProWorker.getSharedContainer() and Pro.Core.NTContainer.
- shareWritableContainer(name, c[, max_size])¶
Shares a writable container with the workers.
Hint
Sharing containers with workers is a very inexpensive operation in terms of resources. Therefore, sharing many containers is not an issue.
- Parameters
name (str) – The public name of the container.
c (NTContainer) – The container to share.
max_size (Optional[int]) – The maximum size that the container can reach.
Available since Cerbero Suite 6.0 and Cerbero Engine 3.0.
See also shareContainer(), unshareContainer(), hasSharedContainer(), ProWorker.getSharedContainer() and Pro.Core.NTContainer.
- softAbort(worker_id: int) → None¶
Sets the abort status for the wait object of the specified worker.
Available since Cerbero Suite 5.5 and Cerbero Engine 2.5.
- Parameters
worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.
- startWorker() → int¶
Starts a worker.
- Returns
Returns the worker id if successful; otherwise returns ProWorker_Invalid.
- Return type
int
See also
stopWorker() and stopAllWorkers().
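Because startWorker() signals failure through its return value, the id should be checked against ProWorker_Invalid each time. The following is a minimal sketch of starting several workers; the helper name start_workers is hypothetical, and the invalid id is passed in by the caller (as ProWorker_Invalid) so that the sketch does not assume its numeric value.

```python
def start_workers(manager, count, invalid_id):
    """Start up to `count` workers and return the ids that were started.

    Stops at the first failure, which startWorker() reports by
    returning the invalid worker id.
    """
    started = []
    for _ in range(count):
        worker_id = manager.startWorker()
        if worker_id == invalid_id:
            break
        started.append(worker_id)
    return started
```

If fewer ids than requested come back, the caller can decide whether to continue with a smaller pool or to call stopAllWorkers() and give up.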
- stopAllWorkers() → None¶
Stops all the workers.
Note
Internally this method calls stopWorker() with ProWorker_All.
See also
stopWorker() and startWorker().
- stopWorker(worker_id: int) → None¶
Stops a worker.
- Parameters
worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.
See also
startWorker() and stopAllWorkers().
- takeResult(worker_id: int, deflt: Optional[Union[int, float, bool, bytes, str]] = None) → Optional[Union[int, float, bool, bytes, str]]¶
Takes a result from the list of available results.
The returned result is removed from the internal list.
- Parameters
worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.
deflt (BasicType) – The default value in case the result is not available.
- Returns
Returns the result if available; otherwise returns deflt.
- Return type
BasicType
See also
hasResults(), evalPythonFunction() and executePythonFunction().
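With the default deflt of None, a missing result cannot be told apart from a worker that actually returned None. One way around this, sketched below, is to pass a distinctive string as the default. The helper name take_result_checked and the sentinel value are hypothetical; a string is used because deflt is limited to the basic types listed in the signature.

```python
# Hypothetical sentinel; any string a worker would never return will do.
_NO_RESULT = "<no-result-7c1e>"

def take_result_checked(manager, worker_id):
    """Return (True, value) if a result was taken, (False, None) otherwise."""
    value = manager.takeResult(worker_id, _NO_RESULT)
    if isinstance(value, str) and value == _NO_RESULT:
        return (False, None)
    return (True, value)
```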
- testMessage(worker_id: int) → None¶
Instructs the worker to print a test message.
- Parameters
worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.
See also
testMessageBox().
- testMessageBox(worker_id: int) → None¶
Instructs the worker to display a test message box.
- Parameters
worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.
See also
testMessage().
- unshareContainer(name)¶
Removes a shared container.
- Parameters
name (str) – The public name of the container.
See also
shareContainer(), hasSharedContainer(), ProWorker.getSharedContainer() and Pro.Core.NTContainer.
- waitForMessage(worker_id: int, timeout: int) → bool¶
Waits for a message.
- Parameters
worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.
timeout (int) – The time-out in milliseconds for the wait.
- Returns
Returns True if a message is available; otherwise returns False.
- Return type
bool
See also
hasMessage(), getMessage() and sendMessage().
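Since the manager is expected to call processMessages() periodically, a long wait can be split into short slices with a processMessages() call between them. The sketch below shows one way to do that; the helper name wait_for_message and the 100 ms slice are assumptions, not part of the API.

```python
import time

def wait_for_message(manager, worker_id, total_ms, slice_ms=100):
    """Wait up to total_ms for a message, servicing the manager in between.

    Returns True as soon as waitForMessage() reports a message,
    False once the overall deadline has passed.
    """
    deadline = time.monotonic() + total_ms / 1000.0
    while True:
        manager.processMessages()
        remaining_ms = int((deadline - time.monotonic()) * 1000)
        if remaining_ms <= 0:
            return False
        if manager.waitForMessage(worker_id, min(slice_ms, remaining_ms)):
            return True
```

When this returns True, the message can then be retrieved with getMessage().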
- workersAlive() → int¶
- Returns
Returns the number of workers which are alive.
- Return type
int
See also
isWorkerAlive() and workersBusy().
- workersBusy() → int¶
- Returns
Returns the number of workers which are busy.
- Return type
int
See also
isWorkerBusy() and workersAlive().
- class ProManagerWait¶
Bases: Pro.Core.NTIWait
This class represents a multi-process wait object.
See also
ProManager.createWaitObject().
Methods:
setAbortTimeout(timeout) – Sets the time-out for when the operation is aborted.
setUseSoftAbort([b]) – Sets the behavior of Pro.Core.NTIWait.abort().
softAbort() – Sets the abort status for the wait object without calling ProManager.abortOperation().
- setAbortTimeout(timeout: int) → None¶
Sets the time-out for when the operation is aborted.
Note
Internally Pro.Core.NTIWait.abort() calls ProManager.abortOperation() with the specified time-out. The default time-out is 1 second. To avoid a hard abort, use setUseSoftAbort() or softAbort(). When a soft abort is performed, the time-out value is ignored.
- Parameters
timeout (int) – The time-out in milliseconds.
See also
Pro.Core.NTIWait.abort(), ProManager.abortOperation(), setUseSoftAbort() and softAbort().
- setUseSoftAbort(b: bool = True) → None¶
Sets the behavior of Pro.Core.NTIWait.abort().
By default calling Pro.Core.NTIWait.abort() results in a hard abort (see ProManager.abortOperation()).
- Parameters
b (bool) – If True, softAbort() is called when executing Pro.Core.NTIWait.abort().
Available since Cerbero Suite 5.5 and Cerbero Engine 2.5.
See also
Pro.Core.NTIWait.abort(), ProManager.abortOperation(), setAbortTimeout() and softAbort().
- softAbort() → None¶
Sets the abort status for the wait object without calling ProManager.abortOperation().
Available since Cerbero Suite 5.5 and Cerbero Engine 2.5.
See also
Pro.Core.NTIWait.abort(), ProManager.abortOperation(), setAbortTimeout() and setUseSoftAbort().
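The two abort modes described above can be selected in one place when the wait object is set up. This is a minimal sketch: the helper name configure_abort is hypothetical, wait is assumed to be a ProManagerWait instance, and the 1000 ms default mirrors the documented default time-out of 1 second.

```python
def configure_abort(wait, soft=True, hard_timeout_ms=1000):
    """Choose what NTIWait.abort() does on a ProManagerWait object.

    soft=True:  abort() only sets the abort status (the time-out is ignored).
    soft=False: abort() performs a hard abort with the given time-out.
    """
    if soft:
        wait.setUseSoftAbort(True)
    else:
        wait.setUseSoftAbort(False)
        wait.setAbortTimeout(hard_timeout_ms)
```

A soft abort leaves it to the worker to notice the abort status and finish on its own, while a hard abort goes through ProManager.abortOperation().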
- class ProWorker¶
This class represents a worker.
See also
ProManager.
Methods:
getConnectionAddress() – Returns the TCP address of the manager.
getConnectionPort() – Returns the TCP port of the manager.
getManagerTimeout() – Returns the manager time-out interval.
getMessage() – Retrieves a pending message without blocking.
getOptions() – Returns the options for the worker.
getSharedContainer(name) – Obtains a shared container from the manager.
getWorkerId() – Returns the worker id.
hasMessage() – Checks the presence of a message without blocking.
isBusy() – Returns True if the worker is busy; otherwise returns False.
remotePrint(s) – Prints a string in the process of the manager.
sendMessage(msg) – Sends a message to the manager.
sendResult(result) – Sends the result of an operation to the manager.
setManagerTimeout(timeout) – Sets the manager time-out interval.
setOptions(options) – Sets the options of the worker.
setupConnection(address, port) – Sets up the connection to the manager.
waitForMessage(timeout) – Waits for a message.
waitObject() – Returns a wait object which can be used by the worker.
wasStopped() – Returns True if the worker was stopped by the manager; otherwise returns False.
- getConnectionAddress() → str¶
- Returns
Returns the TCP address of the manager.
- Return type
str
See also
getConnectionPort() and setupConnection().
- getConnectionPort() → int¶
- Returns
Returns the TCP port of the manager.
- Return type
int
See also
getConnectionAddress() and setupConnection().
- getManagerTimeout() → int¶
- Returns
Returns the manager time-out interval.
- Return type
int
See also
setManagerTimeout().
- getMessage() → Pro.MP.ProMPMessage¶
Retrieves a pending message without blocking.
- Returns
Returns a message with a valid id if a message is available; otherwise returns a message with an invalid id.
- Return type
ProMPMessage
See also
ProMPMessage, hasMessage(), waitForMessage() and sendMessage().
- getOptions() → int¶
- Returns
Returns the options for the worker.
- Return type
int
See also
setOptions().
- getSharedContainer(name)¶
Obtains a shared container from the manager.
- Parameters
name (str) – The public name of the container.
- Returns
Returns a valid container if successful; otherwise returns an invalid container.
- Return type
NTContainer
See also
ProManager.shareContainer(), ProManager.shareWritableContainer() and Pro.Core.NTContainer.
- getWorkerId() → int¶
- Returns
Returns the worker id.
- Return type
int
- hasMessage() → bool¶
Checks the presence of a message without blocking.
- Returns
Returns True if a message is available; otherwise returns False.
- Return type
bool
See also
getMessage(), waitForMessage() and sendMessage().
- isBusy() → bool¶
- Returns
Returns True if the worker is busy; otherwise returns False.
- Return type
bool
- remotePrint(s: str) → None¶
Prints a string in the process of the manager.
Important
The manager must have allowed output redirection for the worker.
- Parameters
s (str) – The string to print.
See also
ProManager.setOptions(), ProMPOpt_RedirectOutput and ProMPOpt_AtomicOutput.
- sendMessage(msg: Pro.MP.ProMPMessage) → bool¶
Sends a message to the manager.
- Parameters
msg (ProMPMessage) – The message to send.
- Returns
Returns True if successful; otherwise returns False.
- Return type
bool
See also
ProMPMessage, getMessage(), hasMessage() and waitForMessage().
- sendResult(result: Optional[Union[int, float, bool, bytes, str]]) → bool¶
Sends the result of an operation to the manager.
- Parameters
result (BasicType) – The result to send.
- Returns
Returns True if successful; otherwise returns False.
- Return type
bool
Available since Cerbero Suite 5.3 and Cerbero Engine 2.3.
See also
ProManager.hasResults() and ProManager.takeResult().
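The signature restricts results to None, int, float, bool, bytes and str, so it can help to validate a value before handing it to sendResult(). The sketch below is one way to do so; the helper name send_basic_result is hypothetical and the worker is passed in as a parameter.

```python
# The basic types accepted by sendResult(), taken from its signature.
_BASIC_TYPES = (int, float, bool, bytes, str)

def send_basic_result(worker, value):
    """Send `value` to the manager, rejecting unsupported types up front."""
    if value is not None and not isinstance(value, _BASIC_TYPES):
        raise TypeError("unsupported result type: %s" % type(value).__name__)
    return worker.sendResult(value)
```

Structured data such as lists or dictionaries would first have to be serialized by the caller, for example to a str or bytes value.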
- setManagerTimeout(timeout: int) → None¶
Sets the manager time-out interval.
- Parameters
timeout (int) – The time-out in milliseconds.
See also
getManagerTimeout().
- setOptions(options: int) → None¶
Sets the options of the worker.
- Parameters
options (int) – The options.
See also
getOptions().
- setupConnection(address: str, port: int) → None¶
Sets up the connection to the manager.
- Parameters
address (str) – The TCP address of the manager.
port (int) – The TCP port of the manager.
See also
getConnectionAddress() and getConnectionPort().
- waitForMessage(timeout: int) → bool¶
Waits for a message.
- Parameters
timeout (int) – The time-out in milliseconds for the wait.
- Returns
Returns True if a message is available; otherwise returns False.
- Return type
bool
See also
hasMessage(), getMessage() and sendMessage().
- waitObject() → Pro.Core.NTIWait¶
Returns a wait object which can be used by the worker.
Hint
A wait object is returned even if the manager didn’t set up a wait object for the worker. Calling the methods of a wait object which is not mirrored on the manager side has no effect.
- Returns
Returns the wait object.
- Return type
NTIWait
See also
Pro.Core.NTIWait and ProManager.createWaitObject().
- wasStopped() → bool¶
- Returns
Returns True if the worker was stopped by the manager; otherwise returns False.
- Return type
bool
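The worker-side methods waitForMessage(), getMessage() and wasStopped() can be combined into a simple receive loop. The following is only a sketch of that combination: the function name run_message_loop, the handle callback and the 250 ms poll interval are assumptions, and whether a long-running loop is appropriate depends on how the task was dispatched to the worker.

```python
def run_message_loop(worker, handle, poll_ms=250):
    """Handle incoming messages until the manager stops the worker.

    `handle` is a caller-supplied callable receiving each message.
    Returns the number of messages handled.
    """
    handled = 0
    while not worker.wasStopped():
        if not worker.waitForMessage(poll_ms):
            continue  # nothing arrived in this slice; re-check the stop flag
        handle(worker.getMessage())
        handled += 1
    return handled
```

Inside a worker process, the worker object would typically come from proWorkerObject().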
- ProWorker_All: Final[int]¶
A special worker id which causes an operation to be performed by all the workers handled by the manager.
See also
ProWorker_Any.
- ProWorker_Any: Final[int]¶
A special worker id which causes an operation to be performed by the first available worker.
See also
ProWorker_All.
- ProWorker_Invalid: Final[int]¶
Invalid worker id.
See also
ProManager.startWorker().
- proIsManagerProcess() → bool¶
Checks whether the current process is the root manager process.
- Returns
Returns True if proWorkerProcessLevel() returns 0; otherwise returns False.
- Return type
bool
See also
proIsWorkerProcess() and proWorkerProcessLevel().
- proIsWorkerProcess() → bool¶
Checks whether the current process is a worker process.
- Returns
Returns True if proWorkerProcessLevel() returns 1 or greater; otherwise returns False.
- Return type
bool
See also
proIsManagerProcess(), proWorkerProcessLevel() and proWorkerObject().
- proWorkerObject() → Pro.MP.ProWorker¶
- Returns
Returns the global worker object if the current process is a worker process; otherwise returns None.
- Return type
ProWorker
See also
proIsWorkerProcess() and proWorkerProcessLevel().
- proWorkerProcessLevel() → int¶
- Returns
Returns the worker level of the current process.
- Return type
int
See also
proIsManagerProcess(), proIsWorkerProcess() and proWorkerObject().
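The relationship between the process level and the two predicates above can be summarized in a few lines. This sketch takes the level as a plain integer (as returned by proWorkerProcessLevel()), so that it stays independent of the module; the function name describe_process_role is hypothetical.

```python
def describe_process_role(level):
    """Map a worker process level to a readable role.

    Level 0 is the root manager process (proIsManagerProcess() is True);
    level 1 or greater is a worker process (proIsWorkerProcess() is True).
    """
    if level == 0:
        return "manager"
    return "worker (level %d)" % level
```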
- proZMQContext() → native_pointer¶
- Returns
Returns the global ZeroMQ context.
- Return type
native_pointer