rxdart 0.23.1

RxDart #

About #

RxDart adds additional capabilities to Dart Streams and StreamControllers.

Dart comes with a very decent Streams API out-of-the-box; rather than attempting to provide an alternative to this API, RxDart adds functionality from the reactive extensions specification on top of it.

RxDart does not provide its own Observable class as a replacement for Dart Streams. Rather, it provides a number of additional Stream classes, operators (extension methods on the Stream class), and Subjects.

If you are familiar with Observables from other languages, please see the Rx Observables vs Dart Streams comparison chart for notable distinctions between the two.

Upgrading from RxDart 0.22.x to 0.23.x #

RxDart 0.23.x moves away from the Observable class, utilizing Dart 2.6's new extension methods instead. This requires several small refactors that can be easily automated -- which is just what we've done!

Please follow the instructions on the rxdart_codemod package to automatically upgrade your code to support RxDart 0.23.x.
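
As a rough illustration of the kind of change involved, here is a minimal sketch. The 0.22.x forms in the comments are reconstructed from the 0.23.0 changelog entries, and the `ticks` helper is a hypothetical name used only for this example:

```dart
import 'package:rxdart/rxdart.dart';

/// Merges two Streams and maps every event to a constant, 0.23.x style.
Stream<String> ticks(Stream<int> first, Stream<int> second) {
  // 0.22.x: Observable.merge([first, second])
  // 0.23.x: factories live on the Rx class and return plain Streams.
  final merged = Rx.merge([first, second]);

  // 0.22.x: a plain Stream had to be wrapped first,
  //         e.g. Observable(stream).mapTo('tick')
  // 0.23.x: operators are extension methods, so no wrapper is needed.
  return merged.mapTo('tick');
}

Future<void> main() async {
  final result = await ticks(
    Stream.fromIterable([1, 2]),
    Stream.fromIterable([3, 4]),
  ).toList();
  print(result); // [tick, tick, tick, tick]

  // 0.22.x: Observable.just(42)
  // 0.23.x: use the Stream.value constructor from dart:async.
  print(await Stream.value(42).first); // 42
}
```

The codemod is still the recommended route for a real code base; the sketch only shows what the resulting code tends to look like.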

How To Use RxDart #

For Example: Reading the Konami Code #

import 'dart:html';

import 'package:collection/collection.dart'; // provides IterableEquality
import 'package:rxdart/rxdart.dart';

void main() {
  const konamiKeyCodes = <int>[
    KeyCode.UP,
    KeyCode.UP,
    KeyCode.DOWN,
    KeyCode.DOWN,
    KeyCode.LEFT,
    KeyCode.RIGHT,
    KeyCode.LEFT,
    KeyCode.RIGHT,
    KeyCode.B,
    KeyCode.A,
  ];
  final result = querySelector('#result');

  document.onKeyUp
    .map((event) => event.keyCode)
    .bufferCount(10, 1) // An extension method provided by rxdart
    .where((lastTenKeyCodes) => const IterableEquality<int>().equals(lastTenKeyCodes, konamiKeyCodes))
    .listen((_) => result.innerHtml = 'KONAMI!');
}
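
The sliding-window idea behind bufferCount(10, 1) can also be tried without a browser. The following is a minimal command-line sketch: the `matches` and `sameSequence` helpers and the three-value "secret" are hypothetical stand-ins for the key codes above, not part of RxDart.

```dart
import 'package:rxdart/rxdart.dart';

/// Returns true when both lists hold the same elements in the same order.
bool sameSequence(List<int> a, List<int> b) {
  if (a.length != b.length) return false;
  for (var i = 0; i < a.length; i++) {
    if (a[i] != b[i]) return false;
  }
  return true;
}

/// Emits once for every window of [keys] that equals [secret].
/// bufferCount(n, 1) starts a new buffer on every event, so the
/// windows overlap and slide forward one event at a time.
Stream<List<int>> matches(Stream<int> keys, List<int> secret) => keys
    .bufferCount(secret.length, 1)
    .where((window) => sameSequence(window, secret));

Future<void> main() async {
  const secret = [7, 8, 9];
  final keys = Stream.fromIterable([1, 7, 8, 9, 7, 8, 9, 2]);

  final hits = await matches(keys, secret).toList();
  print('matched ${hits.length} time(s)'); // matched 2 time(s)
}
```

Shorter buffers emitted when the source closes are rejected by the length check in `sameSequence`.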

API Overview #

RxDart adds functionality to Dart Streams in three ways:

  • Stream Classes - create Streams with specific capabilities, such as combining or merging many Streams together.
  • Extension Methods - transform a source Stream into a new Stream with different capabilities, such as throttling or buffering events.
  • Subjects - StreamControllers with additional powers

Stream Classes #

The Stream class provides different ways to create a Stream: Stream.fromIterable or Stream.periodic, for example. RxDart provides additional Stream classes for a variety of tasks, such as combining or merging Streams together!

You can construct the Streams provided by RxDart in two ways. The following examples are equivalent in terms of functionality:

  • Instantiating the Stream class directly.
    • Example: final mergedStream = MergeStream([myFirstStream, mySecondStream]);
  • Using static factories from the Rx class, which are useful for discovering which types of Streams are provided by RxDart. Under the hood, these factories simply call the corresponding Stream constructor.
    • Example: final mergedStream = Rx.merge([myFirstStream, mySecondStream]);
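
To make the equivalence concrete, here is a small runnable sketch. The `odds`, `evens` and `sortedEvents` helpers are hypothetical names introduced only for this example:

```dart
import 'package:rxdart/rxdart.dart';

// Streams built with fromIterable are single-subscription,
// so these helpers create a fresh one per use.
Stream<int> odds() => Stream.fromIterable([1, 3]);
Stream<int> evens() => Stream.fromIterable([2, 4]);

/// Collects the merged events and sorts them, because the
/// interleaving of merged Streams depends on event timing.
Future<List<int>> sortedEvents(Stream<int> merged) async =>
    (await merged.toList())..sort();

Future<void> main() async {
  final viaClass = await sortedEvents(MergeStream([odds(), evens()]));
  final viaFactory = await sortedEvents(Rx.merge([odds(), evens()]));

  print(viaClass); // [1, 2, 3, 4]
  print(viaFactory); // [1, 2, 3, 4]
}
```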

List of Classes / Static Factories #

Extension Methods #

The extension methods provided by RxDart can be used on any Stream. They convert a source Stream into a new Stream with additional capabilities, such as buffering or throttling events.

Example #

Stream.fromIterable([1, 2, 3])
  .throttleTime(Duration(seconds: 1))
  .listen(print); // prints 1: the later events fall inside the one-second throttle window
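
Extension methods chain like any other Stream method. As a further sketch, the following combines scan and startWith to produce a running total; the `runningTotals` helper is a hypothetical name, and the explicit type argument on scan is only there to keep type inference unambiguous:

```dart
import 'package:rxdart/rxdart.dart';

/// Emits 0 first, then the running sum of every value from [source].
Stream<int> runningTotals(Stream<int> source) => source
    .scan<int>((sum, value, _) => sum + value, 0) // accumulate from seed 0
    .startWith(0); // prepend the initial total

Future<void> main() async {
  final totals = await runningTotals(Stream.fromIterable([1, 2, 3, 4])).toList();
  print(totals); // [0, 1, 3, 6, 10]
}
```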

List of Extension Methods #

Subjects #

Dart provides the StreamController class to create and manage a Stream. RxDart offers two StreamControllers with additional capabilities, known as Subjects:

  • BehaviorSubject - A broadcast StreamController that caches the latest added value or error. When a new listener subscribes to the Stream, the latest value or error will be emitted to the listener. Furthermore, you can synchronously read the last emitted value.
  • ReplaySubject - A broadcast StreamController that caches the added values. When a new listener subscribes to the Stream, the cached values will be emitted to the listener.
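
The caching behavior of both Subjects can be sketched as follows. The function names are hypothetical; in each case the listener subscribes only after the values have been added:

```dart
import 'package:rxdart/rxdart.dart';

/// A late listener on a BehaviorSubject starts with the latest value.
Future<int> latestFromBehaviorSubject() async {
  final subject = BehaviorSubject<int>.seeded(0);
  subject
    ..add(1)
    ..add(2);

  // The cached value can also be read synchronously.
  assert(subject.value == 2);

  final firstSeen = await subject.first; // subscribes after both adds
  await subject.close();
  return firstSeen;
}

/// A late listener on a ReplaySubject receives all cached values.
Future<List<int>> replayedValues() async {
  final subject = ReplaySubject<int>();
  subject
    ..add(1)
    ..add(2)
    ..add(3);

  final seen = await subject.take(3).toList(); // subscribes after the adds
  await subject.close();
  return seen;
}

Future<void> main() async {
  print(await latestFromBehaviorSubject()); // 2
  print(await replayedValues()); // [1, 2, 3]
}
```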

Rx Observables vs Dart Streams #

In many situations, Streams and Observables work the same way. However, if you're used to standard Rx Observables, some features of the Stream API may surprise you. We've included a table below to help folks understand the differences.

Additional information about the following situations can be found by reading the Rx class documentation.

Situation | Rx Observables | Dart Streams
--- | --- | ---
An error is raised | Observable terminates with error | Error is emitted and Stream continues
Cold Observables | Multiple subscribers can listen to the same cold Observable; each subscription will receive a unique Stream of data | Single subscriber only
Hot Observables | Yes | Yes, known as Broadcast Streams
Is {Publish, Behavior, Replay}Subject hot? | Yes | Yes
Single/Maybe/Complete? | Yes | No, uses Dart Future
Support back pressure | Yes | Yes
Can emit null? | Yes, except RxJava | Yes
Sync by default | Yes | No
Can pause/resume a subscription*? | No | Yes
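
Two of the Dart-side behaviors in the table can be checked with dart:async alone. This is a minimal sketch with hypothetical function names: the first function shows that an error does not terminate the Stream, the second that a single-subscription Stream rejects a second listener.

```dart
import 'dart:async';

/// Records everything a listener sees when an error occurs mid-stream.
Future<List<String>> collectEvents() async {
  final controller = StreamController<int>();
  final seen = <String>[];

  controller.stream.listen(
    (value) => seen.add('data: $value'),
    onError: (Object error) => seen.add('error: $error'),
  );

  controller
    ..add(1)
    ..addError('boom')
    ..add(2); // still delivered: the error did not end the Stream
  await controller.close(); // completes once the done event is delivered
  return seen;
}

/// Returns true if a second listen on a single-subscription Stream throws.
bool secondListenThrows() {
  final stream = Stream.fromIterable([1, 2, 3]);
  stream.listen((_) {});
  try {
    stream.listen((_) {});
    return false;
  } on StateError {
    return true;
  }
}

Future<void> main() async {
  print(await collectEvents()); // [data: 1, error: boom, data: 2]
  print(secondListenThrows()); // true
}
```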

Examples #

Web and command-line examples can be found in the example folder.

Web Examples #

In order to run the web examples, please follow these steps:

  1. Clone this repo and enter the directory
  2. Run pub get
  3. Run pub run build_runner serve example
  4. Navigate to http://localhost:8080/web/ in your browser

Command Line Examples #

In order to run the command line example, please follow these steps:

  1. Clone this repo and enter the directory
  2. Run pub get
  3. Run dart example/example.dart 10

Flutter Example #

Install Flutter #

In order to run the flutter example, you must have Flutter installed. For installation instructions, view the online documentation.

Run the app #

  1. Open up an Android Emulator, the iOS Simulator, or connect an appropriate mobile device for debugging.
  2. Open up a terminal
  3. cd into the example/flutter/github_search directory
  4. Run flutter doctor to ensure you have all Flutter dependencies working.
  5. Run flutter packages get
  6. Run flutter run

Notable References #

Changelog #

Refer to the Changelog to get all release notes.

0.23.1 #

  • Fix API doc links in README

0.23.0 #

  • Extension Methods replace Observable class!
  • Please upgrade existing code by using the rxdart_codemod package
  • Remove the Observable class. With extensions, you no longer need to wrap Streams in an Observable!
    • Convert all factories to static constructors to aid in discoverability of Stream classes
    • Move all factories to an Rx class.
    • Remove Observable.just, use Stream.value
    • Remove Observable.error, use Stream.error
    • Remove all tests that check base Stream methods
    • Subjects and *Observable classes extend Stream instead of base Observable
    • Rename *Observable to *Stream to reflect the fact they're just Streams.
      • ValueObservable -> ValueStream
      • ReplayObservable -> ReplayStream
      • ConnectableObservable -> ConnectableStream
      • ValueConnectableObservable -> ValueConnectableStream
      • ReplayConnectableObservable -> ReplayConnectableStream
  • All transformation methods removed from Observable class
    • Transformation methods are now Extensions of the Stream class
    • Any Stream can make use of the transformation methods provided by RxDart
    • Observable class remains in place with factory methods to create different types of Streams
    • Removed deprecated ofType method, use whereType instead
    • Deprecated concatMap, use standard Stream asyncExpand.
    • Removed AsObservableFuture, MinFuture, MaxFuture, and WrappedFuture
      • This removes asObservable method in chains
      • Use default asStream method from the base Future class instead.
      • min and max now implemented directly on the Stream class
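
Several of the replacements named above are plain dart:async features, so they can be sketched without importing RxDart at all. The `fetchAnswer` and `expandEach` names are hypothetical and exist only for this example:

```dart
import 'dart:async';

/// A hypothetical asynchronous source.
Future<int> fetchAnswer() async => 42;

/// asyncExpand (the replacement for concatMap) finishes each inner
/// Stream before the next one starts, so the order is preserved.
Stream<int> expandEach(Stream<int> source) =>
    source.asyncExpand((n) => Stream.fromIterable([n, n * 10]));

Future<void> main() async {
  // Observable.just(1) -> Stream.value(1)
  print(await Stream.value(1).toList()); // [1]

  // concatMap -> asyncExpand
  print(await expandEach(Stream.fromIterable([1, 2])).toList()); // [1, 10, 2, 20]

  // future.asObservable() -> future.asStream()
  print(await fetchAnswer().asStream().first); // 42

  // Observable.error(e) -> Stream.error(e); the error arrives via onError.
  await Stream<int>.error(StateError('boom'))
      .handleError((error) => print('caught: $error'))
      .toList();
}
```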

0.23.0-dev.3 #

  • Fix missing exports:
    • ValueStream
    • ReplayStream
    • ConnectableStream
    • ValueConnectableStream
    • ReplayConnectableStream

0.23.0-dev.2 #

  • Remove the Observable class. With extensions, you no longer need to wrap Streams in an Observable!
  • Convert all factories to static constructors to aid in discoverability of Stream classes
  • Move all factories to an Rx class.
  • Remove Observable.just, use Stream.value
  • Remove Observable.error, use Stream.error
  • Remove all tests that check base Stream methods
  • Subjects and *Observable classes extend Stream instead of base Observable
  • Rename *Observable to *Stream to reflect the fact they're just Streams.
    • ValueObservable -> ValueStream
    • ReplayObservable -> ReplayStream
    • ConnectableObservable -> ConnectableStream
    • ValueConnectableObservable -> ValueConnectableStream
    • ReplayConnectableObservable -> ReplayConnectableStream

0.23.0-dev.1 #

  • Feedback on this change appreciated as this is a dev release before 0.23.0 stable!
  • All transformation methods removed from Observable class
    • Transformation methods are now Extensions of the Stream class
    • Any Stream can make use of the transformation methods provided by RxDart
  • Observable class remains in place with factory methods to create different types of Streams
  • Removed deprecated ofType method, use whereType instead
  • Deprecated concatMap, use standard Stream asyncExpand.
  • Removed AsObservableFuture, MinFuture, MaxFuture, and WrappedFuture
    • This removes asObservable method in chains
    • Use default asStream method from the base Future class instead.
    • min and max now implemented directly on the Stream class

0.22.6 #

  • Bugfix: When listening multiple times to a BehaviorSubject that starts with an Error, it emits duplicate events.
  • Linter: public_member_api_docs is now used, we have added extra documentation where required.

0.22.5 #

  • Bugfix: DeferStream created Stream too early
  • Bugfix: TimerStream created Timer too early

0.22.4 #

  • Bugfix: switchMap controller no longer closes prematurely

0.22.3 #

  • Bugfix: whereType failing in Flutter production builds only

0.22.2 #

  • Bugfix: When using a seeded BehaviorSubject and adding an Error, upon listening, the BehaviorSubject emits null instead of the last Error.
  • Bugfix: calling cancel after a switchMap can cause a NoSuchMethodError.
  • Updated Flutter example to match the latest Flutter release
  • Observable.withLatestFrom is now expanded to accept 2 or more Streams thanks to Petrus Nguyễn Thái Học (@hoc081098)!
  • Deprecates ofType in favor of whereType, drop TypeToken.

0.22.1 #

Fixes following issues:

  • Erroneous behavior with scan and BehaviorSubject.
  • Bug where flatMap would cancel inner subscriptions in pause/resume.
  • Updates to make the current "pedantic" analyzer happy.

0.22.0 #

This version includes refactoring for the backpressure operators:

  • Breaking Change: debounce is now split into debounce and debounceTime.
  • Breaking Change: sample is now split into sample and sampleTime.
  • Breaking Change: throttle is now split into throttle and throttleTime.

0.21.0 #

  • Breaking Change: BehaviorSubject now has a separate factory constructor, seeded(). This allows you to seed this Subject with a null value. To refactor, simply change all occurrences of BehaviorSubject(seedValue: value) to BehaviorSubject.seeded(value).
  • Breaking Change: BehaviorSubject will now emit an Error if the last event was also an Error. Before, when an Error occurred before a listen, the subscriber would not be notified of that Error.
  • Added the groupBy operator
  • Bugfix: doOnCancel will now await the cancel result, if it is a Future.
  • Removed: bufferWithCount, windowWithCount, tween. Please use bufferCount and windowCount instead; tween is removed because it never was part of an official Rx spec.
  • Updated Flutter example to work with the latest Flutter stable.

0.20.0 #

  • Breaking Change: bufferCount had buggy behavior when using startBufferEvery (was skip previously). If you were relying on bufferCount with skip greater than 1 before, then you may have noticed erroneous behavior.
  • Breaking Change: repeat is no longer an operator which simply repeats the last emitted event n-times, instead this is now an Observable factory method which takes a StreamFactory and a count parameter. This will cause each repeat cycle to create a fresh Observable sequence.
  • mapTo is a new operator, which works just like map, but instead of taking a mapper Function, it takes a single value where each event is mapped to.
  • Bugfix: switchIfEmpty now correctly calls onDone
  • combineLatest and zip can now take any amount of Streams:
    • combineLatest2-9 & zip2-9 functionality unchanged, but now use a new path for construction.
    • adds combineLatest and zipLatest which allows you to pass through an Iterable<Stream
    • adds combineLatestList / zipList which allows you to take in an Iterable<Stream
    • Constructors are provided by the Stream implementation directly
  • Bugfix: Subjects that are transformed will now correctly return a new Observable where isBroadcast is true (was false before)
  • Remove deprecated operators which were replaced long ago: bufferWithCount, windowWithCount, amb, flatMapLatest
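
A few of the additions above can be sketched as follows. The sketch is written against the 0.23.x API, where the factories live on the Rx class rather than on Observable as they did in 0.20.0; the function names are hypothetical:

```dart
import 'package:rxdart/rxdart.dart';

/// mapTo replaces every event with the same constant.
Stream<String> pings(Stream<int> source) => source.mapTo('ping');

/// Rx.repeat calls the factory once per cycle, creating a fresh Stream each time.
Stream<String> cycles(int count) =>
    Rx.repeat((int index) => Stream.value('cycle $index'), count);

/// combineLatestList emits the latest value of every source as one List.
Stream<List<int>> latestPair() =>
    Rx.combineLatestList([Stream.value(1), Stream.value(2)]);

Future<void> main() async {
  print(await pings(Stream.fromIterable([1, 2, 3])).toList()); // [ping, ping, ping]
  print(await cycles(2).toList()); // [cycle 0, cycle 1]
  print(await latestPair().toList()); // [[1, 2]]
}
```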

0.19.0 #

  • Breaking Change: The onCancel function of Subjects now returns void instead of Future to properly comply with the StreamController signature.
  • Bugfix: FlatMap operator properly calls onDone for all cases
  • Connectable Observable: An observable that can be listened to multiple times, and does not begin emitting values until the connect method is called
  • ValueObservable: A new interface that allows you to get the latest value emitted by an Observable.
    • Implemented by BehaviorSubject
    • Convert normal observables into ValueObservables via publishValue or shareValue
  • ReplayObservable: A new interface that allows you to get the values emitted by an Observable.
    • Implemented by ReplaySubject
    • Convert normal observables into ReplayObservables via publishReplay or shareReplay
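
The connect-before-emit behavior can be sketched like this. The sketch uses the 0.23.x names (publish() returning a ConnectableStream) rather than the Observable names in this entry, and `collectFromTwoListeners` is a hypothetical helper:

```dart
import 'package:rxdart/rxdart.dart';

/// Attaches two listeners to a connectable Stream, then connects it.
Future<List<List<int>>> collectFromTwoListeners(Stream<int> source) async {
  // publish() wraps the source so that it can be listened to multiple times.
  final connectable = source.publish();

  // Register both listeners first; nothing is emitted yet.
  final firstListener = connectable.toList();
  final secondListener = connectable.toList();

  // connect() subscribes to the source and starts emitting to all listeners.
  connectable.connect();

  return [await firstListener, await secondListener];
}

Future<void> main() async {
  final results = await collectFromTwoListeners(Stream.fromIterable([1, 2, 3]));
  print(results); // [[1, 2, 3], [1, 2, 3]]
}
```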

0.18.1 #

  • Add retryWhen operator. Thanks to Razvan Lung (@long1eu)! This can be used for custom retry logic.

0.18.0 #

  • Breaking Change: remove retype method, deprecated as part of Dart 2.
  • Add flatMapIterable

0.17.0 #

  • Breaking Change: stream property on Observable is now private.
    • Avoids API confusion
    • Simplifies Subject implementation
    • Require folks who are overriding the stream property to use a super constructor instead
  • Adds proper onPause and onResume handling for amb/race, combineLatest, concat, concat_eager, merge and zip
  • Add switchLatest operator
  • Add errors and stacktraces to RetryError class
  • Add onErrorResume and onErrorRetryWith operators. These allow folks to return a specific stream or value depending on the error that occurred.
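
A minimal sketch of error recovery, written against the 0.23.x extension-method API. It assumes that the value-returning operator mentioned above is the one exposed as onErrorReturnWith; the `failing` helper is hypothetical:

```dart
import 'package:rxdart/rxdart.dart';

/// A hypothetical source that fails immediately.
Stream<int> failing() => Stream<int>.error(Exception('offline'));

/// Switches to a recovery Stream chosen from the error.
Stream<int> resumed() =>
    failing().onErrorResume((error) => Stream.fromIterable([1, 2]));

/// Replaces the error with a single fallback value.
Stream<int> withFallback() => failing().onErrorReturnWith((error) => -1);

Future<void> main() async {
  print(await resumed().toList()); // [1, 2]
  print(await withFallback().toList()); // [-1]
}
```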

0.16.7 #

  • Fix new buffer and window implementation for Flutter + Dart 2
  • Subject now implements the Observable interface

0.16.6 #

  • Rework for buffer and window, allow to schedule using a sampler
  • added buffer
  • added bufferFuture
  • added bufferTest
  • added bufferTime
  • added bufferWhen
  • added window
  • added windowFuture
  • added windowTest
  • added windowTime
  • added windowWhen
  • added onCount sampler for buffer and window
  • added onFuture sampler for buffer and window
  • added onTest sampler for buffer and window
  • added onTime sampler for buffer and window
  • added onStream sampler for buffer and window

0.16.5 #

  • Renames amb to race
  • Renames flatMapLatest to switchMap
  • Renames bufferWithCount to bufferCount
  • Renames windowWithCount to windowCount

0.16.4 #

  • Adds bufferTime transformer.
  • Adds windowTime transformer.

0.16.3 #

  • Adds delay transformer.

0.16.2 #

  • Fix added events to sink are not processed correctly by Subjects.

0.16.1 #

  • Fix dematerialize method for Dart 2.

0.16.0+2 #

  • Add value to BehaviorSubject. Allows you to get the latest value emitted by the subject if it exists.
  • Add values to ReplaySubject. Allows you to get the values stored by the subject if any exist.

0.16.0+1 #

  • Update Changelog

0.16.0 #

  • Breaks backwards compatibility: this release only works with Dart SDK >=2.0.0.
  • Removed old cast in favour of the now native Stream cast method.
  • Override retype to return an Observable.

0.15.1 #

  • Add exhaustMap: maps to an inner observable and ignores other values until that observable completes.
  • Improved code to be dartdevc compatible.
  • Add upper SDK version limit in pubspec

0.15.0 #

  • Change debounce to emit the last item of the source stream as soon as the source stream completes.
  • Ensure debounce does not keep open any additional async timers after it has been cancelled.

0.14.0+1 #

  • Change DoStreamTransformer to return a Future on cancel for api compatibility.

0.14.0 #

  • Add PublishSubject (thanks to @pauldemarco)
  • Fix bug with doOnX operators where callbacks were fired too often

0.13.1 #

  • Fix error with FlatMapLatest where it was not properly cancelled in some scenarios
  • Remove additional async methods on Stream handlers unless they're shown to solve a problem

0.13.0 #

  • Remove call operator / StreamTransformer entirely
  • Important bug fix: Errors thrown within any Stream or Operator will now be properly sent to the StreamSubscription.
  • Improve overall handling of errors throughout the library to ensure they're handled correctly

0.12.0 #

  • Added doOn* operators in place of call.
  • Added DoStreamTransformer as a replacement for CallStreamTransformer
  • Deprecated call and CallStreamTransformer. Please use the appropriate doOnX operator / transformer.
  • Added distinctUnique. Emits items if they've never been emitted before. Same as Rx#distinct.

0.11.0 #

  • !!!Breaking Api Change!!!
    • Observable.groupBy has been removed in order to be compatible with the next version of the Stream class in Dart 1.24.0, which includes this method

0.10.2 #

  • BugFix: The new Subject implementation no longer causes infinite loops when used with ng2 async pipes.

0.10.1 #

  • Documentation fixes

0.10.0 #

  • Api Changes
    • Observable
      • Remove all deprecated methods, including:
        • observable factory -- replaced by the constructor new Observable()
        • combineLatest -- replaced by Strong-Mode versions combineLatest2 - combineLatest9
        • zip -- replaced by Strong-Mode versions zip2 - zip9
      • Support asObservable conversion from Future-returning methods. e.g. new Observable.fromIterable([1, 2]).first.asObservable()
      • Max and Min now return a Future of the Max or Min value, rather than a stream of increasing or decreasing values.
      • Add cast operator
      • Remove ConcatMapStreamTransformer -- functionality is already supported by asyncExpand. Keep the concatMap method as an alias.
    • Subjects
      • BehaviourSubject has been renamed to BehaviorSubject
      • The subjects have been rewritten and include far more testing
      • In keeping with the Rx idea of Subjects, they are broadcast-only
  • Documentation -- extensive documentation has been added to the library with explanations and examples for each Future, Stream & Transformer.
    • Docs detailing the differences between RxDart and raw Observables.

0.9.0 #

  • Api Changes:
    • Convert all StreamTransformer factories to proper classes
      • Ensure these classes can be re-used multiple times
    • Retry has moved from an operator to a constructor. This is to ensure the stream can be properly re-constructed every time in the correct way.
    • Streams now properly enforce the single-subscription contract
  • Include example Flutter app. To run it, please follow the instructions in the README.

0.8.3+1 #

  • rename examples folder to example

0.8.3 #

  • added concatWith, zipWith, mergeWith, skipUntil
  • cleanup of the examples folder
  • cleanup of examples code
  • added fibonacci example
  • added search GitHub example

0.8.2+1 #

  • moved repo into ReactiveX
  • update readme badges accordingly

0.8.2 #

  • added materialize/dematerialize
  • added range (factory)
  • added timer (factory)
  • added timestamp
  • added concatMap

0.8.1 #

  • added never constructor
  • added error constructor
  • moved code coverage to codecov.io

0.8.0 #

  • BREAKING: tap is replaced by call(onData)
  • added call, which can take any combination of the following event methods: onCancel, onData, onDone, onError, onListen, onPause, onResume

0.7.1+1 #

  • improved the README file

0.7.1 #

  • added ignoreElements
  • added onErrorResumeNext
  • added onErrorReturn
  • added switchIfEmpty
  • added empty factory constructor

0.7.0 #

  • BREAKING: rename combineXXXLatest and zipXXX to a numbered equivalent, for example: combineThreeLatest becomes combineLatest3
  • internal refactoring, expose streams/stream transformers as a separate library

0.6.3+4 #

  • changed ofType to use TypeToken

0.6.3+3 #

  • added ofType

0.6.3+2 #

  • added defaultIfEmpty

0.6.3+1 #

  • changed concat, old concat is now concatEager, new concat behaves as expected

0.6.3 #

  • Added withLatestFrom
  • Added defer constructor (both thanks to brianegan)

0.6.2 #

  • Added just (thanks to brianegan)
  • Added groupBy
  • Added amb

0.6.1 #

  • Added concat

0.6.0 #

  • BREAKING: startWith now takes just one parameter instead of an Iterable. To add multiple starting events, please use startWithMany.
  • Added BehaviourSubject and ReplaySubject. These implement StreamController.
  • BehaviourSubject will notify the last added event upon listening.
  • ReplaySubject will notify all past events upon listening.
  • DEPRECATED: zip and combineLatest, use their strong-type-friendly alternatives instead (available as static methods on the Observable class, i.e. Observable.combineThreeLatest, Observable.zipFour, ...)

0.5.1 #

  • Added documentation (thanks to dustinlessard-wf)
  • Fix tests breaking due to deprecation of expectAsync
  • Fix tests to satisfy strong mode requirements

0.5.0 #

  • As of this version, rxdart depends on SDK v1.21.0, to support the newly added generic method type syntax

example/example.dart

import 'package:rxdart/rxdart.dart';

/// generate n-amount of fibonacci numbers
///
/// for example: dart example/example.dart 10
/// outputs:
/// 1: 1
/// 2: 1
/// 3: 2
/// 4: 3
/// 5: 5
/// 6: 8
/// 7: 13
/// 8: 21
/// 9: 34
/// 10: 55
/// done!
void main(List<String> arguments) {
  // read the command line argument, if none provided, default to 10
  var n = (arguments.length == 1) ? int.parse(arguments.first) : 10;

  // seed value: this value will be used as the
  // starting value for the [scan] method
  const seed = IndexedPair(1, 1, 0);

  Rx
          // amount of numbers to compute
          .range(1, n)
      // accumulator: computes a new accumulated
      // value each time a [Stream] event occurs
      // in this case, the accumulated value is always
      // the latest Fibonacci number
      .scan((IndexedPair seq, _, __) => IndexedPair.next(seq), seed)
      // finally, print the output
      .listen(print, onDone: () => print('done!'));
}

class IndexedPair {
  final int n1, n2, index;

  const IndexedPair(this.n1, this.n2, this.index);

  factory IndexedPair.next(IndexedPair prev) => IndexedPair(
      prev.n2, prev.index <= 1 ? prev.n1 : prev.n1 + prev.n2, prev.index + 1);

  @override
  String toString() => '$index: $n2';
}

Use this package as a library

1. Depend on it

Add this to your package's pubspec.yaml file:


dependencies:
  rxdart: ^0.23.1

2. Install it

You can install packages from the command line:

with pub:


$ pub get

with Flutter:


$ flutter pub get

Alternatively, your editor might support pub get or flutter pub get. Check the docs for your editor to learn more.

3. Import it

Now in your Dart code, you can use:


import 'package:rxdart/rxdart.dart';
  
  • Popularity: 100 (describes how popular the package is relative to other packages)
  • Health: 100 (code health derived from static analysis)
  • Maintenance: 100 (reflects how tidy and up-to-date the package is)
  • Overall: 100 (weighted score of the above)

We analyzed this package on Dec 13, 2019, and provided a score, details, and suggestions below. The analysis completed using:

  • Dart: 2.7.0
  • pana: 0.13.1+4

Dependencies

Package | Constraint | Resolved | Available
--- | --- | --- | ---
Direct dependencies | | |
Dart SDK | >=2.6.0 <3.0.0 | |
Dev dependencies | | |
build_runner | >=1.7.0 <2.0.0 | |
build_web_compilers | >=2.7.0 <3.0.0 | |
mockito | >=3.0.0 <4.0.0 | |
pedantic | 1.9.0 | |
stack_trace | >=1.9.2 <2.0.0 | |
test | >=1.9.0 <2.0.0 | |