In MQTT, how do you know that the device/user received a message published to a topic it subscribes to? Also, provide the complete architecture flow diagram showing how it works.
#mqtt #messaging #qos #real-time #architecture
Answer
Overview
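MQTT (Message Queuing Telemetry Transport) uses Quality of Service (QoS) levels to control how reliably a message is delivered and acknowledged. QoS acknowledgments are hop-by-hop: one handshake runs between the publisher and the broker, and a separate one runs between the broker and each subscriber. A PUBACK therefore tells the publisher that the broker has the message, not that the subscriber received it. To confirm receipt by the end device or user, add an application-level acknowledgment on top of QoS.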
MQTT QoS Levels
| QoS Level | Name | Delivery Guarantee | Acknowledgment |
|---|---|---|---|
| QoS 0 | At most once | Best effort (fire and forget); may be lost | ❌ No ACK |
| QoS 1 | At least once | Delivered at least once; duplicates possible | ✅ PUBACK |
| QoS 2 | Exactly once | Delivered exactly once; no duplicates | ✅ PUBREC, PUBREL, PUBCOMP |
QoS 0: Fire and Forget
```text
Publisher                     MQTT Broker                  Subscriber
    │                              │                            │
    │─────PUBLISH (QoS 0)─────────►│                            │
    │                              │─────PUBLISH (QoS 0)───────►│
    │                              │                            │
    │  ❌ No acknowledgment        │  ❌ No acknowledgment      │
    │  ❌ No retry                 │  ❌ No retry               │
```
Use Case: Sensor data where occasional loss is acceptable
QoS 1: At Least Once (With Acknowledgment)
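```text
Publisher                     MQTT Broker                  Subscriber
    │                              │                            │
    │─────PUBLISH (QoS 1)─────────►│                            │
    │◄────PUBACK───────────────────│                            │
    │  ✅ Broker has the message   │─────PUBLISH (QoS 1)───────►│
    │                              │◄────PUBACK─────────────────│
    │                              │  ✅ Subscriber has it      │
    │                              │                            │
    │  If no PUBACK received:      │  If no PUBACK received:    │
    │  🔄 Retry with DUP flag      │  🔄 Retry delivery         │
```
The two PUBACKs are independent. The broker acknowledges the publisher as soon as it has accepted the message; the subscriber's PUBACK goes only to the broker and is never forwarded to the publisher.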
Use Case: Important messages that must be delivered (chat messages, alerts)
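Because QoS 1 can deliver the same message more than once after a retry, the subscriber usually needs to ignore repeats. The following is a minimal sketch of one way to do that, assuming every payload carries an application-level message ID. `DuplicateFilter`, `isFirstDelivery` and the `capacity` default are hypothetical names and values chosen for illustration; they are not part of any MQTT library.

```dart
/// Remembers recently seen message IDs so that a redelivered QoS 1
/// message can be recognised and skipped.
/// (Hypothetical helper, not part of mqtt_client.)
class DuplicateFilter {
  DuplicateFilter({this.capacity = 1000});

  /// Maximum number of IDs to remember (illustrative default).
  final int capacity;

  // Dart set literals keep insertion order, so `first` is the oldest ID.
  final Set<String> _seen = <String>{};

  /// Returns true the first time [messageId] is seen and false for a
  /// duplicate that is still remembered.
  bool isFirstDelivery(String messageId) {
    if (!_seen.add(messageId)) {
      return false; // Already seen: treat as a duplicate
    }
    if (_seen.length > capacity) {
      _seen.remove(_seen.first); // Evict the oldest ID to bound memory
    }
    return true;
  }
}
```

A subscriber would call `isFirstDelivery(id)` at the top of its message handler and return early when it yields false. The bounded size is a trade-off: a duplicate that arrives after its ID has been evicted is processed again.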
QoS 2: Exactly Once (No Duplicates)
```text
Publisher                     MQTT Broker                  Subscriber
    │                              │                            │
    │─────PUBLISH (QoS 2)─────────►│                            │
    │◄────PUBREC───────────────────│                            │
    │  (Message received)          │                            │
    │─────PUBREL──────────────────►│                            │
    │  (Release message)           │                            │
    │◄────PUBCOMP──────────────────│                            │
    │  ✅ Broker has it once       │─────PUBLISH (QoS 2)───────►│
    │                              │◄────PUBREC─────────────────│
    │                              │─────PUBREL────────────────►│
    │                              │◄────PUBCOMP────────────────│
    │                              │  ✅ No duplicates!         │
```
As with QoS 1, the handshake runs separately on each hop: the publisher's PUBCOMP comes from the broker and does not wait for the subscriber.
Use Case: Financial transactions, critical commands
Complete MQTT Architecture Flow
```text
┌────────────────────────────────────────────────────────────┐
│                     MQTT Architecture                      │
└────────────────────────────────────────────────────────────┘

┌──────────────┐          ┌──────────────┐          ┌──────────────┐
│  Publisher   │          │ MQTT Broker  │          │  Subscriber  │
│  (Device A)  │          │   (Server)   │          │  (Device B)  │
└──────────────┘          └──────────────┘          └──────────────┘
       │                         │                         │
       │  1. CONNECT             │                         │
       │────────────────────────►│                         │
       │  ◄─── CONNACK           │                         │
       │  (Connected)            │                         │
       │                         │  2. CONNECT             │
       │                         │◄────────────────────────│
       │                         │  CONNACK ────►          │
       │                         │  (Connected)            │
       │                         │                         │
       │                         │  3. SUBSCRIBE           │
       │                         │  (topic: "chat/room1")  │
       │                         │◄────────────────────────│
       │                         │  SUBACK ────►           │
       │                         │                         │
       │  4. PUBLISH             │                         │
       │  (topic: "chat/room1")  │                         │
       │  (QoS: 1)               │                         │
       │────────────────────────►│                         │
       │                         │  5. Store message       │
       │                         │  (if subscriber         │
       │                         │   offline)              │
       │                         │                         │
       │  6. PUBACK              │                         │
       │◄────────────────────────│                         │
       │  ✅ Broker received!    │                         │
       │                         │                         │
       │                         │  7. PUBLISH             │
       │                         │  (to subscriber)        │
       │                         │────────────────────────►│
       │                         │                         │
       │                         │  8. PUBACK              │
       │                         │◄────────────────────────│
       │                         │  ✅ Subscriber received!│
```
Flutter MQTT Implementation
Setup MQTT Client
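```dart
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

class MQTTService {
  late MqttServerClient client;

  Future<void> connect() async {
    final clientId = 'flutter_client_${DateTime.now().millisecondsSinceEpoch}';

    // The second constructor argument is the client ID, not the port,
    // so use withPort() to set the port explicitly.
    client = MqttServerClient.withPort('broker.hivemq.com', clientId, 1883);
    client.keepAlivePeriod = 60;
    client.autoReconnect = true;

    // No will message is configured, so no will QoS is set either.
    client.connectionMessage = MqttConnectMessage()
        .withClientIdentifier(clientId)
        .startClean(); // Clean session: the broker keeps no state for us

    try {
      await client.connect();
    } on Exception catch (e) {
      print('❌ Connection failed: $e');
      client.disconnect();
      return;
    }

    // connect() can return without throwing even if the broker refused us.
    if (client.connectionStatus?.state == MqttConnectionState.connected) {
      print('✅ Connected to MQTT broker');
    } else {
      print('❌ Connection refused: ${client.connectionStatus}');
      client.disconnect();
    }
  }
}
```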
Publishing with QoS 1 (Acknowledgment)
```dart
// Methods of MQTTService (they use its `client` field).

// Call once, after connect(). Registering this listener inside
// publishMessage() would add a new listener on every publish.
void listenForBrokerAcks() {
  client.published?.listen((MqttPublishMessage message) {
    // For QoS 1 this fires when the broker's PUBACK arrives.
    print('✅ Message acknowledged by broker');
    print('   Message ID: ${message.variableHeader?.messageIdentifier}');
  });
}

void publishMessage(String topic, String message) {
  final builder = MqttClientPayloadBuilder();
  builder.addString(message);

  // Publish with QoS 1 (at least once delivery to the broker)
  client.publishMessage(
    topic,
    MqttQos.atLeastOnce, // ← Broker must answer with PUBACK
    builder.payload!,
  );
  print('📤 Published to $topic: $message');
}
```
Subscribing with QoS 1
```dart
// Methods of MQTTService (they use its `client` field).

// Call once, after connect(): handles every incoming message.
void listenForMessages() {
  client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> messages) {
    for (final received in messages) {
      final message = received.payload as MqttPublishMessage;
      final payload = MqttPublishPayload.bytesToStringAsString(
        message.payload.message,
      );
      print('📥 Received: $payload');
      print('   QoS: ${message.header?.qos}');
      print('   Topic: ${received.topic}');
      // The client library acknowledges QoS 1 and QoS 2 messages to the
      // broker internally by default; no application code is needed.
    }
  });
}

void subscribe(String topic) {
  // Subscribe with QoS 1 so the broker delivers with acknowledgment
  client.subscribe(topic, MqttQos.atLeastOnce);
  print('🔔 Subscribe request sent for: $topic');
}
```
Tracking Message Delivery
Method 1: Using Message ID (QoS 1/2)
The message ID returned by `publishMessage` lets you match each acknowledgment to the message it confirms. Note that this confirms delivery to the broker only, not to the subscriber.

```dart
class MQTTMessageTracker {
  MQTTMessageTracker(this.client);

  final MqttServerClient client;
  final Map<int, MessageStatus> _pendingMessages = {};

  void publishWithTracking(String topic, String message) {
    final builder = MqttClientPayloadBuilder();
    builder.addString(message);

    // publishMessage() returns the MQTT message identifier
    final messageId = client.publishMessage(
      topic,
      MqttQos.atLeastOnce,
      builder.payload!,
    );

    // Track the message until the broker acknowledges it
    _pendingMessages[messageId] = MessageStatus(
      id: messageId,
      topic: topic,
      message: message,
      timestamp: DateTime.now(),
      status: 'pending',
    );
    print('📤 Sent message ID: $messageId');
  }

  // Call once, after connect()
  void setupAcknowledgmentListener() {
    client.published?.listen((MqttPublishMessage message) {
      final messageId = message.variableHeader?.messageIdentifier;
      if (messageId != null && _pendingMessages.containsKey(messageId)) {
        // Remove from pending and record the acknowledgment
        final status = _pendingMessages.remove(messageId)!;
        status.status = 'acknowledged';
        status.acknowledgedAt = DateTime.now();
        print('✅ Message $messageId acknowledged by broker');

        // Notify UI or callback
        onBrokerAcknowledged(messageId);
      }
    });
  }

  void onBrokerAcknowledged(int messageId) {
    // Update UI, show a single checkmark ("sent"), etc.
  }
}

class MessageStatus {
  final int id;
  final String topic;
  final String message;
  final DateTime timestamp;
  String status; // 'pending', 'acknowledged', 'failed'
  DateTime? acknowledgedAt;

  MessageStatus({
    required this.id,
    required this.topic,
    required this.message,
    required this.timestamp,
    required this.status,
    this.acknowledgedAt,
  });
}
```
Method 2: Application-Level ACK (Custom)
```dart
import 'dart:convert';

// Publisher side: call once after connect(), before publishing,
// so that no ACK is missed.
void subscribeToAcks(String topic) {
  client.subscribe('$topic/ack', MqttQos.atLeastOnce);
}

void publishWithCustomAck(String topic, String message) {
  final messageId = DateTime.now().millisecondsSinceEpoch.toString();
  final payload = jsonEncode({
    'messageId': messageId,
    'content': message,
    'timestamp': DateTime.now().toIso8601String(),
  });

  final builder = MqttClientPayloadBuilder();
  builder.addString(payload);
  client.publishMessage(topic, MqttQos.atLeastOnce, builder.payload!);
  print('📤 Sent message: $messageId');
}

// Subscriber side: call once after connect()
void receiveAndSendAck(String topic) {
  client.subscribe(topic, MqttQos.atLeastOnce);

  client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> messages) {
    for (final received in messages) {
      if (received.topic.endsWith('/ack')) continue; // Never ACK an ACK

      final message = received.payload as MqttPublishMessage;
      final payload = MqttPublishPayload.bytesToStringAsString(
        message.payload.message,
      );
      final data = jsonDecode(payload) as Map<String, dynamic>;
      final messageId = data['messageId'];
      print('📥 Received message: $messageId');

      // Send custom ACK
      final ackBuilder = MqttClientPayloadBuilder();
      ackBuilder.addString(jsonEncode({
        'messageId': messageId,
        'status': 'received',
        'timestamp': DateTime.now().toIso8601String(),
      }));
      client.publishMessage(
        '${received.topic}/ack',
        MqttQos.atLeastOnce,
        ackBuilder.payload!,
      );
      print('✅ Sent ACK for message: $messageId');
    }
  });
}

// Publisher listening for ACK: call once after connect()
void listenForAck() {
  client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> messages) {
    for (final received in messages) {
      if (!received.topic.endsWith('/ack')) continue;

      final message = received.payload as MqttPublishMessage;
      final payload = MqttPublishPayload.bytesToStringAsString(
        message.payload.message,
      );
      final ackData = jsonDecode(payload) as Map<String, dynamic>;
      print('✅ ACK received for: ${ackData['messageId']}');

      // Update UI - show double checkmark (received by the other device)
      updateMessageStatus(ackData['messageId'] as String, 'delivered');
    }
  });
}

// Placeholder for your own UI/state update
void updateMessageStatus(String messageId, String status) {}
```
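An application-level ACK only answers the question "was it received?" when it arrives, so the publisher also needs a rule for when it does not. One possible approach, sketched below under the assumption that a fixed timeout is acceptable, is to start a timer per message and report the message as undelivered if no ACK shows up in time. `PendingAckTracker` and its members are hypothetical names for illustration and use only `dart:async`, not the MQTT library.

```dart
import 'dart:async';

/// Tracks messages that are waiting for an application-level ACK and
/// reports those whose ACK does not arrive within [timeout].
/// (Hypothetical helper, not part of mqtt_client.)
class PendingAckTracker {
  PendingAckTracker({required this.timeout, required this.onTimeout});

  final Duration timeout;
  final void Function(String messageId) onTimeout;
  final Map<String, Timer> _timers = {};

  /// Number of messages still waiting for an ACK.
  int get pendingCount => _timers.length;

  /// Call right after publishing a message.
  void expectAck(String messageId) {
    _timers[messageId]?.cancel(); // Restart the timer if already tracked
    _timers[messageId] = Timer(timeout, () {
      _timers.remove(messageId);
      onTimeout(messageId); // No ACK in time: let the app retry or flag it
    });
  }

  /// Call when an ACK arrives. Returns false for unknown or late ACKs.
  bool acknowledge(String messageId) {
    final timer = _timers.remove(messageId);
    timer?.cancel();
    return timer != null;
  }

  /// Cancels all timers, for example when the user logs out.
  void dispose() {
    for (final timer in _timers.values) {
      timer.cancel();
    }
    _timers.clear();
  }
}
```

In the custom ACK flow, the publisher would call `expectAck(messageId)` after `publishMessage` and `acknowledge(messageId)` in the `/ack` listener. A timeout does not prove the message was lost, since the recipient may simply be offline, so the callback is better suited to showing a "not yet delivered" state than to blind resending.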
Handling Offline Subscribers
Persistent Session (Clean Session = false)
```dart
Future<void> connectWithPersistentSession() async {
  const clientId = 'flutter_user_123'; // Must be the same ID on every connect

  client = MqttServerClient.withPort('broker.hivemq.com', clientId, 1883);
  client.keepAlivePeriod = 60;

  // Persistent session: do NOT call startClean(). It takes no argument
  // and always requests a clean session.
  client.connectionMessage =
      MqttConnectMessage().withClientIdentifier(clientId);

  await client.connect();

  // Subscribe with QoS 1 (the broker remembers this while we are offline)
  client.subscribe('chat/room1', MqttQos.atLeastOnce);
}
```
Flow when subscriber is offline:
```text
1. Subscriber connects with cleanSession=false
2. Subscribes to topic
3. Subscriber goes offline
4. Publisher sends message (QoS 1)
5. Broker stores message for offline subscriber
6. Subscriber comes back online
7. Broker delivers all queued messages
8. Subscriber sends PUBACK for each message
```
Complete Chat Example
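```dart
import 'dart:convert';

import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

class MQTTChatService {
  static const _chatTopic = 'chat/room1';
  static const _receiptTopic = 'chat/room1/receipts';

  late MqttServerClient client;
  final String userId;
  final void Function(ChatMessage) onMessageReceived;
  final void Function(String messageId) onMessageSent; // Broker has it
  final void Function(String messageId) onMessageDelivered; // Recipient has it

  // MQTT message ID -> chat message ID, while waiting for the broker's PUBACK
  final Map<int, String> _awaitingBrokerAck = {};
  // Chat message IDs sent by this user, to match incoming receipts
  final Set<String> _sentIds = {};

  MQTTChatService({
    required this.userId,
    required this.onMessageReceived,
    required this.onMessageSent,
    required this.onMessageDelivered,
  });

  Future<void> connect() async {
    final clientId = 'user_$userId';
    client = MqttServerClient.withPort('broker.hivemq.com', clientId, 1883);
    client.keepAlivePeriod = 60;

    // Persistent session: startClean() is deliberately not called
    client.connectionMessage =
        MqttConnectMessage().withClientIdentifier(clientId);
    await client.connect();

    setupMessageListener();
    setupBrokerAckListener();

    // Subscribe to the chat room and to its read receipts
    client.subscribe(_chatTopic, MqttQos.atLeastOnce);
    client.subscribe(_receiptTopic, MqttQos.atLeastOnce);
  }

  void setupMessageListener() {
    client.updates?.listen((messages) {
      for (final received in messages) {
        final message = received.payload as MqttPublishMessage;
        final payload = MqttPublishPayload.bytesToStringAsString(
          message.payload.message,
        );
        final data = jsonDecode(payload) as Map<String, dynamic>;

        if (received.topic == _receiptTopic) {
          // Someone else confirmed receipt of one of our messages
          final id = data['messageId'] as String;
          if (data['readBy'] != userId && _sentIds.contains(id)) {
            onMessageDelivered(id);
          }
          continue;
        }

        final chatMessage = ChatMessage.fromJson(data);
        if (chatMessage.senderId == userId) continue; // Our own echo

        onMessageReceived(chatMessage); // Notify app
        sendReadReceipt(chatMessage.id); // Send read receipt
      }
    });
  }

  // PUBACK from the broker: the message was sent, not yet delivered
  void setupBrokerAckListener() {
    client.published?.listen((message) {
      final packetId = message.variableHeader?.messageIdentifier;
      final chatId = _awaitingBrokerAck.remove(packetId);
      if (chatId != null) onMessageSent(chatId);
    });
  }

  void sendMessage(String text) {
    final message = ChatMessage(
      id: DateTime.now().millisecondsSinceEpoch.toString(),
      senderId: userId,
      text: text,
      timestamp: DateTime.now(),
    );
    final builder = MqttClientPayloadBuilder();
    builder.addString(jsonEncode(message.toJson()));

    final packetId = client.publishMessage(
      _chatTopic,
      MqttQos.atLeastOnce, // ← Broker must acknowledge
      builder.payload!,
    );
    _awaitingBrokerAck[packetId] = message.id;
    _sentIds.add(message.id);
  }

  void sendReadReceipt(String messageId) {
    final builder = MqttClientPayloadBuilder();
    builder.addString(jsonEncode({
      'messageId': messageId,
      'readBy': userId,
      'timestamp': DateTime.now().toIso8601String(),
    }));
    client.publishMessage(
      _receiptTopic,
      MqttQos.atLeastOnce,
      builder.payload!,
    );
  }
}

// Minimal message model assumed by the example above
class ChatMessage {
  final String id;
  final String senderId;
  final String text;
  final DateTime timestamp;

  ChatMessage({
    required this.id,
    required this.senderId,
    required this.text,
    required this.timestamp,
  });

  factory ChatMessage.fromJson(Map<String, dynamic> json) => ChatMessage(
        id: json['id'] as String,
        senderId: json['senderId'] as String,
        text: json['text'] as String,
        timestamp: DateTime.parse(json['timestamp'] as String),
      );

  Map<String, dynamic> toJson() => {
        'id': id,
        'senderId': senderId,
        'text': text,
        'timestamp': timestamp.toIso8601String(),
      };
}
```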
Summary
| Method | Pros | Cons |
|---|---|---|
| QoS 0 | Fast, low overhead | ❌ No delivery guarantee |
| QoS 1 | ✅ Delivery to the broker confirmed (PUBACK) | May receive duplicates |
| QoS 2 | ✅ Exactly once per hop | Slower, more overhead |
| Custom ACK | ✅ End-to-end confirmation from the recipient | More complex |
| Persistent Session | ✅ Works when offline | Requires same client ID |
Best Practice: Use QoS 1 for most applications (chat, notifications). Use QoS 2 only for critical commands. Add custom ACK topics for read receipts and application-level acknowledgments.