Pro.MP — Multi-Processing API

Available since Cerbero Suite 5.2 and Cerbero Engine 2.2.

Overview

Our products make use of parallel processing in terms of multi-threading whenever possible, but there are limitations to the capabilities of multi-threading.

Some of the advantages offered by multi-processing are:

  • Possible process isolation

  • Increased stability for 3rd party components

  • Overcoming the Global Interpreter Lock (GIL) in Python

When designing our multi-processing technology, we briefly took into consideration Python’s multiprocessing library, but we discarded the idea, because it wasn’t flexible enough for our intended purposes and we wanted to have an API not limited to Python.

We wanted our API not only to be flexible but also easy to use: when dealing with multi-processing there are challenges which we wanted to solve upfront, so that our users wouldn’t have to worry about them when using our API.

Additionally, since we wanted our multi-processing technology to also be fast and stable, we built it on top of ZeroMQ, an established ultra-fast messaging library which can be also used for clustered solutions.

Core Concepts

In our API there are managers and workers. The manager (ProManager) is the object assigning tasks to workers (ProWorker). A worker is a separate process launched in the background which awaits instructions from the manager.

We can create as many managers as we want from our process and a manager can have as many workers as permitted by the resources of the system.

The manager can be created from within any thread, but must be accessed from within a single thread. Periodically the ProManager.processMessages() method of ProManager should be called to process certain messages.

The worker processes messages from a dedicated thread and every task assigned to it is guaranteed to be executed in the main thread. That’s very important, because it allows workers to access the user-interface API if needed.

The manager and worker maintain a regular communication. When a worker exits, the manager is informed about it. When the manager stops responding to a worker, the worker exits. This behavior guarantees that workers don’t become zombie processes.

The following is a basic code example.

from Pro.MP import *
import time

def main():
    m = ProManager()
    m.startWorker()

    for i in range(3):
        m.processMessages()
        time.sleep(1)

    print("finished!")

main()

This code creates a manager, starts a worker and processes messages for three seconds. It doesn’t do anything apart keeping the worker alive.

We can build upon the previous code by launching a test message box.

from Pro.MP import *
import time

def main():
    m = ProManager()

    worker_id = m.startWorker()
    m.testMessageBox(worker_id)

    while m.isBusy():
        m.processMessages()
        time.sleep(0.5)

    print("finished!")

main()

The main code finishes as soon as the message box is closed.

It is also possible to create multiple workers which all do the same task by using the special id ProWorker_All.

from Pro.MP import *
import time

def main():
    m = ProManager()

    for i in range(3):
        m.startWorker()

    m.testMessageBox(ProWorker_All)

    while m.isBusy():
        m.processMessages()
        time.sleep(0.5)

    print("finished!")

main()

This time the main code finishes when all three message boxes are closed.

Output Redirection

Let’s now print something out from one of the workers.

from Pro.MP import *
import time

def main():
    m = ProManager()
    # we must specify this option in order to obtain the output from the workers
    m.setOptions(ProMPOpt_RedirectOutput)

    m.testMessage(m.startWorker())

    for i in range(3):
        m.processMessages()
        time.sleep(1)

    print("finished!")

main()

The output is of the code is:

Test message.
finished!

As explained in the code, the ProMPOpt_RedirectOutput option must be set to obtain the output from the workers.

This option automatically simplifies one of the challenges when using multi-processing.

Let’s now launch multiple workers with a snippet of Python code to evaluate.

from Pro.MP import *
import time

def main():
    m = ProManager()
    m.setOptions(ProMPOpt_RedirectOutput)

    for i in range(5):
        m.startWorker()

    m.evalPythonCode(ProWorker_All, "print('remote script')")

    while m.isBusy():
        m.processMessages()
        time.sleep(0.5)

    print("finished!")

main()

The output is a bit confusing:

remote scriptremote script

remote scriptremote scriptremote script


finished!

The reason for this is that the print function of Python internally writes the string and the new-line separately. Since in our case the execution is parallel, the strings and new-lines get mixed up.

To remedy this problem we can set the ProMPOpt_AtomicOutput option. This option does nothing else than to discard writes of standalone new-lines and append a new-line to every incoming string if a new-line at the end is missing.

from Pro.MP import *
import time

def main():
    m = ProManager()
    m.setOptions(ProMPOpt_RedirectOutput | ProMPOpt_AtomicOutput)

    for i in range(5):
        m.startWorker()

    m.evalPythonCode(ProWorker_All, "print('remote script')")

    while m.isBusy():
        m.processMessages()
        time.sleep(0.5)

    print("finished!")

main()

Now the output is what would be expected:

remote script
remote script
remote script
remote script
remote script
finished!

ProMPOpt_AtomicOutput can be used in conjunction with ProMPOpt_RedirectOutput or by itself, since it makes ProMPOpt_RedirectOutput implicit.

We can also execute a Python script on disk:

from Pro.MP import *
import time

def main():
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)

    for i in range(5):
        m.startWorker()

    m.executePythonScript(ProWorker_All, r"path/to/remote.py")

    while m.isBusy():
        m.processMessages()
        time.sleep(0.2)

    print("finished!")

main()

Calling Python Functions

