In MQTT, how do you know that a device/user received a message published to a topic it subscribes to? Also, please provide the entire architecture flow diagram showing how it works.

#mqtt #messaging #qos #real-time #architecture

Answer

Overview

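MQTT (Message Queuing Telemetry Transport) uses Quality of Service (QoS) levels to acknowledge message delivery. These acknowledgments are per hop: one exchange runs between the publisher and the broker, and a separate one runs between the broker and each subscriber. A PUBACK or PUBCOMP therefore tells the publisher only that the broker accepted the message. To learn that the subscribing device or user actually received it, the subscriber has to publish an application-level acknowledgment on a separate topic.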


MQTT QoS Levels

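| QoS Level | Name | Delivery Guarantee | Acknowledgment |
|-----------|------|--------------------|----------------|
| QoS 0 | At most once | Fire and forget | ❌ No ACK |
| QoS 1 | At least once | Guaranteed delivery | ✅ PUBACK |
| QoS 2 | Exactly once | No duplicates | ✅ PUBREC, PUBREL, PUBCOMP |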

QoS 0: Fire and Forget

text
Publisher                    MQTT Broker                 Subscriber
    │                             │                           │
    │─────PUBLISH (QoS 0)─────────►│                           │
    │                             │─────PUBLISH (QoS 0)───────►│
    │                             │                           │
    │  ❌ No acknowledgment       │  ❌ No acknowledgment     │
    │  ❌ No retry               │  ❌ No retry             │

Use Case: Sensor data where occasional loss is acceptable


QoS 1: At Least Once (With Acknowledgment)

text
Publisher                    MQTT Broker                 Subscriber
    │                             │                           │
    │─────PUBLISH (QoS 1)─────────►│                           │
    │                             │─────PUBLISH (QoS 1)───────►│
    │                             │                           │
    │◄────PUBACK──────────────────│◄────PUBACK───────────────│
    │  ✅ Broker has it           │  ✅ Subscriber has it     │
    │                             │                           │
    │  If no PUBACK received:     │  If no PUBACK received:   │
    │  🔄 Retry with DUP flag    │  🔄 Retry delivery       │

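Use Case: Important messages that must be delivered (chat messages, alerts). The two PUBACKs are independent: the publisher's PUBACK confirms receipt by the broker, not by the subscriber, and a retry can deliver the same message more than once.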
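The retry path in the diagram above can be tried out without a broker. The following is a minimal, library-free sketch; `Publish`, `Receiver`, and `sendQos1` are hypothetical names made up for illustration and are not part of `mqtt_client`. It assumes the first PUBACK is lost, so the sender retransmits with the DUP flag and the receiver ends up with two copies.

```dart
// Library-free sketch of QoS 1 "at least once" semantics.
// All names are hypothetical; this is not the mqtt_client API.

class Publish {
  final int packetId;
  final String payload;
  final bool dup; // set on retransmissions
  Publish(this.packetId, this.payload, {this.dup = false});
}

class Receiver {
  final List<Publish> delivered = [];

  /// Accepts a PUBLISH and returns the packet id to acknowledge (the PUBACK).
  int onPublish(Publish p) {
    delivered.add(p); // a QoS 1 receiver does not de-duplicate
    return p.packetId;
  }
}

/// Sends [payload] and retransmits until a PUBACK gets through.
/// [pubackLost] says, per attempt (0-based), whether the PUBACK is dropped.
/// Returns the number of transmissions that were needed.
int sendQos1(Receiver receiver, int packetId, String payload,
    bool Function(int attempt) pubackLost,
    {int maxAttempts = 5}) {
  for (var attempt = 0; attempt < maxAttempts; attempt++) {
    final ack =
        receiver.onPublish(Publish(packetId, payload, dup: attempt > 0));
    if (!pubackLost(attempt) && ack == packetId) {
      return attempt + 1;
    }
  }
  throw StateError('No PUBACK after $maxAttempts attempts');
}

void main() {
  final receiver = Receiver();
  // Drop only the first PUBACK, so the sender retries once.
  final sends = sendQos1(receiver, 1, 'hello', (attempt) => attempt == 0);
  print('transmissions: $sends'); // transmissions: 2
  print('copies received: ${receiver.delivered.length}'); // copies received: 2
  print('second copy DUP: ${receiver.delivered[1].dup}'); // second copy DUP: true
}
```

This is why QoS 1 message handlers should be idempotent, for example by ignoring a chat message ID they have already stored.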


QoS 2: Exactly Once (No Duplicates)

text
Publisher                    MQTT Broker                 Subscriber
    │                             │                           │
    │─────PUBLISH (QoS 2)─────────►│                           │
    │                             │                           │
    │◄────PUBREC──────────────────│                           │
    │  (Message received)          │                           │
    │                             │                           │
    │─────PUBREL──────────────────►│                           │
    │  (Release message)           │─────PUBLISH (QoS 2)──────►│
    │                             │                           │
    │                             │◄────PUBREC───────────────│
    │                             │                           │
    │                             │─────PUBREL───────────────►│
    │                             │                           │
    │◄────PUBCOMP─────────────────│◄────PUBCOMP──────────────│
    │  ✅ Handed to broker once   │  ✅ No duplicates!       │

Use Case: Financial transactions, critical commands
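What makes QoS 2 duplicate-free is state kept on the receiving side of each hop. A minimal sketch of that state, assuming the receiver hands the message to the application on the first PUBLISH (the specification also allows doing so on PUBREL); `Qos2Receiver` is a hypothetical name, not an `mqtt_client` class.

```dart
// Library-free sketch of the receiving side of a QoS 2 exchange.
// Hypothetical names; not the mqtt_client API.

class Qos2Receiver {
  final Set<int> _awaitingRelease = {}; // ids seen but not yet released
  final List<String> delivered = [];

  /// Handles PUBLISH and answers with PUBREC. A retransmitted PUBLISH whose
  /// id is still awaiting PUBREL is acknowledged again but not re-delivered.
  String onPublish(int packetId, String payload) {
    if (_awaitingRelease.add(packetId)) {
      delivered.add(payload); // first time this id is seen
    }
    return 'PUBREC';
  }

  /// Handles PUBREL: forgets the id, so it may be reused, and answers PUBCOMP.
  String onPubrel(int packetId) {
    _awaitingRelease.remove(packetId);
    return 'PUBCOMP';
  }
}

void main() {
  final receiver = Qos2Receiver();
  receiver.onPublish(1, 'transfer 10 EUR');
  // The PUBREC was lost, so the sender retransmits the same packet id.
  receiver.onPublish(1, 'transfer 10 EUR');
  receiver.onPubrel(1);
  print(receiver.delivered.length); // 1

  // After PUBCOMP the id is free again and carries a new message.
  receiver.onPublish(1, 'transfer 5 EUR');
  print(receiver.delivered.length); // 2
}
```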


Complete MQTT Architecture Flow

text
┌────────────────────────────────────────────────────────────┐
│                    MQTT Architecture                        │
└────────────────────────────────────────────────────────────┘

┌──────────────┐         ┌──────────────┐         ┌──────────────┐
│  Publisher   │         │  MQTT Broker │         │  Subscriber  │
│  (Device A)  │         │  (Server)    │         │  (Device B)  │
└──────────────┘         └──────────────┘         └──────────────┘
       │                         │                         │
       │  1. CONNECT             │                         │
       │────────────────────────►│                         │
       │  ◄─── CONNACK           │                         │
       │  (Connected)            │                         │
       │                         │  2. CONNECT             │
       │                         │◄────────────────────────│
       │                         │  CONNACK ────►         │
       │                         │  (Connected)            │
       │                         │                         │
       │                         │  3. SUBSCRIBE           │
       │                         │    (topic: "chat/room1")│
       │                         │◄────────────────────────│
       │                         │  SUBACK ────►          │
       │                         │                         │
       │  4. PUBLISH             │                         │
       │    (topic: "chat/room1")│                         │
       │    (QoS: 1)             │                         │
       │────────────────────────►│                         │
       │                         │  5. Store message       │
       │                         │     (if subscriber      │
       │                         │      offline)           │
       │                         │                         │
       │  6. PUBACK              │                         │
       │◄────────────────────────│                         │
       │  ✅ Broker received!    │                         │
       │                         │                         │
       │                         │  7. PUBLISH             │
       │                         │    (to subscriber)      │
       │                         │────────────────────────►│
       │                         │                         │
       │                         │  8. PUBACK              │
       │                         │◄────────────────────────│
       │                         │  ✅ Subscriber received!│

Flutter MQTT Implementation

Setup MQTT Client

dart
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

class MQTTService {
  late MqttServerClient client;
  
  Future<void> connect() async {
    final clientId = 'flutter_client_${DateTime.now().millisecondsSinceEpoch}';

    // The second constructor argument is the client identifier, not the port;
    // withPort takes the port as an int.
    client = MqttServerClient.withPort('broker.hivemq.com', clientId, 1883);
    client.keepAlivePeriod = 60;
    client.autoReconnect = true;

    // No will topic or message is configured, so no will QoS is set either.
    final connMessage = MqttConnectMessage()
        .withClientIdentifier(clientId)
        .startClean();
    
    client.connectionMessage = connMessage;
    
    try {
      await client.connect();
      print('✅ Connected to MQTT broker');
    } catch (e) {
      print('❌ Connection failed: $e');
      client.disconnect();
    }
  }
}

Publishing with QoS 1 (Acknowledgment)

dart
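// Call once after connect(). The `published` stream emits a message when its
// QoS handshake with the broker completes (PUBACK for QoS 1). This confirms
// that the broker has the message, not that any subscriber received it.
void listenForBrokerAcks() {
  client.published?.listen((MqttPublishMessage message) {
    print('✅ Message acknowledged by broker');
    print('   Message ID: ${message.variableHeader?.messageIdentifier}');
  });
}

void publishMessage(String topic, String message) {
  final builder = MqttClientPayloadBuilder();
  builder.addString(message);

  // Publish with QoS 1 (at least once delivery to the broker)
  final messageId = client.publishMessage(
    topic,
    MqttQos.atLeastOnce,  // ← Broker replies with PUBACK
    builder.payload!,
  );

  print('📤 Published to $topic (message ID $messageId): $message');
}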

Subscribing with QoS 1

dart
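void subscribe(String topic) {
  // Called when the broker confirms the subscription (SUBACK)
  client.onSubscribed = (String subscribedTopic) {
    print('🔔 Subscribed to: $subscribedTopic');
  };

  // Subscribe with QoS 1 so the broker delivers with acknowledgment
  client.subscribe(topic, MqttQos.atLeastOnce);
}

// Call once after connect(). `updates` carries incoming messages for all
// subscribed topics, possibly several per event.
void listenForMessages() {
  client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> messages) {
    for (final received in messages) {
      final message = received.payload as MqttPublishMessage;
      final payload = MqttPublishPayload.bytesToStringAsString(
        message.payload.message
      );

      print('📥 Received: $payload');
      print('   QoS: ${message.header?.qos}');
      print('   Topic: ${received.topic}');

      // The client library sends the PUBACK to the broker automatically
      // (happens internally when QoS 1 or 2)
    }
  });
}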

Tracking Message Delivery

Method 1: Using Message ID (QoS 1/2)

dart
class MQTTMessageTracker {
  final MqttServerClient client; // an already connected client
  final Map<int, MessageStatus> _pendingMessages = {};

  MQTTMessageTracker(this.client);
  
  void publishWithTracking(String topic, String message) {
    final builder = MqttClientPayloadBuilder();
    builder.addString(message);
    
    // Publish and get message ID
    final messageId = client.publishMessage(
      topic,
      MqttQos.atLeastOnce,
      builder.payload!,
    );
    
    // Track message
    _pendingMessages[messageId] = MessageStatus(
      id: messageId,
      topic: topic,
      message: message,
      timestamp: DateTime.now(),
      status: 'pending',
    );
    
    print('📤 Sent message ID: $messageId');
  }
  
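  // Call once after connect(). Fires when the broker's PUBACK (QoS 1) or
  // PUBCOMP (QoS 2) arrives, so 'delivered' here means delivered to the
  // broker. Receipt by the subscriber needs an application-level ACK.
  void setupAcknowledgmentListener() {
    client.published?.listen((MqttPublishMessage message) {
      final messageId = message.variableHeader?.messageIdentifier;

      if (messageId != null && _pendingMessages.containsKey(messageId)) {
        _pendingMessages[messageId]!.status = 'delivered';
        _pendingMessages[messageId]!.acknowledgedAt = DateTime.now();

        print('✅ Message $messageId acknowledged by broker');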
        
        // Notify UI or callback
        onMessageDelivered(messageId);
        
        // Remove from pending
        _pendingMessages.remove(messageId);
      }
    });
  }
  
  void onMessageDelivered(int messageId) {
    // Update UI, show checkmark, etc.
  }
}

class MessageStatus {
  final int id;
  final String topic;
  final String message;
  final DateTime timestamp;
  String status; // 'pending', 'delivered', 'failed'
  DateTime? acknowledgedAt;
  
  MessageStatus({
    required this.id,
    required this.topic,
    required this.message,
    required this.timestamp,
    required this.status,
    this.acknowledgedAt,
  });
}

Method 2: Application-Level ACK (Custom)

dart
import 'dart:convert';

// Publisher side: call once after connect() and before publishing, so the
// ACK subscription exists before any ACK can be sent
void subscribeToAcks(String topic) {
  client.subscribe('$topic/ack', MqttQos.atLeastOnce);
}

void publishWithCustomAck(String topic, String message) {
  final messageId = DateTime.now().millisecondsSinceEpoch.toString();

  final payload = json.encode({
    'messageId': messageId,
    'content': message,
    'timestamp': DateTime.now().toIso8601String(),
  });

  final builder = MqttClientPayloadBuilder();
  builder.addString(payload);

  client.publishMessage(topic, MqttQos.atLeastOnce, builder.payload!);

  print('📤 Sent message: $messageId');
}

// Subscriber side: call once after subscribing to [topic]
void receiveAndSendAck(String topic) {
  client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> messages) {
    for (final received in messages) {
      // Only acknowledge data messages on this topic, never ACKs themselves
      if (received.topic != topic) continue;

      final message = received.payload as MqttPublishMessage;
      final payload = MqttPublishPayload.bytesToStringAsString(
        message.payload.message
      );

      final data = json.decode(payload) as Map<String, dynamic>;
      final messageId = data['messageId'];

      print('📥 Received message: $messageId');

      // Send custom ACK
      final ackBuilder = MqttClientPayloadBuilder();
      ackBuilder.addString(json.encode({
        'messageId': messageId,
        'status': 'received',
        'timestamp': DateTime.now().toIso8601String(),
      }));

      client.publishMessage(
        '$topic/ack',
        MqttQos.atLeastOnce,
        ackBuilder.payload!,
      );

      print('✅ Sent ACK for message: $messageId');
    }
  });
}

// Publisher listening for ACK
void listenForAck() {
  client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> messages) {
    for (final received in messages) {
      if (!received.topic.endsWith('/ack')) continue;

      final message = received.payload as MqttPublishMessage;
      final payload = MqttPublishPayload.bytesToStringAsString(
        message.payload.message
      );

      final ackData = json.decode(payload) as Map<String, dynamic>;
      print('✅ ACK received for: ${ackData['messageId']}');

      // Update UI - show double checkmark. The ACK means the subscriber's
      // app received the message, not that the user has read it.
      updateMessageStatus(ackData['messageId'] as String, 'delivered');
    }
  });
}

// Placeholder for the app's own state/UI update
void updateMessageStatus(String messageId, String status) {}
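Application-level ACKs only help if the publisher also notices when one never arrives. A minimal sketch of that bookkeeping follows. `PendingAcks` is a hypothetical helper, independent of any MQTT library, and the 30-second timeout is an arbitrary value chosen for illustration. The current time is passed in explicitly so the logic is easy to test.

```dart
// Sketch of publisher-side bookkeeping for application-level ACKs.
// Hypothetical helper; independent of any MQTT library.

class PendingAcks {
  final Duration timeout;
  final Map<String, DateTime> _sentAt = {};

  PendingAcks(this.timeout);

  /// Records that [messageId] was published at [now].
  void markSent(String messageId, DateTime now) {
    _sentAt[messageId] = now;
  }

  /// Returns true if the ACK matched a message that was still pending.
  /// A duplicate or unknown ACK returns false.
  bool markAcked(String messageId) {
    return _sentAt.remove(messageId) != null;
  }

  /// Ids that have waited longer than [timeout]: candidates for a resend
  /// or for a "not delivered" indicator in the UI.
  List<String> overdue(DateTime now) {
    return [
      for (final entry in _sentAt.entries)
        if (now.difference(entry.value) > timeout) entry.key
    ];
  }
}

void main() {
  final pending = PendingAcks(const Duration(seconds: 30));
  final t0 = DateTime(2024, 1, 1);
  pending.markSent('a', t0);
  pending.markSent('b', t0);

  print(pending.markAcked('a')); // true
  print(pending.markAcked('a')); // false (duplicate ACK)
  print(pending.overdue(t0.add(const Duration(seconds: 31)))); // [b]
}
```

In the publisher code above, `markSent` would be called next to `publishMessage` and `markAcked` inside the `/ack` listener, with `overdue` polled from a periodic timer.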

Handling Offline Subscribers

Persistent Session (Clean Session = false)

dart
Future<void> connectWithPersistentSession() async {
  const clientId = 'flutter_user_123'; // Same ID on every connect
  client = MqttServerClient.withPort('broker.hivemq.com', clientId, 1883);
  client.keepAlivePeriod = 60;

  // startClean() takes no arguments. Leaving it out keeps the clean-session
  // flag unset, which requests a persistent session.
  final connMessage = MqttConnectMessage()
      .withClientIdentifier(clientId);

  client.connectionMessage = connMessage;

  await client.connect();

  // Subscribe with QoS 1 or 2: the broker remembers the subscription and
  // queues matching QoS 1/2 messages while this client is offline
  client.subscribe('chat/room1', MqttQos.atLeastOnce);
}

Flow when subscriber is offline:

text
1. Subscriber connects with cleanSession=false
2. Subscribes to topic
3. Subscriber goes offline
4. Publisher sends message (QoS 1)
5. Broker stores message for offline subscriber
6. Subscriber comes back online
7. Broker delivers all queued messages
8. Subscriber sends PUBACK for each message
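The eight steps above can be sketched from the broker's point of view. This is a toy in-memory model under simplifying assumptions (exact-match topics, no QoS handshakes, no queue limits); `TinyBroker` and `Session` are hypothetical names and do not describe how any real broker is implemented.

```dart
// Library-free sketch of a broker queueing messages for an offline client
// that connected with cleanSession=false. Hypothetical names throughout.

class Session {
  final bool clean;
  final Set<String> topics = {};
  final List<String> queued = [];
  void Function(String payload)? deliver; // null while offline
  Session(this.clean);
}

class TinyBroker {
  final Map<String, Session> _sessions = {};

  /// Connects [clientId]. cleanSession=true discards any stored session.
  /// Returns true when a previous session was resumed.
  bool connect(String clientId, void Function(String) deliver,
      {required bool cleanSession}) {
    if (cleanSession) _sessions.remove(clientId);
    final resumed = _sessions.containsKey(clientId);
    final session =
        _sessions.putIfAbsent(clientId, () => Session(cleanSession));
    session.deliver = deliver;

    // Step 7: flush everything queued while the client was away.
    final backlog = List<String>.of(session.queued);
    session.queued.clear();
    backlog.forEach(deliver);
    return resumed;
  }

  void disconnect(String clientId) {
    final session = _sessions[clientId];
    if (session == null) return;
    session.deliver = null;
    if (session.clean) _sessions.remove(clientId); // nothing is remembered
  }

  void subscribe(String clientId, String topic) {
    _sessions[clientId]?.topics.add(topic);
  }

  void publish(String topic, String payload) {
    for (final session in _sessions.values) {
      if (!session.topics.contains(topic)) continue;
      final deliver = session.deliver;
      if (deliver != null) {
        deliver(payload);
      } else {
        session.queued.add(payload); // step 5: store for offline subscriber
      }
    }
  }
}

void main() {
  final broker = TinyBroker();
  final inbox = <String>[];

  broker.connect('user_1', inbox.add, cleanSession: false); // step 1
  broker.subscribe('user_1', 'chat/room1'); // step 2
  broker.disconnect('user_1'); // step 3
  broker.publish('chat/room1', 'hi while you were away'); // steps 4-5
  print(inbox.length); // 0

  final resumed = broker.connect('user_1', inbox.add, cleanSession: false);
  print('resumed: $resumed, inbox: $inbox');
  // resumed: true, inbox: [hi while you were away]
}
```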

Complete Chat Example

dart
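import 'dart:convert';

import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

// ChatMessage (id, senderId, text, timestamp, toJson/fromJson) is assumed to
// be defined elsewhere in the app.
class MQTTChatService {
  static const _chatTopic = 'chat/room1';
  static const _receiptTopic = 'chat/room1/receipts';

  late MqttServerClient client;
  final String userId;
  final Function(ChatMessage) onMessageReceived;
  // Called when the broker has acknowledged one of our messages (PUBACK)
  final Function(String messageId) onMessageDelivered;
  // Called when a read receipt arrives. Receipts are room-wide, so the app
  // should ignore message IDs this user did not send.
  final Function(String messageId)? onMessageRead;

  // MQTT packet ID → chat message ID, for publishes awaiting PUBACK
  final Map<int, String> _awaitingBrokerAck = {};

  MQTTChatService({
    required this.userId,
    required this.onMessageReceived,
    required this.onMessageDelivered,
    this.onMessageRead,
  });

  Future<void> connect() async {
    final clientId = 'user_$userId';
    // The second constructor argument is the client identifier, not the port
    client = MqttServerClient.withPort('broker.hivemq.com', clientId, 1883);
    client.keepAlivePeriod = 60;

    // Persistent session: startClean() is deliberately not called
    final connMessage = MqttConnectMessage().withClientIdentifier(clientId);

    client.connectionMessage = connMessage;
    await client.connect();

    // Attach listeners right after connecting
    setupMessageListener();
    setupDeliveryListener();

    // Subscribe to the chat room and to its read receipts
    client.subscribe(_chatTopic, MqttQos.atLeastOnce);
    client.subscribe(_receiptTopic, MqttQos.atLeastOnce);
  }

  void setupMessageListener() {
    client.updates?.listen((messages) {
      for (final received in messages) {
        final message = received.payload as MqttPublishMessage;
        final payload = MqttPublishPayload.bytesToStringAsString(
          message.payload.message
        );
        final data = json.decode(payload) as Map<String, dynamic>;

        if (received.topic == _receiptTopic) {
          // Skip the echo of our own receipts
          if (data['readBy'] != userId) {
            onMessageRead?.call(data['messageId'] as String);
          }
          continue;
        }

        final chatMessage = ChatMessage.fromJson(data);

        // The broker also sends our own messages back to us; skip them
        if (chatMessage.senderId == userId) continue;

        // Notify app
        onMessageReceived(chatMessage);

        // Send read receipt
        sendReadReceipt(chatMessage.id);
      }
    });
  }

  void setupDeliveryListener() {
    client.published?.listen((message) {
      // Translate the MQTT packet ID back to the chat message ID.
      // Receipts we publish are not in the map and are ignored here.
      final packetId = message.variableHeader?.messageIdentifier;
      final chatId = _awaitingBrokerAck.remove(packetId);
      if (chatId != null) {
        onMessageDelivered(chatId);
      }
    });
  }

  void sendMessage(String text) {
    final message = ChatMessage(
      id: DateTime.now().millisecondsSinceEpoch.toString(),
      senderId: userId,
      text: text,
      timestamp: DateTime.now(),
    );

    final builder = MqttClientPayloadBuilder();
    builder.addString(json.encode(message.toJson()));

    final packetId = client.publishMessage(
      _chatTopic,
      MqttQos.atLeastOnce,  // ← Broker replies with PUBACK
      builder.payload!,
    );
    _awaitingBrokerAck[packetId] = message.id;
  }

  void sendReadReceipt(String messageId) {
    final builder = MqttClientPayloadBuilder();
    builder.addString(json.encode({
      'messageId': messageId,
      'readBy': userId,
      'timestamp': DateTime.now().toIso8601String(),
    }));

    client.publishMessage(
      _receiptTopic,
      MqttQos.atLeastOnce,
      builder.payload!,
    );
  }
}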

Summary

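| Method | Pros | Cons |
|--------|------|------|
| QoS 0 | Fast, low overhead | ❌ No delivery guarantee |
| QoS 1 | ✅ Delivery confirmed per hop (client ↔ broker) | May receive duplicates |
| QoS 2 | ✅ Exactly once per hop | Slower, more overhead |
| Custom ACK | ✅ Application-level tracking (confirms the subscriber received it) | More complex |
| Persistent Session | ✅ Works when offline | Requires same client ID |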

Best Practice: Use QoS 1 for most applications (chat, notifications). Use QoS 2 only for critical commands. Add custom ACK topics for read receipts and application-level acknowledgments.