Creates a new stream with the events of a stream per original event.
This acts like expand, except that convert returns a Stream
instead of an Iterable.
The events of each stream returned by convert become the events of the
returned stream, in the order they are produced.
If convert returns null, no value is put on the output stream,
just as if it returned an empty stream.
The returned stream is a broadcast stream if this stream is.
Source
/// Maps every data event of this stream to a stream and forwards the events
/// of those streams on the returned stream.
///
/// [convert] is called once per data event. If it throws, the error and its
/// stack trace are added to the returned stream. If it returns `null`,
/// nothing is emitted for that event. Otherwise the subscription on this
/// stream is paused while the produced stream is piped into the output, and
/// resumed once that piping completes.
///
/// Errors from this stream are forwarded to the returned stream, which is
/// closed when this stream is done. The returned stream is a broadcast
/// stream if this stream is.
Stream asyncExpand(Stream convert(T event)) {
  StreamController controller;
  StreamSubscription subscription;

  // Runs when the returned stream gets a listener: only then is this stream
  // subscribed to, and only then is `subscription` assigned.
  void onListen() {
    assert(controller is _StreamController ||
        controller is _BroadcastStreamController);
    final eventSink = controller;
    subscription = this.listen(
      (T event) {
        Stream newStream;
        try {
          newStream = convert(event);
        } catch (e, s) {
          // A failing convert call becomes an error event on the output.
          controller.addError(e, s);
          return;
        }
        if (newStream != null) {
          // Hold back further source events until addStream's future
          // completes, so sub-stream events are not interleaved.
          subscription.pause();
          controller.addStream(newStream)
              .whenComplete(subscription.resume);
        }
      },
      onError: eventSink._addError, // Avoid Zone error replacement.
      onDone: controller.close
    );
  }

  // The callbacks below stay closures rather than tear-offs because
  // `subscription` is still unassigned when the controller is created.
  // onCancel returns the cancel future so that cancelling the returned
  // stream waits for (and reports errors from) cancelling the source
  // subscription instead of silently dropping that future.
  if (this.isBroadcast) {
    // No pause/resume callbacks are supplied for the broadcast controller.
    controller = new StreamController.broadcast(
      onListen: onListen,
      onCancel: () => subscription.cancel(),
      sync: true
    );
  } else {
    controller = new StreamController(
      onListen: onListen,
      // Pausing/resuming the output pauses/resumes the source subscription.
      onPause: () { subscription.pause(); },
      onResume: () { subscription.resume(); },
      onCancel: () => subscription.cancel(),
      sync: true
    );
  }
  return controller.stream;
}