TP
6. PARTIE 5
1. Architecture en Blocs Choregraphe
Puisque Choregraphe ne gère pas nativement le protocole MQTT, la meilleure approche est d'utiliser une boîte Python personnalisée qui servira de "pont" entre le réseau et les mouvements du robot.
La boîte "MQTT Supervisor"
Créez une nouvelle boîte de type Script (Python).
Ajoutez-lui deux entrées :
onStart(pour lancer la surveillance) etonStop.Ajoutez-lui plusieurs sorties :
onStatusUpdate(pour les messages normaux),onError(pour les alertes) etonFinished.
Le flux visuel (Flow Diagram)
Entrée du programme ➔ Bloc [Animated Say] ("Je vais surveiller le lave-vitre").
Animated Say ➔ Bloc [MQTT Supervisor].
Sortie
onStatusUpdate➔ Bloc [Set LEDs] (Vert) + Bloc [Say] (Le message d'état).Sortie
onError➔ Bloc [Set LEDs] (Rouge) + Bloc [Animated Say] (L'alerte avec geste).
2. Le script Python optimisé pour le bloc Choregraphe
Voici comment adapter votre code pour qu'il s'insère parfaitement dans une boîte Choregraphe. Ce code utilise la bibliothèque paho-mqtt que vous devrez peut-être installer sur votre environnement.
class MyClass(GeneratedClass):
def __init__(self):
GeneratedClass.__init__(self)
self.mqtt_client = None
def onLoad(self):
import paho.mqtt.client as mqtt
self.mqtt_client = mqtt.Client("NAO_Supervisor")
self.mqtt_client.on_message = self.on_message_lavevitre
self.mqtt_client.connect("192.168.1.100", 1883, 60)
def onUnload(self):
if self.mqtt_client:
self.mqtt_client.loop_stop()
self.mqtt_client.disconnect()
def onInput_onStart(self):
self.mqtt_client.subscribe("maison/exterieur/lavevitre/#")
self.mqtt_client.loop_start()
# Envoyer la commande de départ
self.mqtt_client.publish("maison/exterieur/lavevitre/commande/start", "1")
self.onStart() # Sortie standard de la boîte
def on_message_lavevitre(self, client, userdata, message):
payload = message.payload.decode()
if "etat" in message.topic:
# On envoie le texte vers la sortie de la boîte pour que NAO le dise
self.onStatusUpdate(payload)
elif "erreur" in message.topic and payload != "aucune":
self.onError(payload)
def onInput_onStop(self):
self.onUnload()
self.onStopped()