If we need to call a function, we can use ProManager.evalPythonFunction() and ProManager.executePythonFunction(). These two methods are the counterparts of ProManager.evalPythonCode() and ProManager.executePythonScript().

from Pro.Core import NTVariantList
from Pro.MP import *
import time

def main():
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)

    m.startWorker()

    code = """
def sum(a, b):
    print(a + b)
"""

    args = NTVariantList()
    args.append(4)
    args.append(5)

    m.evalPythonFunction(ProWorker_All, code, "sum", args)

    for i in range(10):
        m.processMessages()
        time.sleep(0.2)

    print("finished!")

main()

The result of the call is outputted, but what if we want to retrieve the result from the remote call in our code?

In that case we can set the last argument of ProManager.evalPythonFunction() to True, which will cause the result of the call to be sent to the manager.

from Pro.Core import NTVariantList
from Pro.MP import *
import time

def main():
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)

    worker_id = m.startWorker()

    code = """
def sum(a, b):
    return a + b
"""

    args = NTVariantList()
    args.append(4)
    args.append(5)

    m.evalPythonFunction(worker_id, code, "sum", args, True)

    while m.isBusy():
        m.processMessages()
        time.sleep(0.1)

    res = m.takeResult(worker_id)
    print("result:", res)

    print("finished!")

main()

Similarly, we can launch multiple workers and collect the results from all of them:

from Pro.Core import NTVariantList
from Pro.MP import *
import time

def main():
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)

    for i in range(10):
        m.startWorker()

    code = """
import random

def genRandom():
    return random.randint(0, 1000)
"""

    m.evalPythonFunction(ProWorker_All, code, "genRandom", NTVariantList(), True)

    while m.isBusy():
        m.processMessages()
        time.sleep(0.1)

    while m.hasResults():
        res = m.takeResult(ProWorker_Any)
        print("result:", res)

    print("finished!")

main()

The random output:

result: 4
result: 619
result: 277
result: 141
result: 542
result: 670
result: 541
result: 506
result: 248
result: 803
finished!

Custom Messaging

Many times we would want to establish a custom communication between the manager and the worker. For this purpose, we can define our own messages and send them.

A ProMPMessage consists of an id and optional data. We can define our own message ids in the range of 0 - 0x7FFFFFFF (higher values are reserved for internal purposes).

The following snippet of code launches a worker with a snippet of Python code which waits for a request and sends a response. The manager sends a request and waits for a response. If the response is received, it prints out the content as a string.

from Pro.MP import *

def main():
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)

    worker_id = m.startWorker()

    code = """
from Pro.MP import *

w = proWorkerObject()
if w.waitForMessage(1000):
    msg = w.getMessage()
    if msg.id == 1:
        resp = ProMPMessage(2)
        resp.data = b'remote message'
        w.sendMessage(resp)
"""
    m.evalPythonCode(worker_id, code)

    req = ProMPMessage(1)
    m.sendMessage(worker_id, req)

    if m.waitForMessage(worker_id, 1000):
        msg = m.getMessage(worker_id)
        if msg.id == 2:
            print(msg.data.decode("utf-8"))
        else:
            print("unknown message:", msg.id)
    else:
        print("no message")

    print("finished!")

main()

The output is:

remote message
finished!

Multi-level Workers

As already mentioned, a single process can create multiple managers. That’s true even for worker processes.

Let’s take into consideration the following snippet which must be launched from the command-line using the ‘-r’ argument:

from Pro.MP import *
import time

if proWorkerProcessLevel() < 5:
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)

    m.startWorker()
    m.executePythonScript(ProWorker_Any, __file__)

    while m.isBusy():
        m.processMessages()
        time.sleep(0.2)

    m = None
else:
    # last worker
    print("Hello, world!")

proWorkerProcessLevel() returns the level of the worker process. The first process we launch has a level of 0, which means it’s the manager or, in this case, the root manager.

As long as proWorkerProcessLevel() is less than 5, the code creates a manager, starts a worker and tells the worker to run itself. The last worker (level 5) prints out a message.

The output of the root process is:

Hello, world!

The reason is that the output is forwarded among each worker until it reaches the root manager.

Also important to notice is the following line in the script:

m = None

Since the code is not in a function, we don’t want to leave a reference to the manager as otherwise the root process may not terminate and so won’t its workers.

Remote Containers

There might be occasions in which a manager wants to share a container with a worker. The API to accomplish this is very simple: all the manager has to do is to share the container using ProManager.shareContainer() or ProManager.shareWritableContainer() and the worker can access the container using ProWorker.getSharedContainer().

Hint

Sharing containers with workers is a very inexpensive operation in terms of resources. Therefore, sharing many containers is not an issue.

In the following example a 10 mega-byte container is created with a signature appended at the end. A local and remote search is performed to find the signature. Both operations are timed.

from Pro.Core import NTContainer, MB_SIZE, NTTime
from Pro.MP import *
import time

remote_code = r"""
from Pro.Core import NTTime
from Pro.MP import *

def main():
    c = proWorkerObject().getSharedContainer('NAME')
    # remote search
    magic = b'\xAA\xBB\xCC\xDD'
    t = NTTime()
    t.start()
    match = c.findFirst(magic)
    print('remote search (ms): ' + str(t.elapsed()))

main()
"""

