How to Use Bloc with Streams and Concurrency

Or, how to migrate your blocs and cubits to Bloc >=7.2.0

Flutter
November 1, 2021
updated on
November 18, 2021
By 
Guest Contributor

Bloc 7.2.0 and Bloc 8.0.0 introduce a new way to register event handlers. With this change come several benefits, including reduced boilerplate, better consistency with cubit, and, most of all, concurrent event processing by default!

Let's take a look at the classic Counter bloc, updated for Bloc 7.2.0.


class CounterBloc extends Bloc<CounterEvent, int> {
  CounterBloc() : super(0) {
    // The future is here.
    on<Increment>((event, emit) => emit(state + 1));
    on<Decrement>((event, emit) => emit(state - 1));
  }
}

Prior to Bloc 7.2.0, events added all at once would be processed sequentially. Bloc >=7.2.0 now runs event handlers concurrently by default. Let's see what that means in practice.

The New Reality


BlocProvider.of<MyBloc>(context)..add(EventA())..add(EventB());

In the example above, EventA and EventB will be processed concurrently. If their handlers emit conflicting states, a race condition results: in Bloc >=7.2.0, whichever handler finishes first will have its state overwritten by the handler that takes longer.
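The race can be reproduced without Bloc at all. The following plain-Dart sketch is only an illustration: `handleEventA`, `handleEventB`, the delays, and the shared `emitted` list are hypothetical stand-ins for two event handlers and the states a bloc emits, not part of the Bloc API.

```dart
import 'dart:async';

// Hypothetical stand-in for the sequence of states a bloc emits.
final emitted = <String>[];

// Stand-in for a slow handler (for example, a long network call).
Future<void> handleEventA() async {
  await Future<void>.delayed(const Duration(milliseconds: 200));
  emitted.add('state from A');
}

// Stand-in for a fast handler.
Future<void> handleEventB() async {
  await Future<void>.delayed(const Duration(milliseconds: 20));
  emitted.add('state from B');
}

Future<List<String>> runConcurrently() async {
  emitted.clear();
  // Start both handlers before awaiting either one, which mirrors
  // how the two events above are handled at the same time.
  await Future.wait([handleEventA(), handleEventB()]);
  return List.of(emitted);
}

Future<void> main() async {
  final states = await runConcurrently();
  // A was started first, but its state arrives last and "wins".
  print(states.last); // state from A
}
```

Swapping the two delays flips the final state, which is exactly why conflicting handlers need an explicit concurrency strategy.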

If you haven't designed your blocs with this in mind, don't worry! The rest of this article will demonstrate several examples using the old bloc syntax and show how they can be migrated to safely and effectively use Bloc 7.2.0.

In this article, we'll build each scenario the old way, using mapEventToState, before migrating it to Bloc 7.2.0's new event registration system. In total, we will demonstrate three different scenarios, each taking advantage of bloc's new concurrency features.

To avoid making this article too long, we're showing only the relevant snippets of code which explain the concept. You can view the complete source code for all three scenarios in the accompanying code repository, which was created with Very Good CLI. The code for each bloc is included twice: one uses the old mapEventToState approach, and the other uses the new event registration system. In addition, if you run the app, you can examine the code for both the old and new blocs and see how they work with the same exact widgets.

Sequential Events

Prior to Bloc 7.2.0, events were processed sequentially, one after the other. If an event called asynchronous methods and took a long time to finish, the other events just queued up, patiently waiting for their turn.


class MyBloc extends Bloc<MyEvent, MyState> {
  MyBloc(): super(MyState());

  Stream<MyState> mapEventToState(MyEvent event) async* {
    if (event is DoSomething) {
      // If fetchSomething() takes a while, it will block the
      // internal event queue, preventing other events from firing.
      final results = await repo.fetchSomething();
      yield MyState(results: results);
    }
  }
}

If you needed to process events concurrently prior to Bloc 7.2.0, you could have used a cubit.


class MyCubit extends Cubit<MyState> {
  MyCubit() : super(MyState());

  Future<void> doSomething() async {
    final results = await repo.fetchSomething();
    // Replace results, keep the current otherThings.
    emit(MyState(results: results, otherThings: state.otherThings));
  }

  Future<void> doSomethingElse() async {
    final otherThings = await repo.doSomethingElse();
    // Replace otherThings, keep the current results.
    emit(MyState(results: state.results, otherThings: otherThings));
  }
}

Because cubits emit states directly from methods, you can call as many methods on a cubit as you want from a widget (or anything that's using a cubit).

Bloc 7.2.0 aligns bloc and cubit more closely since bloc now processes events concurrently. While cubit itself doesn't use events, you can call multiple methods on it without waiting for each one to finish, which is essentially the same as processing multiple events at once in a bloc.


class MyWidget extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return BlocBuilder<MyCubit, MyState>(
      builder: (context, state) {
        return TextButton(
          onPressed: () {
            final cubit = BlocProvider.of<MyCubit>(context);
            // Both cubit methods return futures. We deliberately
            // don't await them, so they both start right away
            // and run at the same time.
            cubit.doSomething();
            cubit.doSomethingElse();
          },
          child: Text(state.results),
        );
      },
    );
  }
}

A Structured Approach to Concurrency

Bloc 7.2.0 has deprecated mapEventToState in preparation for Bloc 8.0.0, which will require event handlers to use the on<Event>() registration API.

Below is a demonstration of the new event registration API. Note that we're passing in one of the supported event transformers from the new bloc_concurrency package to demonstrate how concurrency can be handled per event class.


import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';

class MyBloc extends Bloc<MyEvent, MyState> {
  MyBloc(): super(MyState()) {
    on<DoSomething>(
      (event, emit) async {
        final results = await repo.fetchSomething();
        // Use emit instead of yield.
        emit(MyState(results: results));
      },
      transformer: droppable(),
    );
  }
}

Different types of events registered with on<Event>() are processed concurrently, meaning that blocs and cubits are finally on equal footing.

What about mapEventToState?

While you're migrating old blocs, you may be tempted to just slap the sequential() transformer on all your events to mimic the previous behavior. You could even specify it as the default for your entire app globally.


// Change the default event transformer for all blocs everywhere.
Bloc.transformer = sequential<dynamic>();

However, that statement alone will not make Bloc behave like it used to! Events fall into buckets, based on how you register them. Instead of processing all events sequentially in the order they were added, each event is processed sequentially within its own bucket, and multiple buckets can be processing concurrently.

If you choose to override the global event transformer, as shown above, you must also remember to do it in your tests.

Overriding the default event transformer is equivalent to registering event buckets like this:


on<EventA>((event, emit) { ... }, transformer: sequential());
on<EventB>((event, emit) { ... }, transformer: sequential());

// Only one EventA can be running at once.
// Only one EventB can be running at once.
// An EventA and an EventB can both be running at the same time.

If you need your blocs to behave purely sequentially, you'll have to register a single bucket to catch all of the events. You'll also have to ensure your event bucket transformer is sequential (or change the global transformer). The resulting behavior is equivalent to the way mapEventToState behaved.


abstract class MyEvent {}
class EventA extends MyEvent {}
class EventB extends MyEvent {}

class MyBloc extends Bloc<MyEvent, MyState> {
  MyBloc() : super(MyState()) {
    // To catch every event, we just need to register
    // the parent class of all the event subclasses.
    on<MyEvent>(
      // If you squint, it's just like looking at mapEventToState
      (event, emit) {
        // Don't forget to pass emit to your handlers, though!
        if (event is EventA) _handleEventA(event, emit);
        else if (event is EventB) _handleEventB(event, emit);
      },
      transformer: sequential(),
    );
  }

  void _handleEventA(EventA event, Emitter<MyState> emit) { ... }
  void _handleEventB(EventB event, Emitter<MyState> emit) { ... }
}

For many migration scenarios, you might not need anything else, but, if you want to take advantage of the new features, read on!

Blocs and Streams

Sometimes, we have blocs that listen to a stream. Often, this stream bubbles up from a repository in our domain layer, where it can be listened to by multiple blocs. The blocs listening in use the data stream to make their business logic decisions.

It's not uncommon to find streams in large Flutter apps when dealing with authentication, video streaming, image uploads/downloads, etc.

Often, using a repository that exposes a stream of data — i.e., a reactive repository — is one way to prevent blocs from depending on other blocs. Instead of creating a sibling dependency between blocs, which is extremely hard to manage as a project grows, moving the data to the domain layer and "listening in" from blocs above is often an elegant solution to what would otherwise be a dependency nightmare. There are other solutions to prevent direct bloc-to-bloc coupling, too, such as putting bloc listeners higher in the widget tree and adding events to other blocs when another bloc's state changes.

Video Streaming Example

We can demonstrate Bloc's improved support for working with streams by creating a video streaming app! We'll use a fake video streaming repository that exposes a stream of images. Each image represents a video frame received from over the network.

Before Bloc 7.2.0, this is what our bloc might have looked like:


class VideoStreamBlocOld extends Bloc<VideoStreamEvent, VideoStreamState> {
  VideoStreamBlocOld({
    required this.videoStreamingRepo,
    required VideoData frame,
  }) : super(VideoStreamState.initial(frame));

  final VideoStreamingRepo videoStreamingRepo;
  StreamSubscription<VideoData>? _subscription;

  @override
  Stream<VideoStreamState> mapEventToState(VideoStreamEvent event) async* {
    if (event is VideoStreamPlayPauseEvent) {
      if (event.play) {
        _subscription ??=
            videoStreamingRepo.videoDataStream.listen(_onVideoData);
        yield VideoStreamState(
          currentFrame: state.currentFrame,
          isPlaying: true,
        );
      } else {
        await _subscription?.cancel();
        _subscription = null;
        yield VideoStreamState(
          currentFrame: state.currentFrame,
          isPlaying: false,
        );
      }
    } else if (event is VideoStreamUpdatedEventOld) {
      if (state.isPlaying) {
        yield VideoStreamState(currentFrame: event.frame, isPlaying: true);
      }
    }
  }

  @override
  Future<void> close() async {
    await _subscription?.cancel();
    await super.close();
  }

  void _onVideoData(VideoData data) {
    add(VideoStreamUpdatedEventOld(frame: data));
  }
}

Unfortunately, we have to go through the usual hassle of subscribing to a stream via listen() and canceling the subscription (if it is active) during bloc close(). We can't use await for to easily listen to the stream. If we did, the event handler would run for as long as the stream was alive, blocking any other events in that bloc from executing.

Technically, you can break out of an await for, but Bloc doesn't provide a way for a specific invocation of an event handler to know when it has outlived its usefulness.


// DON'T DO THIS IN A BLOC.
// THIS PREVENTS YOUR EVENT HANDLERS FROM FINISHING.
await for (final frame in videoDataStream) {
  // Beautiful, but not meant to be.
  yield VideoStreamState(currentFrame: frame, isPlaying: true);
}

Because we have to manage our stream manually from mapEventToState, it's best to fire another event whenever we receive stream data so that we can yield out states correctly. If we don't, we risk running into the nested async generator bug.

Now, imagine we've upgraded to Bloc 7.2.0 in our app. Here's what our video streaming bloc might look like now.


class VideoStreamBlocNew extends Bloc<VideoStreamEvent, VideoStreamState> {
  VideoStreamBlocNew({
    required this.videoStreamingRepo,
    required VideoData frame,
  }) : super(VideoStreamState.initial(frame)) {
    on<VideoStreamPlayPauseEvent>(
      (event, emit) async {
        if (event.play) {
          await emit.forEach(
            videoStreamingRepo.videoDataStream,
            onData: (VideoData videoStreamData) => VideoStreamState(
              currentFrame: videoStreamData,
              isPlaying: true,
            ),
          );
        } else {
          emit(
            VideoStreamState(
              currentFrame: state.currentFrame,
              isPlaying: false,
            ),
          );
        }
      },
      // Allow only one of these events to ever be active at once, canceling
      // any active `emit.forEach` above.
      transformer: restartable(),
    );
  }

  final VideoStreamingRepo videoStreamingRepo;
}

Wow! That cleaned up nicely.

When working with streams in Bloc >=7.2.0, we can use the methods provided on the emit object to make working with streams easier. The forEach method subscribes to a stream, calls onData for every item the stream produces, and emits the state that onData returns. When our event handler is canceled or finished, it cleans up after us, ending the stream subscription.

Notice how we were able to eliminate the VideoStreamUpdatedEventOld event entirely. Because of the way emit is given to us in our event handler, we can emit states directly from inside the stream callback without any issues, drastically reducing the amount of boilerplate needed when working with streams and circumventing the synchronization errors that would have resulted from the nested async generator bug.

While you might think emit is just a function used to spit out states, like it is on cubit, it's actually a callable class. If you look at the type information, you'll see that emit is actually an Emitter<State>. When you invoke emit as a function, it's actually calling the call method of the Emitter<State> class.
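Callable classes are a plain Dart language feature, so the idea can be shown without Bloc. In the sketch below, `FakeEmitter` is a hypothetical, heavily simplified stand-in for `Emitter<State>`; the real class does more than collect values in a list.

```dart
// A hypothetical, simplified stand-in for Bloc's Emitter<State>.
class FakeEmitter<State> {
  final List<State> emitted = [];

  // Defining `call` lets instances be invoked like functions.
  void call(State state) => emitted.add(state);
}

void main() {
  final emit = FakeEmitter<int>();

  emit(1); // Shorthand for emit.call(1).
  emit.call(2); // The explicit form.

  // `emit.call` can also be torn off and passed around as a function.
  final void Function(int) emitFn = emit.call;
  emitFn(3);

  print(emit.emitted); // [1, 2, 3]
}
```

The same tear-off shows up later in this article, where `emit.call` is handed to methods that expect an ordinary function.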

We also used the restartable() transformer for the VideoStreamPlayPauseEvent. The restartable() event transformer will cancel any previous VideoStreamPlayPauseEvent event still running when a new one is added, ensuring only one event handler is ever active at once (for those particular types of events). Using restartable() is a great way to keep your state up to date with events that might still be processing when another event of the same type is added. Don't use it, however, if the value of previous results is important to your bloc!

Using Bloc Concurrency Transformers

Because Bloc now allows us to specify concurrency per event type, we can more easily influence how events are processed than we did before Bloc 7.2.0. We can even allow some events to be processed concurrently, while others are processed sequentially.

As a rule of thumb, events which "read" data can often be concurrent or restartable, since the UI typically only cares about the most recent state from a bloc. Events which write (or mutate) data will need to be handled on a case-by-case basis. If a mutation event updates or replaces multiple pieces of state, that event might actually be an atomic operation. Generally speaking, atomic operations should be processed sequentially.

That is, if the order of mutation events influences the final state, those events should be processed sequentially.

If, however, the order of mutation events does not influence the final state, the events may be processed concurrently. Such an example is given below.
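To make the distinction concrete, here is a small plain-Dart sketch. The helpers `withoutFile` and `withRenamedFile` are hypothetical and exist only for this illustration; the rename operation in particular is not part of the example app.

```dart
// Returns a copy of [files] without the entry for [id].
Map<int, String> withoutFile(Map<int, String> files, int id) =>
    {...files}..remove(id);

// Returns a copy of [files] where [id] has the new [name].
Map<int, String> withRenamedFile(
  Map<int, String> files,
  int id,
  String name,
) =>
    {...files, id: name};

void main() {
  const files = {1: 'a.txt', 2: 'b.txt', 3: 'c.txt'};

  // Deleting two different files: either order gives the same result,
  // so these mutations are safe to process concurrently.
  final deleteOneThenTwo = withoutFile(withoutFile(files, 1), 2);
  final deleteTwoThenOne = withoutFile(withoutFile(files, 2), 1);
  print(deleteOneThenTwo.toString() == deleteTwoThenOne.toString()); // true

  // Renaming the same file twice: the last rename wins, so the order
  // matters and these mutations should be processed sequentially.
  final xThenY =
      withRenamedFile(withRenamedFile(files, 1, 'x.txt'), 1, 'y.txt');
  final yThenX =
      withRenamedFile(withRenamedFile(files, 1, 'y.txt'), 1, 'x.txt');
  print(xThenY[1] == yThenX[1]); // false
}
```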

Deleting Files Concurrently in the Cloud

For our second example, we'll build a stripped-down version of a cloud file storage app, such as OneDrive or Dropbox, that allows the user to manage the files they've uploaded (except ours will be much simpler). Instead of uploading files, we can only delete files and pull to refresh.

We'll use a fake repository implementation which starts off with some placeholder files. If the fake repository receives a request to load more files after all the files have been deleted, it will start over with a fresh set of placeholder files. Now you can fulfill that fantasy of deleting files forever!

To demonstrate the old way of doing things, we'll use a cubit to manage the file logic. Before Bloc 7.2.0, using a cubit was the only practical way to process "events" concurrently without holding futures in the bloc state.

First, we need to define the state we'll be using for our file cubit.


class FileState extends Equatable {
  final Map<FileId, File> fileView;
  late final List<File> files = fileView.values.toList();
  final Set<FileId> pendingDeletions;
  final bool isLoading;
  final Object? error;
}

Later on, we'll use the exact same state for the updated bloc. Of importance is the fileView map, which contains our File model objects, keyed by their IDs. We've used a typedef to define FileId as an int, and we've omitted the File model here for brevity. If you want to check out the full implementation, see the accompanying repository.

The state also exposes the list files to make it easier for widgets to consume in a ListView.builder. We're using the late final modifier on this property to avoid converting the map to a list more than once per state. In a real app, we might have sorted the list based upon filename, timestamp, or a sorting field specified by the user.

When you create a Map in Dart, you're actually creating a LinkedHashMap under the hood, which preserves key insertion order (it is not sorted by key). Calling fileView.values.toList() will reflect that ordering in the resulting list.
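Both points, the one-time evaluation of a late final field and the insertion ordering of a map, can be checked in isolation. The `Snapshot` class and its `conversions` counter below are hypothetical and exist only for this demonstration; they are not the article's FileState.

```dart
class Snapshot {
  Snapshot(this.view);

  final Map<int, String> view;

  // Counts how often the map is converted; for demonstration only.
  static int conversions = 0;

  // The initializer runs at most once per instance, on first access.
  late final List<String> items = _toList();

  List<String> _toList() {
    conversions++;
    return view.values.toList();
  }
}

void main() {
  // Keys are deliberately inserted out of numeric order.
  final snapshot = Snapshot({3: 'c.txt', 1: 'a.txt', 2: 'b.txt'});

  print(Snapshot.conversions); // 0 (nothing converted yet)
  print(snapshot.items); // [c.txt, a.txt, b.txt] (insertion order)
  print(snapshot.items.length); // 3
  print(Snapshot.conversions); // 1 (second access reused the list)
}
```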

We'll define two methods on our cubit: loadFiles() and deleteFile(FileId). Both will emit states using the cubit's emit method, which is equivalent to yielding a state from a bloc. Because fileView is a map and pendingDeletions is a set, we can easily update our files and pending deletions without disturbing the rest of the state.


class FileCubitOld extends Cubit<FileState>
    with FileCubitCommon
    implements FileCubit {
  FileCubitOld({required this.fileRepo})
      : super(
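          // Not const: FileState declares a late final field, so it
          // cannot have a const constructor.
          FileState(
            fileView: const {},
            pendingDeletions: const {},
            isLoading: false,
          ),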
        );

  @override
  final FileRepo fileRepo;
  
  Future<void> loadFiles() async {
    Map<FileId, File>? files;
    Object? error;
    try {
      emit(
        FileState(
          fileView: state.fileView,
          pendingDeletions: state.pendingDeletions,
          isLoading: true,
        ),
      );
      files = await fileRepo.loadFiles();
    } catch (e) {
      error = e;
    } finally {
      emit(
        FileState(
          fileView: files ?? state.fileView,
          pendingDeletions: state.pendingDeletions,
          isLoading: false,
          error: error,
        ),
      );
    }
  }

  Future<void> deleteFile(FileId fileId) async {
    if (state.pendingDeletions.contains(fileId)) {
      return;
    }
    emit(
      FileState(
        fileView: state.fileView,
        isLoading: state.isLoading,
        pendingDeletions: {
          ...state.pendingDeletions,
          fileId,
        },
      ),
    );
    try {
      await fileRepo.deleteFile(id: fileId);
      emit(
        FileState(
          isLoading: state.isLoading,
          fileView: {...state.fileView}..remove(fileId),
          pendingDeletions: {
            ...state.pendingDeletions,
          }..remove(fileId),
        ),
      );
    } catch (e) {
      emit(
        FileState(
          isLoading: state.isLoading,
          fileView: state.fileView,
          pendingDeletions: {
            ...state.pendingDeletions,
          }..remove(fileId),
          error: e,
        ),
      );
    }
  }
}

The deleteFile method indicates that a deletion is pending while it waits on the deletion result from the file repository by emitting a state which adds the fileId to the pendingDeletions set. If the deletion succeeds, deleteFile emits a state that removes the file from the fileView map and the fileId from the set pendingDeletions. Likewise, if the deletion fails, deleteFile emits a state with the error object containing the exception that occurred. All that's left to do is migrate it to the new event system!

Fortunately, migrating a cubit to use the new event registration is trivial. The resulting bloc logic is essentially identical. In fact, the actual example uses a mixin to share the loadFiles and deleteFile methods between the old cubit and the new bloc.


class FileBlocNew extends Bloc<FileEvent, FileState> with FileCubitCommon {
  FileBlocNew({required this.fileRepo})
      : super(
          FileState(
            fileView: const {},
            pendingDeletions: const {},
            isLoading: false,
          ),
        ) {
    on<LoadFiles>(
      (event, emit) => loadFiles(event, emit.call),
      transformer: restartable(),
    );
    on<DeleteFile>(
      (event, emit) => deleteFile(event, emit.call),
      transformer: concurrent(),
    );
  }

  @override
  final FileRepo fileRepo;
}

We're using restartable() on the LoadFiles event so that only the latest loading event will ever emit states, keeping our state nice and clean.

If we instead let multiple loading events run at once, an older LoadFiles event, which took longer to complete than a newer one, might emit states which overwrite the ones from the newer event and show older files that may not exist anymore. Whenever that happens, the UI gets out of sync. Further, if the bloc or widgets try to perform operations on data that's not there (or out of date), state errors can creep up out of nowhere, wrecking the application.

Making the LoadFiles event restartable ensures our UI is synchronized with our server, even in the most trying conditions. Such hangups are extremely unlikely, but the whole point of bloc is to provide predictable state management.

Because deleting a file doesn't affect the other files, we're able to process those events concurrently, which keeps the UI unblocked and allows the user to keep doing what they do best.

Checking Username Availability

Back in the old days, transforming bloc events quickly became non-trivial. In this third (and final) scenario, we'll demonstrate how event transformation has improved by creating a user registration form.

When the user types, the form will send username-changed events to the bloc, which will then check to see if the given username is available. The bloc will debounce username availability check requests and restart any check that is still pending when a newer event of the same type is received.

By debouncing, we mean that the bloc will only process an event once a given duration has passed without another event of the same type being added. We also want the username availability check events to be restartable, since we only care about the latest result.
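Debouncing itself is independent of Bloc. The following is a minimal sketch of the idea using only dart:async; the `debounce` and `demo` functions are hypothetical helpers written for this illustration (the article's own code relies on rxdart's debounceTime instead), and the sketch ignores pause and resume for brevity.

```dart
import 'dart:async';

/// Forwards an event only after [duration] has passed without a newer one.
Stream<T> debounce<T>(Stream<T> source, Duration duration) {
  late final StreamController<T> controller;
  StreamSubscription<T>? subscription;
  Timer? timer;
  T? pending;
  var hasPending = false;

  // Emits the most recent event, if one is waiting.
  void flush() {
    if (hasPending) {
      hasPending = false;
      controller.add(pending as T);
    }
  }

  controller = StreamController<T>(
    onListen: () {
      subscription = source.listen(
        (event) {
          // Remember the newest event and restart the quiet-period timer.
          pending = event;
          hasPending = true;
          timer?.cancel();
          timer = Timer(duration, flush);
        },
        onError: controller.addError,
        onDone: () {
          // Emit whatever is still waiting, then close.
          timer?.cancel();
          flush();
          controller.close();
        },
      );
    },
    onCancel: () async {
      timer?.cancel();
      await subscription?.cancel();
    },
  );
  return controller.stream;
}

/// Simulates a user typing a burst of characters, pausing, then typing again.
Future<List<String>> demo() async {
  final typed = StreamController<String>();
  final results = <String>[];
  final done = debounce(typed.stream, const Duration(milliseconds: 100))
      .forEach(results.add);

  typed.add('j');
  typed.add('jo');
  typed.add('joe'); // End of the burst: only 'joe' survives.
  await Future<void>.delayed(const Duration(milliseconds: 300));
  typed.add('joey');
  await typed.close();
  await done;
  return results;
}

Future<void> main() async {
  print(await demo()); // [joe, joey]
}
```

Only the last value of each burst reaches the listener, which is the behavior we want before spending a network request on a username check.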

For the sake of simplicity, we've omitted some of the details surrounding the form validation (we're using formz), but you can find all of it in the accompanying code repository.

Here's what the old bloc might have looked like.


class RegistrationBlocOld extends Bloc<RegistrationEvent, RegistrationState> {
  RegistrationBlocOld({required this.registrationRepo})
      : super(RegistrationState.initial);

  @override
  final RegistrationRepo registrationRepo;

  @override
  Stream<RegistrationState> mapEventToState(RegistrationEvent event) async* {
    if (event is UsernameChanged) {
      // ...
    } else if (event is Register) {
      // ...
    }
  }

  @override
  Stream<Transition<RegistrationEvent, RegistrationState>> transformEvents(
    Stream<RegistrationEvent> events,
    Stream<Transition<RegistrationEvent, RegistrationState>> Function(
            RegistrationEvent)
        transitionFn,
  ) {
    final deferredEvents = events
        .where((e) => e is UsernameChanged)
        .debounceTime(RegistrationBloc.debounceUsernameDuration)
        .distinct()
        .switchMap(transitionFn);
    final forwardedEvents =
        events.where((e) => e is! UsernameChanged).asyncExpand(transitionFn);
    return forwardedEvents.mergeWith([deferredEvents]);
  }
}

The transformEvents method above uses Stream extension methods from rxdart to debounce only the UsernameChanged events and make them restartable. If that's not immediately apparent to you, don't worry. You are not alone.

Thankfully, the new way of doing things makes it much easier to transform bloc event streams. In fact, our new bloc might look something like this:


class RegistrationBlocNew extends Bloc<RegistrationEvent, RegistrationState> {
  RegistrationBlocNew({required this.registrationRepo})
      : super(RegistrationState.initial) {
    on<UsernameChanged>(
      (event, emit) async {
        // ...
      },
      // We will use our own debounce/restartable transformer for this event.
      // Other events can use their own appropriate transformers.
      transformer:
          debounceRestartable(RegistrationBloc.debounceUsernameDuration),
    );
    on<Register>(
      (event, emit) async {
        // ...
      },
      transformer: sequential(),
    );
  }

  @override
  final RegistrationRepo registrationRepo;

  EventTransformer<RegistrationEvent> debounceRestartable<RegistrationEvent>(
    Duration duration,
  ) {
    // This feeds the debounced event stream to restartable() and returns that
    // as a transformer.
    return (events, mapper) => restartable<RegistrationEvent>()
        .call(events.debounceTime(duration), mapper);
  }
}

Because the new bloc API applies transformers to event buckets, we don't have to worry about merging the streams back together to only selectively transform certain events, like we did in the old bloc.

Migrating Messy Blocs

Before we finish, let's discuss one more pitfall you might encounter while migrating a bloc. You've probably seen a bloc that looks something like this before, or maybe you've even written one like it:


class BadBloc extends Bloc<MyEvent, MyState> {
  BadBloc(): super(MyState());

  @override Stream<MyState> mapEventToState(MyEvent event) async* {
    if (event is EventA) {
      yield* _handleEventA();
    }
  }

  Stream<MyState> _handleEventA() async* {
    yield* _helperMethod();
    ...
  }

  Stream<MyState> _helperMethod() async* {
    yield MyState( ... );
    ...
  }
}

The bloc above yields states from helper methods, which is an indicator that the bloc code can be cleaned up. Besides the fact that it triggers the nested async generator bug, the code above is also harder to migrate and harder to follow. Migrated as-is to the new API, it might look like this:


class BadBloc extends Bloc<MyEvent, MyState> {
  BadBloc(): super(MyState()) {
    on<EventA>((event, emit) => _eventA(event, emit));
  }

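  // Return Future<void> (not void) so that every emit is awaited
  // before the event handler completes.
  Future<void> _eventA(EventA event, Emitter<MyState> emit) async {
    await _helperMethod(emit);
    ...
  }

  Future<void> _helperMethod(Emitter<MyState> emit) async {
    emit(MyState( ... ));
    ...
  }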
}

Notice how we have to pass the emit all the way from the on handler through _eventA and into _helperMethod? If _helperMethod had relied on yet another helper method to emit states for it, it would also have to keep passing emit down through the call stack. Not only is this a lot of extra typing, it also makes the code harder to follow.

If we only emit or yield states from primary event handlers, we can avoid the aforementioned headache. Let's refactor the original bloc. The helper methods can return any information needed by the calling event handler, but they shouldn't emit states.


@override Stream<MyState> mapEventToState(MyEvent event) async* {
  if (event is EventA) {
    yield* _handleEventA();
  }
}

Stream<MyState> _handleEventA() async* {
  final nextState = _helperMethod();
  yield nextState;
  ...
}

MyState _helperMethod() {
  ...
  return MyState( ... );
}

Here, we're returning a state from _helperMethod, rather than using it to yield states directly. This avoids the nested async generator bug and helps us keep all of our state changes in the actual event handler.

When we migrate the new and improved bloc, we can see that emit only needs to be passed to the handler method. Helper methods will return other types of data or states, but they should never emit states directly.


class GoodBloc extends Bloc<MyEvent, MyState> {
  GoodBloc(): super(MyState()) {
    on<EventA>((event, emit) => _eventA(event, emit));
  }

  Future<void> _eventA(EventA event, Emitter<MyState> emit) async {
    final nextState = _helperMethod();
    emit(nextState);
    ...
  }

  MyState _helperMethod() {
    return MyState( ... );
  }
}

While cleaning up the method signatures might seem like a subtle change, we've greatly improved the readability and maintainability of our bloc. All of the state changes for a given event happen in one place, so it's easy to see when something goes wrong.

Wrapping Up

Bloc >=7.2.0 allows us to easily take advantage of concurrent event processing, while still being predictable, easy to test, and indicative of clear and concise business logic contracts. By designing our blocs thoughtfully, we can take advantage of everything Bloc has to offer to solve business logic problems more elegantly than ever before.

If you haven't already, be sure to check out the complete code example.

If you're someone who appreciates subtle details, code readability, and Flutter architecture, you might be interested in a career here at Very Good Ventures.