Creates a StreamTransformer.
The returned instance takes responsibility for implementing bind.
When the user invokes bind, it returns a new "bound" stream. Only when
the user starts listening to the bound stream does its listen method
invoke the given transformer closure.
The transformer closure receives the stream that was bound (the original
source stream) as its argument, together with the cancelOnError flag, and
returns a StreamSubscription. In almost all cases the closure itself
listens to the stream it is given.
The bound stream (created by the bind method above) then sets on the
returned subscription the handlers it received as part of the listen call.
Conceptually this can be summarized as follows:
- var transformer = new StreamTransformer(transformerClosure); creates a StreamTransformer that supports the bind method.
- var boundStream = stream.transform(transformer); binds the stream and returns a bound stream that has a pointer to stream.
- boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b) starts the listening and transformation. This is accomplished in 2 steps: first the boundStream invokes the transformerClosure with the stream it captured: transformerClosure(stream, b). The resulting subscription, a StreamSubscription, is then updated to receive its handlers: subscription.onData(f1), subscription.onError(f2), subscription.onDone(f3). Finally the subscription is returned as the result of the listen call.
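The three steps above can be traced with a small sketch. It assumes a null-safe Dart SDK, and the names traceSteps and log are hypothetical helpers introduced only for illustration. The closure is the simplest possible one: it listens to the input and returns that subscription unchanged, so the bound stream behaves as a pass-through.

```dart
import 'dart:async';

/// Hypothetical helper: records when each step happens.
Future<List<String>> traceSteps() async {
  final log = <String>[];

  // Step 1: wrap a closure. The closure is not invoked here.
  final transformer = StreamTransformer<int, int>(
    (Stream<int> input, bool cancelOnError) {
      log.add('closure invoked');
      // Listen to the captured stream without handlers; the bound stream
      // installs onData/onError/onDone on the returned subscription.
      return input.listen(null, cancelOnError: cancelOnError);
    },
  );

  // Step 2: bind. The closure has still not been invoked.
  final boundStream =
      Stream<int>.fromIterable([1, 2, 3]).transform(transformer);
  log.add('bound');

  // Step 3: listen. Only now does the closure run.
  final done = Completer<void>();
  boundStream.listen(
    (value) => log.add('data $value'),
    onDone: done.complete,
  );
  await done.future;
  return log;
}

Future<void> main() async {
  // Prints: [bound, closure invoked, data 1, data 2, data 3]
  print(await traceSteps());
}
```

The 'bound' entry appearing before 'closure invoked' is the point of the sketch: binding only captures the stream, and the work is deferred until listen.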
There are two common ways to create a StreamSubscription:
- by creating a new class that implements StreamSubscription. Note that the subscription should run callbacks in the Zone in which the stream was listened to.
- by allocating a StreamController and returning the result of listening to its stream.
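The first option might look like the following minimal sketch, assuming a null-safe Dart SDK. MappingSubscription and upperCase are hypothetical names, not part of dart:async. The class delegates every call to the subscription obtained from the input stream and only wraps the data handler; because of that delegation it leans on the underlying subscription for Zone handling, which is an assumption worth verifying before using the pattern beyond a sketch.

```dart
import 'dart:async';

/// Hypothetical wrapper: forwards all calls to [_source], converting each
/// event with [_convert] before it reaches the listener's data handler.
class MappingSubscription<S, T> implements StreamSubscription<T> {
  final StreamSubscription<S> _source;
  final T Function(S) _convert;

  MappingSubscription(this._source, this._convert);

  @override
  void onData(void Function(T data)? handleData) {
    // Wrap the listener's handler so it sees converted events.
    _source.onData(
        handleData == null ? null : (S event) => handleData(_convert(event)));
  }

  @override
  void onError(Function? handleError) => _source.onError(handleError);

  @override
  void onDone(void Function()? handleDone) => _source.onDone(handleDone);

  @override
  void pause([Future<void>? resumeSignal]) => _source.pause(resumeSignal);

  @override
  void resume() => _source.resume();

  @override
  bool get isPaused => _source.isPaused;

  @override
  Future<void> cancel() => _source.cancel();

  @override
  Future<E> asFuture<E>([E? futureValue]) => _source.asFuture<E>(futureValue);
}

/// Hypothetical transformer built on the wrapper above.
final upperCase = StreamTransformer<String, String>(
  (input, cancelOnError) => MappingSubscription<String, String>(
      input.listen(null, cancelOnError: cancelOnError),
      (s) => s.toUpperCase()),
);

Future<void> main() async {
  final result =
      await Stream.fromIterable(['a', 'b']).transform(upperCase).toList();
  print(result); // Prints: [A, B]
}
```

The second option, based on StreamController, trades this boilerplate for a controller that already implements pause, resume, and cancel bookkeeping.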
Example use of a duplicating transformer:
stringStream.transform(new StreamTransformer<String, String>(
    (Stream<String> input, bool cancelOnError) {
      StreamController<String> controller;
      StreamSubscription<String> subscription;
      controller = new StreamController<String>(
        // Subscribe to the input only once the output has a listener.
        onListen: () {
          subscription = input.listen((data) {
              // Duplicate the data.
              controller.add(data);
              controller.add(data);
            },
            onError: controller.addError,
            onDone: controller.close,
            cancelOnError: cancelOnError);
        },
        // Forward pause, resume, and cancel to the input subscription.
        onPause: () { subscription.pause(); },
        onResume: () { subscription.resume(); },
        // Return the cancel future so callers can wait for cleanup.
        onCancel: () => subscription.cancel(),
        sync: true);
      // The bound stream sets the real handlers on this subscription.
      return controller.stream.listen(null);
    }));
Source
const factory StreamTransformer(
    StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError))
    = _StreamSubscriptionTransformer;