def main():
    magic = b"\xAA\xBB\xCC\xDD"
    buf = b"\xFF" * 10 * MB_SIZE + magic
    c = NTContainer()
    c.setData(buf)

    # local search
    t = NTTime()
    t.start()
    match = c.findFirst(magic)
    print("local search (ms):", t.elapsed())

    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)

    m.shareContainer("NAME", c)

    worker_id = m.startWorker()

    m.evalPythonCode(worker_id, remote_code)

    while m.isBusy():
        m.processMessages()
        time.sleep(0.1)

main()

The output is:

local search (ms): 17
remote search (ms): 351

The reason for the time difference is, of course, that accessing the remote data is comparatively slower. This factor needs to be taken into consideration when working with remote containers.

Important

Remote containers shared using ProManager.shareContainer() are read-only. This is for security reasons, as it wouldn’t be safe to allow other processes to change the original container. To allow workers to write into a remote container, it must have been shared using ProManager.shareWritableContainer().

In the next example the code asks the user to choose a Windows executable (PE), opens it and shares the container. The import table of the PE is then parsed from the worker process.

from Pro.Core import *
from Pro.UI import *
from Pro.MP import *
import time

remote_code = r'''
from Pro.Core import *
from Pro.MP import *
from Pro.PE import *

def main():
    c = proWorkerObject().getSharedContainer("PE")
    # print imported modules
    obj = PEObject()
    if not obj.Load(c):
        print("error: couldn't load file")
        return
    imp = obj.ImportDirectory()

    it = CFFStructIt(imp)
    while it.hasNext():
        cur = it.next()
        name_rva = cur.Uns("Name")
        name_offs = obj.RvaToOffset(name_rva)
        if name_offs != INVALID_STREAM_OFFSET:
            name = obj.ReadUInt8String(name_offs, 1000)[0]
            name = name.decode("utf-8", errors="ignore")
            print("imported module: " + name)

main()
'''

def main():
    fname = proContext().getOpenFileName("Select Windows executable...", str(), "Executable files (*.exe)")
    if not fname:
        return

    c = createContainerFromFile(fname)
    if c.isNull():
        return

    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)

    m.shareContainer("PE", c)

    worker_id = m.startWorker()

    m.evalPythonCode(worker_id, remote_code)

    while m.isBusy():
        m.processMessages()
        time.sleep(0.1)

main()

An example of output is:

imported module: KERNEL32.dll
imported module: SHLWAPI.dll

In the following example the shared container is shown in a hex view from the worker process.

from Pro.Core import *
from Pro.UI import *
from Pro.MP import *
import time

remote_code = r'''
from Pro.Core import *
from Pro.UI import *
from Pro.MP import *

def main():
    c = proWorkerObject().getSharedContainer("DATA")
    ctx = proContext()
    hv = ctx.createView(ProView.Type_Hex, "Remote Container Data")
    hv.setData(c)
    dlg = ctx.createDialog(hv)
    dlg.show()

main()
'''

def main():
    fname = proContext().getOpenFileName("Select a file...")
    if not fname:
        return

    c = createContainerFromFile(fname)
    if c.isNull():
        return

    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)

    m.shareContainer("DATA", c)

    worker_id = m.startWorker()

    m.evalPythonCode(worker_id, remote_code)

    while m.isBusy():
        m.processMessages()
        time.sleep(0.1)

main()

In this case, the code blocks the manager application. This can be avoided by displaying a wait dialog and by processing UI events, which brings us to our next paragraph.

Wait Objects

Managers and workers support wait objects. A wait object can be a wait dialog box or any other type of wait object.

Let’s take this basic code snippet which runs in a single process. The function doSomething performs a task until it finishes or until the user aborts the operation from the wait dialog.

from Pro.UI import *

def doSomething(wo):
    import time
    i = 1
    while not wo.wasAborted() and i < 101:
        time.sleep(0.05)
        wo.msg("Completed: " + str(i) + "%")
        wo.progress(i)
        wo.processEvents()
        i += 1

def main():
    wait = proContext().startWait("Doing something...")
    doSomething(wait)
    wait.stop()

main()

Let’s now write the same sample using multi-processing. This time doSomething is executed in a different process.

from Pro.Core import NTVariantList
from Pro.MP import *
from Pro.UI import *
import time

remote_code = """
def doSomething(wo):
    import time
    i = 1
    while not wo.wasAborted() and i < 101:
        time.sleep(0.05)
        wo.msg('Completed: ' + str(i) + '%')
        wo.progress(i)
        i += 1

def stub():
    from Pro.MP import proWorkerObject
    doSomething(proWorkerObject().waitObject())
"""

def main():
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)

    worker_id = m.startWorker()

    ui_wait = proContext().startWait("Doing something...")
    wait = m.createWaitObject(worker_id, ui_wait)

    m.evalPythonFunction(worker_id, remote_code, "stub", NTVariantList())

    while m.isBusy():
        m.processMessages()
        time.sleep(0.02)
        wait.processEvents()

    wait.stop()

main()

