Sandbox

class broker.server.MQTTServer(authentication=None, persistence=None, clients=None, ssl_options=None)[source]

This is the highest abstraction of the package and represents the whole MQTT Broker. Its main roles are handling incoming connections, keeping track of known client sessions and dispatching messages based on subscription matching.

add_client(client)[source]

Register a client to the Broker.

Parameters:client (MQTTClient) – A broker.client.MQTTClient instance.
broadcast_message(msg)[source]

Broadcasts a message to all clients with matching subscriptions, respecting the subscription QoS.

Parameters:msg (Publish) – A broker.messages.Publish instance.
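
As a rough illustration of subscription matching, the sketch below implements the standard MQTT wildcard rules (`+` for a single level, `#` for all remaining levels) and picks a delivery QoS for each matching client. It is not the package's implementation: `topic_matches`, `broadcast` and the plain dicts standing in for clients are hypothetical, and capping the delivery QoS at the subscription QoS is an assumption about what "respecting the subscription QoS" means here.

```python
def topic_matches(mask, topic):
    """Return True if `topic` matches the subscription `mask`.

    '+' matches exactly one level; '#' matches all remaining levels,
    including the parent level. '$'-prefixed topics are not special-cased.
    """
    mask_levels = mask.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(mask_levels):
        if level == "#":
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(mask_levels) == len(topic_levels)


def broadcast(clients, topic, qos):
    """Return {client_id: delivery_qos} for every client with a match.

    `clients` maps a client id to its {mask: subscription_qos} dict,
    a stand-in for real client objects.
    """
    deliveries = {}
    for client_id, subscriptions in clients.items():
        matched = [min(qos, sub_qos)
                   for mask, sub_qos in subscriptions.items()
                   if topic_matches(mask, topic)]
        if matched:
            # Assumption: deliver once, at the highest matching QoS.
            deliveries[client_id] = max(matched)
    return deliveries
```

Clients without a matching subscription simply do not appear in the result, which mirrors the "safe to call without checking" behaviour described for dispatch_message.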
configure_last_will(client, connect_msg)[source]

Configures the last will message options for a given client on its connect message. Both the client and the connect message must point to the same client uid.

Parameters:
  • client (MQTTClient) – A client instance;
  • connect_msg (Connect) – A Connect message that specifies the client.
disconnect_all_clients()[source]

Disconnect all known clients.

disconnect_client(client)[source]

Disconnects an MQTT client. Can be safely called without checking if the client is connected.

Parameters:client (MQTTClient) – The MQTTClient to be disconnected.
dispatch_message(client, msg, cache=None)[source]

Dispatches a message to a client based on its subscriptions. It is safe to call this method without checking if the client has matching subscriptions.

Parameters:
  • client (MQTTClient) – The client which will possibly receive the message;
  • msg (Publish) – The message to be delivered.
  • cache (dict) – A dict that will be used for raw data caching. Defaults to an empty dictionary if None.
enqueue_retained_message(client, subscription_mask)[source]

Enqueues all retained messages matching the subscription_mask to be sent to the client.

Parameters:
  • client (MQTTClient) – A known MQTTClient.
  • subscription_mask (str) – The subscription mask to match the messages against.
get_known_client(connect_msg)[source]

Returns a known MQTTClient instance that has the same uid defined on the Connect message.

Caution

If the connect message requests a clean session, this method will clear any previous session matching this client ID and automatically return None.

Parameters:connect_msg (Connect) – A connect message that specifies the client.
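
The clean-session rule from the caution above can be pictured with a small stand-in registry. This is only a sketch of the documented behaviour: `KnownClients` and its dict storage are hypothetical, and the `Connect` namedtuple merely mimics the two fields the text mentions (the field names are assumptions).

```python
from collections import namedtuple

# Hypothetical stand-in for broker.messages.Connect.
Connect = namedtuple("Connect", ["client_uid", "clean_session"])


class KnownClients:
    """Toy registry keyed by client uid (not the package's real storage)."""

    def __init__(self):
        self._clients = {}

    def add(self, uid, client):
        self._clients[uid] = client

    def get_known_client(self, connect_msg):
        if connect_msg.clean_session:
            # A clean session discards any previous state for this uid.
            self._clients.pop(connect_msg.client_uid, None)
            return None
        return self._clients.get(connect_msg.client_uid)
```

Note that in this sketch a clean-session connect is destructive: a later lookup without the flag no longer finds the old session.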
handle_incoming_publish(msg)[source]

Handles an incoming publish. This method is normally called by the clients as a mechanism for notifying the server that there is a new message to be processed. The processing itself consists of retaining the message according to the msg.retain flag and broadcasting it to the subscribers.

Parameters:msg (Publish) – The Publish message to be processed.
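
The retain-then-broadcast flow described above might look roughly like the following. `Publish` and `TinyServer` are hypothetical stand-ins, and clearing a retained message on an empty payload follows the MQTT convention; it is assumed here rather than confirmed for this package.

```python
from dataclasses import dataclass


@dataclass
class Publish:
    """Hypothetical stand-in for broker.messages.Publish."""
    topic: str
    payload: bytes
    qos: int = 0
    retain: bool = False


class TinyServer:
    def __init__(self):
        self.retained = {}      # topic -> last retained Publish
        self.broadcasted = []   # records what would be broadcast

    def broadcast_message(self, msg):
        self.broadcasted.append(msg)

    def handle_incoming_publish(self, msg):
        if msg.retain:
            if msg.payload:
                self.retained[msg.topic] = msg
            else:
                # MQTT convention: an empty retained payload clears the
                # stored message. Assumed, not confirmed for this package.
                self.retained.pop(msg.topic, None)
        # Every publish is broadcast, retained or not.
        self.broadcast_message(msg)
```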
handle_stream(stream, address)[source]

This coroutine is called by the Tornado loop whenever it receives an incoming connection. The server resolves the first message sent, checks if it’s a CONNECT frame and configures the client accordingly.

Parameters:
  • stream (IOStream) – A tornado.iostream.IOStream instance;
  • address (tuple) – A tuple containing the IP address and port of the connected client, e.g. ('127.0.0.1', 12345).
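
To make the "checks if it's a CONNECT frame" step concrete, here is a minimal sketch of decoding an MQTT fixed header from raw bytes. It follows the MQTT wire format (packet type in the high nibble of the first byte, followed by a variable-length "remaining length" field), but `parse_fixed_header` and `is_connect` are hypothetical helpers, not functions of this package.

```python
CONNECT = 1  # MQTT control packet type for CONNECT


def parse_fixed_header(data):
    """Return (packet_type, remaining_length, header_size) for `data`.

    `data` must start at the first byte of an MQTT packet.
    """
    if len(data) < 2:
        raise ValueError("need at least two bytes for a fixed header")
    packet_type = data[0] >> 4
    remaining_length = 0
    multiplier = 1
    for index in range(1, 5):          # the length uses at most four bytes
        if index >= len(data):
            raise ValueError("truncated remaining-length field")
        byte = data[index]
        remaining_length += (byte & 0x7F) * multiplier
        if not byte & 0x80:            # continuation bit clear: done
            return packet_type, remaining_length, index + 1
        multiplier *= 128
    raise ValueError("malformed remaining-length field")


def is_connect(data):
    """True if the packet starting at `data` is a CONNECT frame."""
    return parse_fixed_header(data)[0] == CONNECT
```

A server following this approach would read the first bytes from the stream, reject the connection if `is_connect` is false, and otherwise read `remaining_length` more bytes to obtain the CONNECT body.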
remove_client(client)[source]

Removes a client from the known clients list. It’s safe to call this method without checking if the client is already known.

Parameters:client (MQTTClient) – A broker.client.MQTTClient instance.

Caution

It won’t force client disconnection during the process, which can result in a lingering client in the Tornado loop.

class broker.client.MQTTClient(server, connection, authorization=None, uid=None, clean_session=False, keep_alive=60, persistence=None)[source]

Objects of this class encapsulate and abstract all aspects of a given client. An MQTTClient object may refer to a live, connected client or to a known client which, although disconnected, had the clean_session flag set to false and is therefore kept by the server as an end point for routed messages.

One may call self.is_connected() to check whether there is a connected client or not.

Parameters:
  • server (MQTTServer) – The server which the client is bound to;
  • connection (MQTTConnection) – The connection to be used;
  • uid (str) – A string used as the client’s id;
  • clean_session (bool) – The clean session flag, as per MQTT Protocol;
  • keep_alive (int) – The keep alive interval, in seconds.
  • persistence (ClientPersistenceBase) – An object that provides persistence.
_on_connection_timeout(connection)[source]

Callback called when the connection times out. Ensures clearing the self.connected event and processing the self.disconnect() method.

_on_stream_close(connection)[source]

Callback called when the stream closes. Ensures clearing the self.connected event and processing the self.disconnect() method.

_process_incoming_packets(connection)[source]

This coroutine fetches the raw message data from self.incoming_queue, parses it into the corresponding message object (an instance of one of the broker.messages.BaseMQTTMessage subclasses) and passes it to the self.incoming_transaction_manager to be processed.

It is started by calling self.start() and stops upon client disconnection.

_process_outgoing_packets(connection)[source]

This coroutine fetches the raw message data from self.outgoing_queue, parses it into the corresponding message object (an instance of one of the broker.messages.BaseMQTTMessage subclasses) and passes it to the self.outgoing_transaction_manager to be processed.

It is started by calling self.start() and stops upon client disconnection.

configure_last_will(topic, payload, qos, retain=False)[source]

Configures a message to be sent as the client’s last will. This message will be sent when the client is disconnected by a connection timeout, a protocol error or an unexpected disconnection.

Parameters:
  • topic (str) – The topic to which the message will be delivered;
  • payload (bytes) – Message payload;
  • qos (int) – Message’s QoS level;
  • retain (bool) – Message’s retain flag.
connected[source]

A toro.Event instance that is set whenever the client is connected and cleared on disconnection. It’s safe to wait on this property before stream related operations.
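
The wait-until-connected pattern can be sketched as follows. To keep the example self-contained it swaps in the standard library's `asyncio.Event` for the `toro.Event` named above; `write_when_connected` and `demo` are hypothetical names used only for illustration.

```python
import asyncio


async def write_when_connected(connected, outbox, msg):
    """Wait for the `connected` event, then record the write."""
    await connected.wait()
    outbox.append(msg)


async def demo():
    connected = asyncio.Event()   # stand-in for the toro.Event in the text
    outbox = []
    writer = asyncio.create_task(
        write_when_connected(connected, outbox, b"hello"))
    await asyncio.sleep(0)        # let the writer start and block
    blocked_before_connect = (outbox == [])
    connected.set()               # the client "connects"
    await writer
    return blocked_before_connect, outbox
```

Running `asyncio.run(demo())` shows that nothing is written until the event is set, after which the pending write goes through.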

disconnect()[source]

Closes the socket and disconnects the client. If self.clean_session is set, ensures that the incoming and outgoing queues are cleared and calls the server client removing routine.

Hint

It’s safe to call this function without checking whether the connection is open or not.

dispatch_to_server(pub_msg)[source]

Dispatches a Publish message to the server for further processing, i.e. delivering it to the appropriate subscribers.

Parameters:pub_msg (Publish) – A broker.messages.Publish instance.
get_list_of_delivery_qos(msg)[source]

Matches the msg.topic against all the current subscriptions and returns a list containing the QoS level for each matched subscription.

Parameters:msg (Publish) – A valid MQTT message.
Return type:tuple
Returns:A list of QoS levels, e.g. [0, 0, 1, 2, 0, 2]
get_matching_qos(msg, subscriptions_mask)[source]

Matches the msg.topic against a single subscription defined by the subscription mask and returns the QoS level on which the message should be delivered.

Parameters:
  • msg (Publish) – Message to be analysed;
  • subscriptions_mask – A subscription mask that identifies one of the client’s subscriptions.
Returns:

QoS Level or None, in case it doesn’t match.

handle_last_will()[source]

Checks if a client has a pending last will message and dispatches it for server processing.
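
A toy model of the last-will flow, from configuration to dispatch, is given below. `LastWillClient` is a hypothetical class rather than the real MQTTClient, the will is stored as a plain dict, and clearing it after dispatch so that it fires only once is an assumption.

```python
class LastWillClient:
    """Toy model of the last-will flow (hypothetical, not MQTTClient)."""

    def __init__(self, dispatch):
        self._dispatch = dispatch   # callable that receives the will
        self._last_will = None

    def configure_last_will(self, topic, payload, qos, retain=False):
        self._last_will = {"topic": topic, "payload": payload,
                           "qos": qos, "retain": retain}

    def handle_last_will(self):
        # Dispatch the pending will, if any. Clearing it first means a
        # second call does nothing (an assumption of this sketch).
        if self._last_will is not None:
            will, self._last_will = self._last_will, None
            self._dispatch(will)
```

Calling `handle_last_will()` on a client with no configured will is a no-op, so callers do not need to check beforehand.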

is_connected()[source]

A shorthand for self.connected.is_set().

publish(msg)[source]

Puts a publish packet on the self.outgoing_queue to be sent to the client.

Parameters:msg (Publish) – The message to be sent or an iterable of its bytes.
send_packet(packet)[source]

Puts a packet on the self.outgoing_queue to be sent to the client.

start()[source]

Starts the client’s fetching, processing and dispatching routines. Should be called after object instantiation or after a self.update_connection() call.

The following coroutines are started:

  • self._process_incoming_packets()
  • self._process_outgoing_packets()
subscribe(subscription_mask, qos)[source]

Subscribes the client to a topic or wildcarded mask at the requested QoS level. Calling this method also signals the server to enqueue the matching retained messages.

When called for a (subscription_mask, qos) pair for which the client already has a subscription, it will silently ignore the command and return a suback.

Parameters:
  • subscription_mask (string) – A valid MQTT topic or wildcarded mask;
  • qos (int) – A valid QoS level (0, 1 or 2).
Return type:

int

Returns:

The granted QoS level (0, 1 or 2) or 0x80 for failed subscriptions.

unsubscribe(topics)[source]

Unsubscribes the client from each topic in topics. Safely ignores topics which the client is not subscribed to.

Parameters:topics (iterable) – An iterable of valid MQTT topic strings.
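
The return-code and "safely ignores" semantics of subscribe and unsubscribe can be sketched with a toy subscription table. `Subscriptions` is hypothetical, and treating an invalid QoS level or an empty mask as the failure case is an assumption; the documentation above only states that failures yield 0x80.

```python
SUBACK_FAILURE = 0x80


class Subscriptions:
    """Toy subscription table (hypothetical, not the package's code)."""

    def __init__(self):
        self.by_mask = {}   # subscription mask -> granted QoS

    def subscribe(self, subscription_mask, qos):
        # Assumed failure conditions: invalid QoS or empty mask.
        if qos not in (0, 1, 2) or not subscription_mask:
            return SUBACK_FAILURE
        # Re-subscribing with the same (mask, qos) pair changes nothing.
        self.by_mask[subscription_mask] = qos
        return qos

    def unsubscribe(self, topics):
        for topic in topics:
            # pop() with a default ignores topics that were never subscribed.
            self.by_mask.pop(topic, None)
```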
update_configuration(clean_session=False, keep_alive=60)[source]

Updates the internal attributes.

Parameters:
  • clean_session (bool) – A flag indicating whether this session should be brand new or attempt to reuse the last known session for a client with the same self.uid as this.
  • keep_alive (int) – Connection’s keep alive setting, in seconds.
update_connection(connection)[source]

Updates the client’s connection by disconnecting the previous one and configuring the new one’s keep alive time according to keep_alive.

Parameters:connection (MQTTConnection) – The new connection to be used.
write(msg)[source]

Writes an MQTT message to the client. If the client isn’t connected, waits for the self.connected event to be set.

Parameters:msg (BaseMQTTMessage) – The message to be sent. It must be an instance of broker.messages.BaseMQTTMessage or one of its subclasses.