qmi.core.pubsub
Publish/subscribe messaging mechanism for QMI.
The publish/subscribe mechanism is suitable for situations where data is spontaneously produced (i.e. not in response to a request). Such data may be sent to other QMI objects by publishing a signal.
Tasks which want to act on signals may create a signal receiver and subscribe it to the corresponding publisher.
Declaring signals
Any subclass of QMI_RpcObject (such as instruments and tasks) can publish signals.
In order to publish a signal, the signal must be declared in the RPC object class definition.
This is done by creating a class attribute and initializing it with an instance of QMI_Signal.
For example:
class MyTask(QMI_Task):
    # Declare a signal with name "sig_alice" which takes two parameters, both integers.
    sig_alice = QMI_Signal([int, int])
    # Declare a signal with name "sig_bob" which takes one string parameter.
    sig_bob = QMI_Signal([str])
    # ... rest of class definition
Publishing a signal
An RPC object can publish any of the signals it has declared. Publishing a signal means broadcasting a message through the QMI network. This message is automatically received by all signal receivers that are subscribed to the signal.
For example, the task MyTask could publish its signals as follows:
class MyTask(QMI_Task):
    sig_alice = QMI_Signal([int, int])
    sig_bob = QMI_Signal([str])

    def run(self):
        # Publish "sig_alice"
        self.sig_alice.publish(11, 105)
        # Publish "sig_bob"
        self.sig_bob.publish("hello receiver")
        # Publish "sig_alice" again with different parameters.
        self.sig_alice.publish(5, 6)
Subscribing to signals
Any Python script, task or function can subscribe to signals. Subscribing to a signal requires a proxy for the object that publishes the signal, and a signal receiver.
For example, the following code creates an instance of MyTask. It then uses the proxy to subscribe a receiver to the signal sig_alice:
# Create task (and get a proxy to the new task).
my_task_proxy = qmi.make_task("my_task", MyTask)
# Create signal receiver.
receiver = QMI_SignalReceiver()
# Subscribe receiver to the signal "sig_alice".
my_task_proxy.sig_alice.subscribe(receiver)
You can always subscribe to signals that are published in the same QMI context (i.e. the same Python program). It is also possible to subscribe to signals that are published by another Python program, provided that this program is a peer context in the QMI network, and a connection to this peer context has been established.
A receiver can be subscribed to multiple signals, and multiple receivers can be subscribed to the same signal.
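The many-to-many relationship between receivers and signals can be pictured with a small in-process toy model. This is only a conceptual sketch: ToyBroker and ToyReceiver are hypothetical names invented for illustration, and the code does not reflect how QMI implements subscriptions internally.

```python
from collections import defaultdict, deque


class ToyReceiver:
    """Stand-in for a signal receiver: just a FIFO queue of records."""

    def __init__(self):
        self.queue = deque()


class ToyBroker:
    """Toy in-process model of subscriptions (not QMI's implementation)."""

    def __init__(self):
        # Maps (publisher_name, signal_name) -> list of subscribed receivers.
        self._subscriptions = defaultdict(list)

    def subscribe(self, publisher_name, signal_name, receiver):
        self._subscriptions[(publisher_name, signal_name)].append(receiver)

    def publish(self, publisher_name, signal_name, *args):
        # Deliver one record to every receiver subscribed to this signal.
        for receiver in self._subscriptions[(publisher_name, signal_name)]:
            receiver.queue.append((publisher_name, signal_name, args))


broker = ToyBroker()
rx_a, rx_b = ToyReceiver(), ToyReceiver()

# rx_a listens to two signals; both receivers listen to "sig_alice".
broker.subscribe("my_task", "sig_alice", rx_a)
broker.subscribe("my_task", "sig_bob", rx_a)
broker.subscribe("my_task", "sig_alice", rx_b)

broker.publish("my_task", "sig_alice", 11, 105)
broker.publish("my_task", "sig_bob", "hello receiver")
```

After these calls, rx_a holds two records (one per signal) and rx_b holds one, mirroring the statement above.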
Receiving signals
A signal receiver keeps received signals in an internal queue. Each time a signal to which the receiver is currently subscribed gets published, the receiver adds a ReceivedSignal record to its queue.
In order to process a received signal, you can call get_next_signal() to get a ReceivedSignal record from the receive queue. If there are received signals in the queue, this function returns the oldest received signal. Otherwise, if the queue is empty, the function waits for the next signal, subject to the specified timeout, and raises a QMI_TimeoutException if no signal arrives in time.
For example:
# Let's assume that "receiver" is already subscribed to "sig_alice"
# and the signal has been published via "sig_alice.publish(11, 105)".
try:
    sig = receiver.get_next_signal(timeout=1.0)
    print("Received signal", sig.signal_name, "with arguments", sig.args[0], sig.args[1])
    # will print something like "Received signal sig_alice with arguments 11 105"
except QMI_TimeoutException:
    print("No signal was received within 1 second")
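When one receiver is subscribed to several signals, the signal_name field of each record can be used to route it to the right handler. The following is a minimal sketch of that idea. The names dispatch_signal and FakeSignal are hypothetical; FakeSignal is a stand-in with the same fields as ReceivedSignal so that the example runs without QMI.

```python
from collections import namedtuple

# Stand-in with the same fields as ReceivedSignal (hypothetical, for illustration only).
FakeSignal = namedtuple(
    "FakeSignal",
    ["publisher_context", "publisher_name", "signal_name", "args", "receiver_seqnr"],
)


def dispatch_signal(sig, handlers):
    """Call the handler registered for sig.signal_name with the signal arguments.

    Returns True if a handler was found, False otherwise.
    """
    handler = handlers.get(sig.signal_name)
    if handler is None:
        return False
    handler(*sig.args)
    return True


calls = []
handlers = {
    "sig_alice": lambda a, b: calls.append(("alice", a + b)),
    "sig_bob": lambda text: calls.append(("bob", text)),
}

handled = dispatch_signal(FakeSignal("ctx", "my_task", "sig_alice", (11, 105), 0), handlers)
unhandled = dispatch_signal(FakeSignal("ctx", "my_task", "sig_carol", (), 1), handlers)
```

With a real receiver, the first argument would presumably be the record returned by receiver.get_next_signal(timeout=1.0).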
Reference
Classes
| Class | Description |
| --- | --- |
| QMI_RegisteredSignal | Represents a named signal, bound to a specific QMI_RpcObject instance. |
| QMI_Signal | Marker for signal declarations in an RpcObject or Task. |
| QMI_SignalMessage | Message sent to broadcast a signal between contexts. |
| QMI_SignalReceiver | An instance of QMI_SignalReceiver contains a queue of received signals. |
| QMI_SignalRemovedMessage | Message sent to remote subscribers when an object stops publishing a signal. |
| QMI_SignalSubscriber | Represents a named signal, bound to a specific RPC proxy instance. |
| QMI_SignalSubscriptionReply | Message sent in response to a subscribe/unsubscribe request. |
| QMI_SignalSubscriptionRequest | Message sent to subscribe or unsubscribe to a remote publisher. |
| ReceivedSignal | Signal instance received by a QMI_SignalReceiver. |
| SignalDescription | Description of signal type, specifying its name and arguments. |
| SignalManager | Keeps track of signal subscriptions and handles signal publishing. |
- class qmi.core.pubsub.SignalDescription(name: str, arg_types: tuple[Type, ...])
Description of signal type, specifying its name and arguments.
- name
Signal name.
- Type:
str
- arg_types
Tuple of Python types (e.g. int or str), describing the parameters that will be passed in each published instance of this signal type.
- Type:
tuple[Type, …]
- name: str
Alias for field number 0
- arg_types: tuple[Type, ...]
Alias for field number 1
- count(value, /)
Return number of occurrences of value.
- index(value, start=0, stop=9223372036854775807, /)
Return first index of value.
Raises ValueError if the value is not present.
- class qmi.core.pubsub.ReceivedSignal(publisher_context: str, publisher_name: str, signal_name: str, args: tuple, receiver_seqnr: int)
Signal instance received by a QMI_SignalReceiver.
When a QMI_SignalReceiver receives a signal, it stores an instance of ReceivedSignal in its receive queue. The application can retrieve this instance and handle it.
- publisher_context
Name of the context that published the signal.
- Type:
str
- publisher_name
Name of the object that published the signal.
- Type:
str
- signal_name
Name of the signal.
- Type:
str
- args
Tuple of signal arguments.
- Type:
tuple
- receiver_seqnr
Sequence number assigned by the QMI_SignalReceiver. This sequence number can be used to detect dropped signals in case of overflow of the receive queue.
- Type:
int
- publisher_context: str
Alias for field number 0
- publisher_name: str
Alias for field number 1
- signal_name: str
Alias for field number 2
- args: tuple
Alias for field number 3
- receiver_seqnr: int
Alias for field number 4
- count(value, /)
Return number of occurrences of value.
- index(value, start=0, stop=9223372036854775807, /)
Return first index of value.
Raises ValueError if the value is not present.
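The receiver_seqnr field can be used to estimate how many signals were dropped between two retrieved records. The sketch below assumes that the receiver increments the sequence number by exactly one for every signal it receives, which this page does not state explicitly. FakeSignal and count_dropped are hypothetical names; FakeSignal mirrors the fields of ReceivedSignal so that the example runs without QMI.

```python
from collections import namedtuple

# Stand-in with the same fields as ReceivedSignal (hypothetical, for illustration only).
FakeSignal = namedtuple(
    "FakeSignal",
    ["publisher_context", "publisher_name", "signal_name", "args", "receiver_seqnr"],
)


def count_dropped(signals):
    """Count signals missing between consecutive records, judged by receiver_seqnr."""
    dropped = 0
    previous = None
    for sig in signals:
        if previous is not None:
            # Assumption: the sequence number grows by exactly 1 per received signal,
            # so any larger step means records were discarded in between.
            dropped += max(0, sig.receiver_seqnr - previous - 1)
        previous = sig.receiver_seqnr
    return dropped


# Sequence numbers 3 and 4 are absent, so two signals are counted as dropped.
records = [
    FakeSignal("ctx", "my_task", "sig_alice", (i, i), seqnr)
    for i, seqnr in enumerate([0, 1, 2, 5, 6])
]
missing = count_dropped(records)
```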
- class qmi.core.pubsub.QMI_Signal(arg_types: list[Type])
Marker for signal declarations in an RpcObject or Task.
An RpcObject or Task that wants to publish a signal, creates a class attribute with the name of the signal and initializes it with an instance of QMI_Signal.
For example:
class MyObject(QMI_RpcObject):
    my_signal = QMI_Signal([int, str])
- publish(*args: Any) -> None
Publish this signal.
This method will be implemented by QMI_RegisteredSignal. It is declared here for static type checking.
- class qmi.core.pubsub.QMI_RegisteredSignal(context: qmi.core.context.QMI_Context, publisher_name: str, signal_name: str, arg_types: tuple[Type, ...])
Represents a named signal, bound to a specific QMI_RpcObject instance.
When an instance of an RPC class (task or instrument) is created, QMI automatically detects any QMI_Signal declarations in the RPC class and converts them to QMI_RegisteredSignal instances in the RPC object instance.
An instance of QMI_RegisteredSignal may be used to publish signals.
- publish(*args: Any) -> None
Publish this signal.
Arguments may be passed when publishing the signal. These must correspond to the argument types registered for this signal.
The published signal will be received by all receivers currently subscribed to this signal type.
This method is thread-safe: it may be called from any thread.
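A publisher that wants to catch mismatched arguments early could check them against the declared types before calling publish(). The helper below is a hypothetical sketch of such a check. Whether and how QMI itself validates arguments is not described on this page, so args_match should be read as an illustration rather than as QMI behavior.

```python
def args_match(arg_types, args):
    """Return True if args has one value per declared type and each value is an instance of it."""
    return len(args) == len(arg_types) and all(
        isinstance(value, expected) for value, expected in zip(args, arg_types)
    )


# Matches a signal declared as QMI_Signal([int, int]).
ok = args_match((int, int), (11, 105))
# A string signal published with an integer argument.
wrong_type = args_match((str,), (42,))
# Too few arguments for a two-parameter signal.
wrong_count = args_match((int, int), (5,))
```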
- class qmi.core.pubsub.QMI_SignalSubscriber(context: qmi.core.context.QMI_Context, publisher_context: str, publisher_name: str, signal_name: str, signal_arg_types: str)
Represents a named signal, bound to a specific RPC proxy instance.
When a proxy for an RPC class (task or instrument) is created, QMI automatically detects any QMI_Signal declarations in the RPC class and converts them to QMI_SignalSubscriber instances in the RPC proxy.
An instance of QMI_SignalSubscriber may be used to subscribe to signals or unsubscribe from signals.
- subscribe(receiver: QMI_SignalReceiver) -> None
Subscribe the specified receiver to this signal type.
While subscribed, the QMI_SignalReceiver instance will receive and queue all published signals that match this subscription.
A QMI_SignalReceiver instance can be simultaneously subscribed to multiple signals (either from the same publisher or from different publishers). Similarly, multiple receivers can be simultaneously subscribed to the same signal. However, it is an error to subscribe a receiver to a signal to which it is already subscribed.
- Parameters:
receiver – A QMI_SignalReceiver instance which will receive the published signals.
- unsubscribe(receiver: QMI_SignalReceiver) -> None
Unsubscribe the specified receiver from this signal type.
- Parameters:
receiver – The QMI_SignalReceiver instance to unsubscribe.
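Because every subscribe() call should eventually be paired with an unsubscribe() call, one option is to wrap the pair in a context manager. The sketch below uses only the subscribe(receiver) and unsubscribe(receiver) methods documented above. The helper name subscribed and the FakeSubscriber stand-in are hypothetical and exist only so that the example runs without QMI.

```python
from contextlib import contextmanager


@contextmanager
def subscribed(signal, receiver):
    """Subscribe receiver to signal for the duration of a with-block."""
    signal.subscribe(receiver)
    try:
        yield receiver
    finally:
        # Always unsubscribe, even if the body of the with-block raises.
        signal.unsubscribe(receiver)


class FakeSubscriber:
    """Stand-in that records calls (hypothetical, for illustration only)."""

    def __init__(self):
        self.events = []

    def subscribe(self, receiver):
        self.events.append("subscribe")

    def unsubscribe(self, receiver):
        self.events.append("unsubscribe")


fake = FakeSubscriber()
with subscribed(fake, receiver=object()):
    fake.events.append("work")
```

With the objects from the earlier example, the call would presumably look like "with subscribed(my_task_proxy.sig_alice, receiver):".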
- class qmi.core.pubsub.QMI_SignalReceiver(max_queue_length: int = 10000, discard_policy: int = 1)
An instance of QMI_SignalReceiver contains a queue of received signals.
A QMI_SignalReceiver instance can be subscribed to a specific set of signals. When any such signal gets published, the published signal is automatically added to the receive queue of the QMI_SignalReceiver.
A task or script which creates a QMI_SignalReceiver, should periodically check to see if new signals have arrived and handle them.
When an instance of QMI_SignalReceiver is no longer needed, it must be explicitly unsubscribed from all signals.
- discard_all() -> None
Discard all pending signals currently waiting in the receive queue.
This method is thread-safe.
- has_signal_ready() -> bool
Return True if at least one received signal is waiting in the receive queue.
This method is thread-safe.
- get_queue_length() -> int
Return the number of signals currently in the receive queue.
This method is thread-safe. Note however that the queue length can change at any time due to actions of other threads.
- get_next_signal(timeout: float | None = 0) -> ReceivedSignal
Return the oldest published signal waiting in the receive queue.
If there is no signal waiting in the queue, optionally wait until a new signal is received, subject to the specified timeout.
This method is thread-safe.
- Parameters:
timeout – Maximum time (in seconds) to wait for a new signal if the queue is empty, or None to wait indefinitely.
- Returns:
ReceivedSignal tuple describing the received signal.
- Raises:
QMI_TimeoutException – If the timeout expires before a signal is received.
QMI_TaskStopException – If the calling task receives a stop request before a signal is received.
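A task that checks its receiver periodically may want to collect everything that is already queued without blocking. The sketch below combines has_signal_ready() and get_next_signal() for that purpose. The function name drain_signals, the max_count parameter and the FakeReceiver stand-in are hypothetical and only serve to make the example runnable without QMI.

```python
def drain_signals(receiver, max_count=100):
    """Return up to max_count signals that are already waiting in the queue."""
    signals = []
    while len(signals) < max_count and receiver.has_signal_ready():
        # The default timeout of 0 means this call does not wait for new signals.
        signals.append(receiver.get_next_signal())
    return signals


class FakeReceiver:
    """Stand-in offering the two methods used above (hypothetical, for illustration only)."""

    def __init__(self, items):
        self._items = list(items)

    def has_signal_ready(self):
        return bool(self._items)

    def get_next_signal(self, timeout=0):
        return self._items.pop(0)


batch = drain_signals(FakeReceiver(["s1", "s2", "s3"]), max_count=2)
```

If several threads read from the same receiver, the queue may be emptied between the check and the retrieval, in which case get_next_signal() can still raise QMI_TimeoutException. A robust caller would handle that exception as well.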
- class qmi.core.pubsub.QMI_SignalMessage(source_address: QMI_MessageHandlerAddress, destination_address: QMI_MessageHandlerAddress, signal_name: str, args: tuple)
Message sent to broadcast a signal between contexts.
This class is intended for internal use within QMI. Application programs should not interact with this class directly.
- signal_name
Name of the published signal. (The identity of the publisher follows from the source_address attribute.)
- args
Tuple of parameter values passed when publishing this signal.
- class qmi.core.pubsub.QMI_SignalSubscriptionRequest(source_address: QMI_MessageHandlerAddress, destination_address: QMI_MessageHandlerAddress, publisher_name: str, signal_name: str, subscribe: bool)
Message sent to subscribe or unsubscribe to a remote publisher.
This message is sent to a remote SignalManager to subscribe or unsubscribe to the specified type of signal. The remote SignalManager will typically answer by sending a QMI_SignalSubscriptionReply.
This class is intended for internal use within QMI. Application programs should not interact with this class directly.
- publisher_name
Name of the RPC object that publishes the signal.
- signal_name
Signal name.
- subscribe
True to subscribe, False to unsubscribe.
- class qmi.core.pubsub.QMI_SignalSubscriptionReply(source_address: QMI_MessageHandlerAddress, destination_address: QMI_MessageHandlerAddress, request_id: str, success: bool, error_msg: str)
Message sent in response to a subscribe/unsubscribe request.
This class is intended for internal use within QMI. Application programs should not interact with this class directly.
- success
True if the request was processed successfully, False to indicate failure.
- error_msg
Short message describing the error, if success == False.
- class qmi.core.pubsub.QMI_SignalRemovedMessage(source_address: QMI_MessageHandlerAddress, destination_address: QMI_MessageHandlerAddress, publisher_name: str, signal_name: str)
Message sent to remote subscribers when an object stops publishing a signal.
This message is currently sent only when the RPC object that was publishing the signal is removed from QMI. In this case, multiple instances of QMI_SignalRemovedMessage may be sent, one for each signal type that was published by the removed object.
This class is intended for internal use within QMI. Application programs should not interact with this class directly.
- publisher_name
Name of the RPC object that was publishing the signal.
- signal_name
Name of the signal that is no longer published.
- class qmi.core.pubsub.SignalManager(context: qmi.core.context.QMI_Context)
Keeps track of signal subscriptions and handles signal publishing.
Each context owns exactly one SignalManager.
The SignalManager registers itself as a MessageHandler to receive messages sent to the "$pubsub" object of the local context.
This class is intended for internal use within QMI. Application programs should not interact with this class directly.
- subscribe_signal(publisher_context: str, publisher_name: str, signal_name: str, receiver: QMI_SignalReceiver) -> None
Subscribe a SignalReceiver to a specified signal.
This method blocks until the subscription is established. This method is thread-safe and may safely be called from any thread.
- Raises:
QMI_UnknownNameException – When the specified publisher does not exist.
QMI_MessageDeliveryException – When the subscription request cannot be routed to a remote context.
- unsubscribe_signal(publisher_context: str, publisher_name: str, signal_name: str, receiver: QMI_SignalReceiver) -> None
Unsubscribe a SignalReceiver from a specified signal.
If the receiver is not currently subscribed to the specified signal, this function will do nothing.
This method is thread-safe and may safely be called from any thread.
- publish_signal(publisher_name: str, signal_name: str, args: tuple) -> None
Publish the specified signal to the QMI network.
- handle_message(message: QMI_Message) -> None
Handle messages sent to the signal manager object.
- handle_object_removed(rpc_object_name: str) -> None
Called when a local RPC object is removed.
This function drops any local subscriptions on signals published by the removed object. This function also drops any remote subscribers on signals published by the removed object, and notifies the remote subscribers that the signal has been removed.
- handle_peer_context_removed(context_name: str) -> None
Update remote signal administration after disconnecting a peer context.
This method is called by the socket manager when a peer connection (outgoing or incoming) is closed. This method drops any remote subscribers in the removed context. This method also drops any local subscriptions on signals published by the removed context.
- shutdown() -> None
Called by the QMI message router during unregistering of the handler.
After this call, no more messages will be received.
Subclasses can implement this method to free resources they are using. The default implementation does nothing.
Do not call this method explicitly. It will be called automatically during unregistering of the message handler.