The code of doSomething remained the same. We only removed the call to Pro.Core.NTIWait.processEvents() as we didn’t need to process UI events any longer, but we could have left it there as it wouldn’t have had any effect.

The important thing to remember is that both wait objects must remain referenced as long as we need them, since ProManager.createWaitObject() doesn’t add a reference to ui_wait.

User Interface

We can now further expand our use of managers and workers to the context of a user interface. Let’s say we want to keep an interface responsive while also performing some CPU-intensive operation.

An obvious solution is to launch a worker to do the heavy lifting for us and just wait for a response. The way we process messages in a UI context is to start idle processing using Pro.UI.ProCustomView.startIdleNotifications() on a custom view. This will enable the custom view to receive Pro.UI.pvnIdle notifications, which in turn can be used to call ProManager.processMessages() at fixed intervals.

The following code sample creates a custom view with a text control and inserts the text of every incoming message from the worker into the text control.

from Pro.UI import *
from Pro.MP import *

def MPViewCallback(cv, m, code, view, data):
    if code == pvnInit:
        cv.startIdleNotifications()
        return 1
    elif code == pvnIdle:
        m.processMessages()
        if m.hasMessage(ProWorker_Any):
            text_view = cv.getView(1)
            while True:
                msg = m.getMessage(ProWorker_Any)
                if msg.id == 1:
                    text_view.setSelectedText(msg.data.decode("utf-8"))
                if not m.hasMessage(ProWorker_Any):
                    break
    return 0

def main():
    ctx = proContext()
    v = ctx.createView(ProView.Type_Custom, "MP View")

    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)

    worker_id = m.startWorker()

    code = """
from Pro.MP import *
import time

w = proWorkerObject()
wo = w.waitObject()
msg = ProMPMessage(1)
i = 0
while not wo.wasAborted():
    msg.data = b'remote message ' + str(i).encode('utf-8') + b'\\n'
    w.sendMessage(msg)
    time.sleep(1)
    i += 1
"""
    m.evalPythonCode(worker_id, code)

    v.setup("<ui><hs><text id='1'/></hs></ui>", MPViewCallback, m)
    ctx.addView(v)

main()

For the final code example we not only work with the UI, but also with wait objects.

We launch 10 workers. Each worker has a custom wait object which updates a progress bar in our view. The user can abort each worker by clicking on a ‘Cancel’ button next to the progress bar.

from Pro.Core import NTSimpleWait
from Pro.UI import *
from Pro.MP import *

remote_code = """
def doSomething(wo):
    import time
    i = 1
    while not wo.wasAborted() and i < 101:
        time.sleep(0.05)
        wo.progress(i)
        i += 1

from Pro.MP import proWorkerObject
doSomething(proWorkerObject().waitObject())
"""

class ProgressWait(NTSimpleWait):

    def __init__(self, ctrl):
        super(ProgressWait, self).__init__()
        self.ctrl = ctrl

    def progress(self, i):
        self.ctrl.setValue(i)

class MPView(object):

    def __init__(self):
        pass

    @staticmethod
    def callback(cv, self, code, view, data):
        if code == pvnInit:
            self.worker_ids = []
            # note: we must keep references to all wait objects
            self.ui_wait_objects = []
            self.mp_wait_object = []
            # create workers
            for i in range(self.worker_count):
                worker_id = self.manager.startWorker()
                self.worker_ids.append(worker_id)
                ui_wo = ProgressWait(self.view.getView(i))
                self.ui_wait_objects.append(ui_wo)
                mp_wo = self.manager.createWaitObject(worker_id, ui_wo)
                self.mp_wait_object.append(mp_wo)
                self.manager.evalPythonCode(worker_id, remote_code)
            cv.startIdleNotifications()
            return 1
        elif code == pvnIdle:
            # process messages
            self.manager.processMessages()
        elif code == pvnButtonClicked:
            view.setEnabled(False)
            worker_id = self.worker_ids[view.id() - 1000]
            self.manager.abortOperation(worker_id, 1000)
        return 0

    @staticmethod
    def create():
        ctx = proContext()
        self = MPView()
        self.worker_count = 10

        # create manager
        self.manager = ProManager()
        self.manager.setOptions(ProMPOpt_AtomicOutput)

        # create view
        self.view = ctx.createView(ProView.Type_Custom, "MP View")
        ui = "<ui><gl margin='20' spacing='20' align='top'>"
        for i in range(self.worker_count):
            ui += "<progbar id='%d'/><btn id='%d' text='Stop'/><nl/>" % (i, i + 1000)
        ui += "</gl></ui>"
        self.view.setup(ui, MPView.callback, self)
        ctx.addView(self.view)

MPView.create()

Module API

Pro.MP module API.

Classes:

ProMPMessage(id)

This class represents a multi-processing message.

ProManager()

This class represents a manager.

ProManagerWait()

This class represents a multi-process wait object.

ProWorker()

This class represents a worker.

Attributes:

ProMPMessage_Invalid

Id for an invalid message.

ProMPOpt_AtomicOutput

Improves the redirected output of workers by guaranteeing that each write results in a separate line.

ProMPOpt_RedirectOutput

Redirects the output of the worker to the process of the manager.

ProWorker_All

