Question #80 | Medium | Flutter Basics | Important

What are streams, how do you use them in Flutter, and what types of streams are there?

#flutter #stream

Answer

Streams in Flutter

Streams are a fundamental concept in Dart and Flutter for handling asynchronous data sequences. A stream is like a pipe that delivers data over time - you can listen to it and receive values whenever they're available.

What is a Stream?

More precisely, a Stream<T> is a sequence of asynchronous events: zero or more data events of type T or error events, optionally followed by a single done event. Where a Future delivers one result, a Stream delivers many over time.

Types of Streams

| Stream Type | Listeners | Use Case |
| --- | --- | --- |
| Single Subscription | Only one listener | File I/O, HTTP requests, one-time events |
| Broadcast Stream | Multiple listeners | UI events, real-time updates, notifications |
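
To make the difference between the two types concrete, here is a minimal sketch that uses only dart:async. The helper name collectFromTwoListeners is hypothetical and exists only for this illustration. It checks the isBroadcast flag and uses asBroadcastStream() to let two listeners share one single-subscription source.

```dart
import 'dart:async';

// Hypothetical helper for illustration: attaches two listeners to one
// source and records what each of them receives.
Future<List<String>> collectFromTwoListeners() async {
  // Stream.fromIterable creates a single-subscription stream.
  final single = Stream.fromIterable([1, 2, 3]);
  print(single.isBroadcast); // false

  // asBroadcastStream() wraps it so that several listeners can attach.
  final shared = single.asBroadcastStream();
  print(shared.isBroadcast); // true

  final received = <String>[];
  final a = shared.listen((v) => received.add('A$v'));
  final b = shared.listen((v) => received.add('B$v'));

  // Wait until both subscriptions have seen the done event.
  await Future.wait([a.asFuture<void>(), b.asFuture<void>()]);
  return received;
}

Future<void> main() async {
  final received = await collectFromTwoListeners();
  print(received.length); // 6: each listener saw 1, 2 and 3
}
```

Note that a broadcast stream does not replay past events: a listener that subscribes late only sees what is emitted after it attached.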

Creating and Using Streams

1. Single Subscription Stream

dart
import 'dart:async';

// Creating a stream that emits numbers
Stream<int> numberStream() async* {
  for (int i = 1; i <= 5; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield i; // Emit value
  }
}

// Using the stream
void main() async {
  final stream = numberStream();

  // Listen to stream
  await for (int value in stream) {
    print('Received: $value');
  }
  // Prints "Received: 1" through "Received: 5", one line per second
}

2. Broadcast Stream

dart
import 'dart:async';

void main() {
  // Create a StreamController
  final controller = StreamController<String>.broadcast();

  // First listener
  controller.stream.listen((data) {
    print('Listener 1: $data');
  });

  // Second listener
  controller.stream.listen((data) {
    print('Listener 2: $data');
  });

  // Add data to stream
  controller.add('Hello');
  controller.add('World');

  // Clean up
  controller.close();
}

Stream Components

StreamController

The most common way to create and manage streams:

dart
import 'dart:async';

void main() {
  // Create a single-subscription controller
  final controller = StreamController<int>();

  // Add data; events are buffered until a listener subscribes
  controller.add(1);
  controller.add(2);
  controller.add(3);

  // Listen to the stream
  controller.stream.listen(
    (data) => print('Data: $data'),
    onError: (error) => print('Error: $error'),
    onDone: () => print('Stream closed'),
  );

  // Close the controller when done; this triggers onDone
  controller.close();
}
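
The listen() call returns a StreamSubscription, which is the handle used to stop receiving events. The sketch below is a minimal illustration with a hypothetical helper name (takeUntilCancelled); it cancels the subscription from inside the data callback once the second value has arrived.

```dart
import 'dart:async';

// Hypothetical helper for illustration: listens until the value 2 arrives,
// then cancels the subscription.
Future<List<int>> takeUntilCancelled() async {
  final controller = StreamController<int>();
  final received = <int>[];

  // listen() returns a StreamSubscription that controls delivery.
  late final StreamSubscription<int> subscription;
  subscription = controller.stream.listen((value) {
    received.add(value);
    if (value == 2) {
      subscription.cancel(); // No further events are delivered after this
    }
  });

  controller.add(1);
  controller.add(2);
  controller.add(3); // Never reaches the listener

  // Give the buffered events a chance to be delivered.
  await Future<void>.delayed(Duration.zero);
  controller.close();
  return received;
}

Future<void> main() async {
  print(await takeUntilCancelled()); // [1, 2]
}
```

A subscription also offers pause() and resume(), which are useful when the consumer temporarily cannot keep up with the producer.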

Stream Transformers

Transform stream data before it reaches the listener. Methods such as map() return a new stream and leave the source stream untouched:

dart
Stream<int> numberStream = Stream.fromIterable([1, 2, 3, 4, 5]);

// Transform: multiply each number by 2
Stream<int> doubledStream = numberStream.map((number) => number * 2);

// Listen
doubledStream.listen((data) => print(data));
// Output: 2, 4, 6, 8, 10
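
When a transformation does not fit a single built-in method, dart:async also provides the StreamTransformer class, which is applied with transform(). The following is a minimal sketch; the transformer name doubleEvens is made up for this example.

```dart
import 'dart:async';

// Hypothetical transformer: drops odd numbers and doubles the even ones.
final doubleEvens = StreamTransformer<int, int>.fromHandlers(
  handleData: (value, sink) {
    if (value.isEven) sink.add(value * 2);
  },
);

Future<void> main() async {
  final result = await Stream.fromIterable([1, 2, 3, 4, 5])
      .transform(doubleEvens)
      .toList();
  print(result); // [4, 8]
}
```

Because the handler receives a sink, it can emit zero, one, or several values per input event, which map() alone cannot do.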

Common Stream Methods

dart
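// Each method returns a new stream; `stream` is assumed to be a Stream<int>

// where() - Keep only values that pass the test
stream.where((value) => value > 10);

// map() - Transform each value
stream.map((value) => value * 2);

// take() - Emit the first N values, then close
stream.take(5);

// skip() - Ignore the first N values
stream.skip(2);

// distinct() - Skip a value that equals the previous one (consecutive duplicates only)
stream.distinct();

// timeout() - Emit a TimeoutException if no event arrives within the time limit
stream.timeout(Duration(seconds: 5));

// handleError() - Intercept errors before they reach the listener
stream.handleError((error) => print(error));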
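
Since each of these methods returns a new stream, they can be chained into a pipeline. Here is a small sketch of that idea; the function name firstThreeLargeDoubled and the sample values are made up for the example.

```dart
import 'dart:async';

// Hypothetical pipeline: keep values above 10, double them, skip consecutive
// repeats, and stop after three results.
Future<List<int>> firstThreeLargeDoubled(Stream<int> source) {
  return source
      .where((value) => value > 10)
      .map((value) => value * 2)
      .distinct()
      .take(3)
      .toList();
}

Future<void> main() async {
  final source = Stream.fromIterable([5, 12, 12, 20, 8, 31, 40]);
  print(await firstThreeLargeDoubled(source)); // [24, 40, 62]
}
```

The order of the operators matters: placing take(3) before where() would limit the input to three values instead of limiting the output.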

Using Streams in Flutter Widgets

StreamBuilder Widget

dart
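import 'dart:async';

import 'package:flutter/material.dart';

// A StatefulWidget is required here: only State has a dispose() hook
// in which the controller can be closed.
class CounterScreen extends StatefulWidget {
  const CounterScreen({super.key});

  @override
  State<CounterScreen> createState() => _CounterScreenState();
}

class _CounterScreenState extends State<CounterScreen> {
  final StreamController<int> _controller = StreamController<int>();
  StreamSubscription<int>? _ticker;

  void _incrementCounter() {
    // Restart the ticker instead of stacking a new one on every tap
    _ticker?.cancel();
    // Simulate counter updates: emit 0..9, one value per second
    _ticker = Stream.periodic(const Duration(seconds: 1), (count) => count)
        .take(10)
        .listen((count) => _controller.add(count));
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: const Text('Stream Counter')),
      body: Center(
        child: StreamBuilder<int>(
          stream: _controller.stream,
          initialData: 0, // Shown until the first event arrives
          builder: (context, snapshot) {
            if (snapshot.hasError) {
              return Text('Error: ${snapshot.error}');
            }

            return Text(
              'Count: ${snapshot.data}',
              style: const TextStyle(fontSize: 48),
            );
          },
        ),
      ),
      floatingActionButton: FloatingActionButton(
        onPressed: _incrementCounter,
        child: const Icon(Icons.play_arrow),
      ),
    );
  }

  @override
  void dispose() {
    _ticker?.cancel();
    _controller.close();
    super.dispose();
  }
}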

Real-World Examples

1. Real-Time Chat Application

dart
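import 'dart:async';

// Message is assumed to be defined elsewhere in the app.
class ChatService {
  final _messages = <Message>[];
  final _messageController = StreamController<List<Message>>.broadcast();

  Stream<List<Message>> get messages => _messageController.stream;

  void sendMessage(Message message) {
    // Send to the server (omitted), then emit the updated history
    _messages.add(message);
    _messageController.add(List.unmodifiable(_messages));
  }

  void dispose() {
    _messageController.close();
  }
}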

2. Search with Debouncing

dart
import 'dart:async';

import 'package:flutter/material.dart';
import 'package:rxdart/rxdart.dart'; // Provides debounceTime() and switchMap()

class SearchScreen extends StatefulWidget {
  const SearchScreen({super.key});

  @override
  State<SearchScreen> createState() => _SearchScreenState();
}

class _SearchScreenState extends State<SearchScreen> {
  final _searchController = StreamController<String>();
  late final Stream<List<String>> _resultsStream;

  @override
  void initState() {
    super.initState();

    _resultsStream = _searchController.stream
        .debounceTime(const Duration(milliseconds: 500)) // Wait 500 ms after typing stops
        .distinct() // Ignore a query identical to the previous one
        .switchMap((query) => _performSearch(query)); // Keep only the latest search
  }

  Stream<List<String>> _performSearch(String query) async* {
    // Simulate API call
    await Future.delayed(const Duration(milliseconds: 300));
    yield ['Result 1 for $query', 'Result 2 for $query'];
  }

  @override
  Widget build(BuildContext context) {
    return Column(
      children: [
        TextField(
          onChanged: (value) => _searchController.add(value),
          decoration: const InputDecoration(hintText: 'Search...'),
        ),
        // Expanded gives the ListView a bounded height inside the Column
        Expanded(
          child: StreamBuilder<List<String>>(
            stream: _resultsStream,
            builder: (context, snapshot) {
              if (!snapshot.hasData) {
                return const Center(child: CircularProgressIndicator());
              }
              return ListView(
                children: snapshot.data!
                    .map((result) => ListTile(title: Text(result)))
                    .toList(),
              );
            },
          ),
        ),
      ],
    );
  }

  @override
  void dispose() {
    _searchController.close();
    super.dispose();
  }
}
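
The debouncing step in the search example comes from the rxdart package. To show what debouncing does, here is a hand-rolled sketch that uses only dart:async. The debounce function below is a hypothetical helper written for this illustration and is not the rxdart implementation; it forwards a value only after a quiet period with no newer value.

```dart
import 'dart:async';

// Hypothetical helper, not part of dart:async or rxdart: forwards a value
// only once [delay] has passed without a newer value arriving.
Stream<T> debounce<T>(Stream<T> source, Duration delay) {
  final controller = StreamController<T>();
  StreamSubscription<T>? subscription;
  Timer? timer;
  void Function()? emitPending;

  controller.onListen = () {
    subscription = source.listen(
      (value) {
        timer?.cancel(); // A newer value replaces the pending one
        emitPending = () {
          emitPending = null;
          controller.add(value);
        };
        timer = Timer(delay, () => emitPending?.call());
      },
      onError: controller.addError,
      onDone: () {
        timer?.cancel();
        emitPending?.call(); // Flush the last value before closing
        controller.close();
      },
    );
  };
  controller.onCancel = () async {
    timer?.cancel();
    await subscription?.cancel();
  };
  return controller.stream;
}

Future<void> main() async {
  final input = StreamController<String>();
  final results =
      debounce(input.stream, const Duration(milliseconds: 200)).toList();

  // Three quick keystrokes, 50 ms apart: only the last one survives.
  for (final text in ['f', 'fl', 'flu']) {
    input.add(text);
    await Future<void>.delayed(const Duration(milliseconds: 50));
  }
  await Future<void>.delayed(const Duration(milliseconds: 400));
  input.add('flutter');
  await input.close();

  print(await results); // [flu, flutter]
}
```

In an app, the rxdart operators remain the more practical choice; the sketch is only meant to make the timing behavior visible.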

3. Location Tracking

dart
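import 'package:flutter/material.dart';
import 'package:geolocator/geolocator.dart';

class LocationService {
  Stream<Position> getLocationStream() {
    return Geolocator.getPositionStream(
      locationSettings: LocationSettings(
        accuracy: LocationAccuracy.high,
        distanceFilter: 10, // Report a new position after 10 meters of movement
      ),
    );
  }
}

// Usage in a widget; the app is assumed to have location permission already
class LocationTracker extends StatefulWidget {
  const LocationTracker({super.key});

  @override
  State<LocationTracker> createState() => _LocationTrackerState();
}

class _LocationTrackerState extends State<LocationTracker> {
  // Create the stream once; creating it inside build() would make
  // StreamBuilder resubscribe on every rebuild
  late final Stream<Position> _positions =
      LocationService().getLocationStream();

  @override
  Widget build(BuildContext context) {
    return StreamBuilder<Position>(
      stream: _positions,
      builder: (context, snapshot) {
        if (!snapshot.hasData) {
          return const Text('Waiting for location...');
        }

        final position = snapshot.data!;
        return Text(
          'Lat: ${position.latitude}, Lng: ${position.longitude}',
        );
      },
    );
  }
}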

Stream Best Practices

  1. Always Close Streams: Prevent memory leaks by closing StreamControllers
  2. Use Broadcast Streams for Multiple Listeners: Single subscription streams can only have one listener
  3. Handle Errors Properly: Use the onError callback in listen() or the handleError() method
  4. Dispose in Widgets: Close streams in the dispose() method
  5. Use RxDart for Advanced Operations: For debouncing, throttling, and complex transformations
  6. Avoid Blocking Operations: Keep stream operations non-blocking
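
As an illustration of the error-handling practice, the sketch below records every event a listener receives. The helper name logEvents is hypothetical. It shows that, by default, an error event does not end the stream: the value added after the error is still delivered.

```dart
import 'dart:async';

// Hypothetical helper for illustration: logs data, error, and done events.
Future<List<String>> logEvents() async {
  final controller = StreamController<int>();
  final log = <String>[];
  final done = Completer<void>();

  controller.stream.listen(
    (value) => log.add('data: $value'),
    onError: (Object error) => log.add('error: $error'),
    onDone: () {
      log.add('done');
      done.complete();
    },
  );

  controller.add(1);
  controller.addError(FormatException('bad payload'));
  controller.add(2); // Still delivered: the error did not close the stream
  controller.close();

  await done.future;
  return log;
}

Future<void> main() async {
  print(await logEvents());
  // [data: 1, error: FormatException: bad payload, data: 2, done]
}
```

Passing cancelOnError: true to listen() changes this behavior and cancels the subscription at the first error.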

Common Pitfalls

dart
// ❌ Bad: Listening twice to a single-subscription stream
final stream = Stream.fromIterable([1, 2, 3]);
stream.listen((data) => print(data));
stream.listen((data) => print(data)); // Throws StateError: Stream has already been listened to.

// ✅ Good: Use a broadcast stream
final broadcast = StreamController<int>.broadcast();
broadcast.stream.listen((data) => print(data));
broadcast.stream.listen((data) => print(data)); // Works!

// ❌ Bad: Not closing the controller
final leaky = StreamController<int>();
// ... use stream
// Forgot to close - memory leak!

// ✅ Good: Always close
final controller = StreamController<int>();
// ... use stream
controller.close(); // Clean up

Popular Stream Packages

  • rxdart: Advanced stream operators (debounce, throttle, combine)
  • stream_transform: Additional stream transformers
  • async: Core async utilities including streams

yaml
dependencies:
  rxdart: ^0.27.7

Important: Streams are essential for reactive programming in Flutter. Master streams to build responsive, real-time applications.

Documentation: Dart Asynchronous Programming: Streams