A special worker id which causes an operation to be performed by all the workers handled by the manager.

ProWorker_Any

A special worker id which causes an operation to be performed by the first available worker.

ProWorker_Invalid

Invalid worker id.

Functions:

proIsManagerProcess()

Checks whether the current is the root manager process.

proIsWorkerProcess()

Checks whether the current is a worker process.

proWorkerObject()

Returns the global worker object if the current is a worker process; otherwise returns None.

proWorkerProcessLevel()

Returns the worker level of the current process.

proZMQContext()

Returns the global ZeroMQ context.

class ProMPMessage(id: Optional[int] = None)

This class represents a multi-processing message.

Parameters

id (Optional[int]) – The id of the message.

See also id.

Methods:

clear()

Clears the fields of the message.

Attributes:

data

The data of the message.

id

The id of the message.

clear()None

Clears the fields of the message.

data

The data of the message.

See also id.

id

The id of the message.

Valid values range between 0 and 0x7FFFFFFF.

See also data.

ProMPMessage_Invalid: Final[int]

Id for an invalid message.

See also ProMPMessage.id.

ProMPOpt_AtomicOutput: Final[int]

Improves the redirected output of workers by guaranteeing that each write results in a separate line.

This option makes the ProMPOpt_RedirectOutput option implicit.

See also ProMPOpt_RedirectOutput and ProManager.setOptions().

ProMPOpt_RedirectOutput: Final[int]

Redirects the output of the worker to the process of the manager.

See also ProMPOpt_AtomicOutput and ProManager.setOptions().

class ProManager

This class represents a manager.

See also ProWorker.

Methods:

abortOperation(worker_id[, timeout])

Aborts an operation given a maximum time-out.

createWaitObject(worker_id, wo)

Wraps a wait object into a wait object which operates on the worker.

evalPythonCode(worker_id, script)

Executes Python code.

evalPythonFunction(worker_id, script, …[, …])

Calls a Python function.

executePythonFunction(worker_id, file_name, …)

Calls a Python function from a file.

executePythonScript(worker_id, file_name)

Executes Python code from a file.

getManagerTimeout()

Returns the manager time-out interval.

getMessage(worker_id)

Retrieves a pending message without blocking.

getOptions()

Returns the options for the workers.

getWorkerFromIncomingMessage(msg)

Retrieves the worker id for an incoming message.

hasMessage(worker_id)

Checks the presence of a message without blocking.

hasResults()

Returns True if results are available; otherwise returns False.

hasSharedContainer(name)

Checks the availability of a shared container.

isBusy()

Returns True if any worker is busy or if there are messages to be processed; otherwise returns False.

isWorkerAlive(worker_id)

Checks whether a worker is alive.

isWorkerBusy(worker_id)

Checks whether a worker is busy.

isWorkerReady(worker_id)

Check whether a worker is ready to process messages.

processMessages()

Processes pending operations which must be processed from within the thread that created the manager.

sendMessage(worker_id, msg)

Sends a message to a worker.

setManagerTimeout(timeout)

Sets the manager time-out interval.

setOptions(options)

Sets the options for the worker.

shareContainer(name, c)

Shares a container with the workers.

shareWritableContainer(name, c[, max_size])

Shares a writable container with the workers.

softAbort(worker_id)

Sets the abort status for the wait object of the specified worker.

startWorker()

Starts a worker.

stopAllWorkers()

Stops all the workers.

stopWorker(worker_id)

Stops a worker.

takeResult(worker_id[, deflt])

Takes a result from the list of available results.

testMessage(worker_id)

Instructs the worker to print a test message.

testMessageBox(worker_id)

Instructs the worker to display a test message box.

unshareContainer(name)

Removes a shared container.

waitForMessage(worker_id, timeout)

Waits for a message.

workersAlive()

Returns the number of workers which are alive.

workersBusy()

Returns the number of workers which are busy.

abortOperation(worker_id: int, timeout: int = 1000)None

Aborts an operation given a maximum time-out.

If the operation couldn’t be aborted within timeout, the worker is terminated.

Parameters
  • worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.

  • timeout (int) – The time-out in milliseconds.

See also createWaitObject().

createWaitObject(worker_id: int, wo: Pro.Core.NTIWait)Pro.MP.ProManagerWait

Wraps a wait object into a wait object which operates on the worker.

Important

This method doesn’t increase the reference count of the input wait object. Therefore, a reference must be maintained for both the input wait object and for the returned wait object.

Parameters
  • worker_id (int) – The specific worker to which the input wait object is assigned.

  • wo (NTIWait) – The input wait object.

Returns

Returns the multi-processing wait object.

Return type

ProManagerWait

See also Pro.Core.NTIWait and abortOperation().

evalPythonCode(worker_id: int, script: str)None

Executes Python code.

Parameters

See also executePythonScript(), evalPythonFunction() and executePythonFunction().

evalPythonFunction(worker_id: int, script: str, func_name: str, args: Pro.Core.NTVariantList, enqueue_result: bool = False)None

Calls a Python function.

Parameters
  • worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.

  • script (str) – The Python code.

  • func_name (str) – The name of the function to call.

  • args (NTVariantList) – The function argument list.

  • enqueue_result (bool) – If True, transmits the value returned by the function.

See also executePythonFunction(), evalPythonCode() and executePythonScript().

executePythonFunction(worker_id: int, file_name: str, func_name: str, args: Pro.Core.NTVariantList, enqueue_result: bool = False)None

Calls a Python function from a file.

Parameters
  • worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.

  • file_name (str) – The path of the Python script.

  • func_name (str) – The name of the function to call.

  • args (NTVariantList) – The function argument list.

  • enqueue_result (bool) – If True, transmits the value returned by the function.

See also evalPythonFunction(), executePythonScript() and evalPythonCode().

executePythonScript(worker_id: int, file_name: str)None

Executes Python code from a file.

Parameters
  • worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.

  • file_name (str) – The path of the Python script.

See also evalPythonCode(), executePythonFunction() and evalPythonFunction().

getManagerTimeout()int
Returns

Returns the manager time-out interval.

Return type

int

See also setManagerTimeout() and processMessages().

getMessage(worker_id: int)Pro.MP.ProMPMessage

Retrieves a pending message without blocking.

Parameters

worker_id (int) – The worker for which to retrieve the message. ProWorker_All or ProWorker_Any can be specified as id.

Returns

Returns a message with a valid id if a message is available; otherwise returns a message with an invalid id.

Return type

ProMPMessage

See also ProMPMessage, hasMessage(), waitForMessage() and sendMessage().

getOptions()int
Returns

Returns the options for the workers.

Return type

int

See also setOptions().

getWorkerFromIncomingMessage(msg: Pro.MP.ProMPMessage)int

Retrieves the worker id for an incoming message.

Parameters

msg (ProMPMessage) – The incoming message.

Returns

Returns the worker id if successful; otherwise returns ProWorker_Invalid.

Return type

int

See also getMessage() and ProMPMessage.

hasMessage(worker_id: int)bool

Checks the presence of a message without blocking.

Parameters

worker_id (int) – The worker for which to check the presence of a message. ProWorker_All or ProWorker_Any can be specified as id.

Returns

Returns True if a message is available; otherwise returns False.

Return type

bool

See also getMessage(), waitForMessage() and sendMessage().

hasResults()bool
Returns

Returns True if results are available; otherwise returns False.

Return type

bool

See also takeResult(), evalPythonFunction() and executePythonFunction().

hasSharedContainer(name: str)bool

Checks the availability of a shared container.

Parameters

name (str) – The public name of the container.

Returns

Returns True if the shared container is available; otherwise returns False.

Return type

bool

See also shareContainer() and unshareContainer().

isBusy()bool
Returns

Returns True if any worker is busy or if there are messages to be processed; otherwise returns False.

Return type

bool

See also processMessages() and workersBusy().

isWorkerAlive(worker_id: int)bool

Checks whether a worker is alive.

Parameters

worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.

Returns

Returns True if the worker is alive; otherwise returns False.

Return type

bool

See also workersAlive() and isWorkerBusy().

isWorkerBusy(worker_id: int)bool

Checks whether a worker is busy.

Parameters

worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.

Returns

Returns True if the worker is busy; otherwise returns False.

Return type

bool

See also workersBusy() and isWorkerAlive().

isWorkerReady(worker_id: int)bool

Check whether a worker is ready to process messages.

Parameters

worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.

Returns

Returns True if the worker is ready; otherwise returns False.

Return type

bool

See also isWorkerAlive() and isWorkerBusy().

processMessages()None

Processes pending operations which must be processed from within the thread that created the manager.

See also hasMessage(), waitForMessage(), getMessage() and sendMessage().

sendMessage(worker_id: int, msg: Pro.MP.ProMPMessage)bool

Sends a message to a worker.

Parameters
Returns

Returns True if successful; otherwise returns False.

Return type

bool

See also ProMPMessage, getMessage(), hasMessage() and waitForMessage().

setManagerTimeout(timeout: int)None

Sets the manager time-out interval.

Note

The manager time-out interval must be set before starting workers.

Parameters

timeout (int) – The time-out in milliseconds.

See also getManagerTimeout().

setOptions(options: int)None

Sets the options for the worker.

Note

The options must be set before starting workers.

Parameters

options (int) – The options to set.

See also getOptions(), ProMPOpt_RedirectOutput and ProMPOpt_AtomicOutput.

shareContainer(name: str, c: Pro.Core.NTContainer)None

Shares a container with the workers.

Hint

Sharing containers with workers is a very inexpensive operation in terms of resources. Therefore, sharing many containers is not an issue.

Parameters
  • name (str) – The public name of the container.

  • c (NTContainer) – The container to share.

See also shareWritableContainer(), unshareContainer(), hasSharedContainer(), ProWorker.getSharedContainer() and Pro.Core.NTContainer.

shareWritableContainer(name: str, c: Pro.Core.NTContainer, max_size: Optional[int] = None)None

Shares a writable container with the workers.

Hint

Sharing containers with workers is a very inexpensive operation in terms of resources. Therefore, sharing many containers is not an issue.

Parameters
  • name (str) – The public name of the container.

  • c (NTContainer) – The container to share.

  • max_size (Optional[int]) – The maximum size that the container can reach.

Available since Cerbero Suite 6.0 and Cerbero Engine 3.0.

See also shareContainer(), unshareContainer(), hasSharedContainer(), ProWorker.getSharedContainer() and Pro.Core.NTContainer.

softAbort(worker_id: int)None

Sets the abort status for the wait object of the specified worker.

Available since Cerbero Suite 5.5 and Cerbero Engine 2.5.

Parameters

worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.

startWorker()int

Starts a worker.

Returns

Returns the worker id if successful; otherwise returns ProWorker_Invalid.

Return type

int

See also stopWorker() and stopAllWorkers().

stopAllWorkers()None

Stops all the workers.

Note

Internally this method calls stopWorker() with ProWorker_All.

See also stopWorker() and startWorker().

stopWorker(worker_id: int)None

Stops a worker.

Parameters

worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.

See also startWorker() and stopAllWorkers().

takeResult(worker_id: int, deflt: Optional[Union[int, float, bool, bytes, str]] = None)Optional[Union[int, float, bool, bytes, str]]

Takes a result from the list of available results.

The returned result is removed from the internal list.

Parameters
  • worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.

  • deflt (BasicType) – The default value in case the result is not available.

Returns

Returns the result if available; otherwise returns deflt.

Return type

BasicType

See also hasResults(), evalPythonFunction() and executePythonFunction().

testMessage(worker_id: int)None

Instructs the worker to print a test message.

Parameters

worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.

See also testMessageBox().

testMessageBox(worker_id: int)None

Instructs the worker to display a test message box.

Parameters

worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.

See also testMessage().

unshareContainer(name: str)None

Removes a shared container.

Parameters

name (str) – The public name of the container.

See also shareContainer(), hasSharedContainer(), ProWorker.getSharedContainer() and Pro.Core.NTContainer.

waitForMessage(worker_id: int, timeout: int)bool

Waits for a message.

Parameters
  • worker_id (int) – The worker id. ProWorker_All or ProWorker_Any can be specified as id.

  • timeout (int) – The time-out in milliseconds for the wait.

Returns

Returns True if a message is available; otherwise returns False.

Return type

bool

See also hasMessage(), getMessage() and sendMessage().

workersAlive()int
Returns

Returns the number of workers which are alive.

Return type

int

See also isWorkerAlive() and workersBusy().

workersBusy()int
Returns

Returns the number of workers which are busy.

Return type

int

See also isWorkerBusy() and workersAlive().

class ProManagerWait

Bases: Pro.Core.NTIWait

This class represents a multi-process wait object.

See also ProManager.createWaitObject().

Methods:

setAbortTimeout(timeout)

Sets the time-out for when the operation is aborted.

setUseSoftAbort([b])

Sets the behavior of Pro.Core.NTIWait.abort().

softAbort()

Sets the abort status for the wait object without calling ProManager.abortOperation().

setAbortTimeout(timeout: int)None

Sets the time-out for when the operation is aborted.

Note

Internally Pro.Core.NTIWait.abort() calls ProManager.abortOperation() with the specified time-out. The default time-out is 1 second. To avoid a hard abort, use setUseSoftAbort() or softAbort(). When a soft abort is performed, the time-out value is ignored.

Parameters

timeout (int) – The time-out in milliseconds.

See also Pro.Core.NTIWait.abort(), ProManager.abortOperation(), setUseSoftAbort() and softAbort().

setUseSoftAbort(b: bool = True)None

Sets the behavior of Pro.Core.NTIWait.abort().

By default calling Pro.Core.NTIWait.abort() results in a hard abort (see ProManager.abortOperation()).

Parameters

b (bool) – If True, softAbort() is called when executing Pro.Core.NTIWait.abort().

Available since Cerbero Suite 5.5 and Cerbero Engine 2.5.

See also Pro.Core.NTIWait.abort(), ProManager.abortOperation(), setAbortTimeout() and softAbort().

softAbort()None

Sets the abort status for the wait object without calling ProManager.abortOperation().

Available since Cerbero Suite 5.5 and Cerbero Engine 2.5.

See also Pro.Core.NTIWait.abort(), ProManager.abortOperation(), setAbortTimeout() and setUseSoftAbort().

class ProWorker

This class represents a worker.

See also ProManager.

Methods:

getConnectionAddress()

Returns the TCP address of the manager.

getConnectionPort()

Returns the TCP port of the manager.

getManagerTimeout()

Returns the manager time-out interval.

getMessage()

Retrieves a pending message without blocking.

getOptions()

Returns the options for the worker.

getSharedContainer(name)

Obtains a shared container from the manager.

getWorkerId()

Returns the worker id.

hasMessage()

Checks the presence of a message without blocking.

isBusy()

Returns True if the worker is busy; otherwise returns False.

remotePrint(s)

Prints a string in the process of the manager.

sendMessage(msg)

Sends a message to the manager.

sendResult(result)

Sends the result of an operation to the manager.

setManagerTimeout(timeout)

Sets the manager time-out interval.

setOptions(options)

Sets the options of the worker.

setupConnection(address, port)

Sets up the connection to the manager.

waitForMessage(timeout)

Waits for a message.

waitObject()

Returns a wait object which can be used by the worker.

wasStopped()

Returns True if the worker was stopped by the manager; otherwise returns False.

getConnectionAddress()str
Returns

Returns the TCP address of the manager.

Return type

str

See also getConnectionPort() and setupConnection().

getConnectionPort()int
Returns

Returns the TCP port of the manager.

Return type

int

See also getConnectionAddress() and setupConnection().

getManagerTimeout()int
Returns

Returns the manager time-out interval.

Return type

int

See also setManagerTimeout().

getMessage()Pro.MP.ProMPMessage

Retrieves a pending message without blocking.

Returns

Returns a message with a valid id if a message is available; otherwise returns a message with an invalid id.

Return type

ProMPMessage

See also ProMPMessage, hasMessage(), waitForMessage() and sendMessage().

getOptions()int
Returns

Returns the options for the worker.

Return type

int

See also setOptions().

getSharedContainer(name: str)Pro.Core.NTContainer

Obtains a shared container from the manager.

Parameters

name (str) – The name of the public container.

Returns

Returns a valid container if successful; otherwise returns an invalid container.

Return type

NTContainer

See also ProManager.shareContainer(), ProManager.shareWritableContainer() and Pro.Core.NTContainer.

getWorkerId()int
Returns

Returns the worker id.

Return type

int

hasMessage()bool

Checks the presence of a message without blocking.

Returns

Returns True if a message is available; otherwise returns False.

Return type

bool

See also getMessage(), waitForMessage() and sendMessage().

isBusy()bool
Returns

Returns True if the worker is busy; otherwise returns False.

Return type

bool

remotePrint(s: str)None

Prints a string in the process of the manager.

Important

The manager must have allowed output redirection for the worker.

Parameters

s (str) – The string to print.

See also ProManager.setOptions(), ProMPOpt_RedirectOutput and ProMPOpt_AtomicOutput .

sendMessage(msg: Pro.MP.ProMPMessage)bool

Sends a message to the manager.

Parameters

msg (ProMPMessage) – The message to send.

Returns

Returns True if successful; otherwise returns False.

Return type

bool

See also ProMPMessage, getMessage(), hasMessage() and waitForMessage().

sendResult(result: Optional[Union[int, float, bool, bytes, str]])bool

Sends the result of an operation to the manager.

Parameters

result (BasicType) – The result to send.

Returns

Returns True if successful; otherwise returns False.

Return type

bool

Available since Cerbero Suite 5.3 and Cerbero Engine 2.3.

See also ProManager.hasResults() and ProManager.takeResult().

setManagerTimeout(timeout: int)None

Sets the manager time-out interval.

Parameters

timeout (int) – The time-out in milliseconds.

See also getManagerTimeout().

setOptions(options: int)None

Sets the options of the worker.

Parameters

options (int) – The options.

See also getOptions().

setupConnection(address: str, port: int)None

Sets up the connection to the manager.

Parameters
  • address (str) – The TCP address of the manager.

  • port (int) – The TCP port of the manager.

See also getConnectionAddress() and getConnectionPort().

waitForMessage(timeout: int)bool

Waits for a message.

Parameters

timeout (int) – The time-out in milliseconds for the wait.

Returns

Returns True if a message is available; otherwise returns False.

Return type

bool

See also hasMessage(), getMessage() and sendMessage().

waitObject()Pro.Core.NTIWait

Returns a wait object which can be used by the worker.

Hint

A wait object is returned even if the manager didn’t set up a wait object for the worker. Calling the methods of a wait object which is not mirrored on the manager side has no effect.

Returns

Returns the wait object.

Return type

NTIWait

See also Pro.Core.NTIWait and ProManager.createWaitObject().

wasStopped()bool
Returns

Returns True if the worker was stopped by the manager; otherwise returns False.

Return type

bool

ProWorker_All: Final[int]

A special worker id which causes an operation to be performed by all the workers handled by the manager.

See also ProWorker_Any.

ProWorker_Any: Final[int]

A special worker id which causes an operation to be performed by the first available worker.

See also ProWorker_All.

ProWorker_Invalid: Final[int]

Invalid worker id.

See also ProManager.startWorker().

proIsManagerProcess()bool

Checks whether the current is the root manager process.

Returns

Returns True if proWorkerProcessLevel() returns 0; otherwise returns False.

Return type

bool

See also proIsWorkerProcess() and proWorkerProcessLevel().

proIsWorkerProcess()bool

Checks whether the current is a worker process.

Returns

Returns True if proWorkerProcessLevel() returns 1 or greater; otherwise returns False.

Return type

bool

See also proIsManagerProcess(), proWorkerProcessLevel() and proWorkerObject().

proWorkerObject()Pro.MP.ProWorker
Returns

Returns the global worker object if the current is a worker process; otherwise returns None.

Return type

ProWorker

See also proIsWorkerProcess() and proWorkerProcessLevel().

proWorkerProcessLevel()int
Returns

Returns the worker level of the current process.

Return type

int

See also proIsManagerProcess(), proIsWorkerProcess() and proWorkerObject(), .

proZMQContext()native_pointer
Returns

Returns the global ZeroMQ context.

Return type

native_pointer