MimeMultipart abstract class
A MIME multipart class representing each part parsed by MimeMultipartTransformer. The data is streamed in as it becomes available.
abstract class MimeMultipart extends Stream<List<int>> {
  Map<String, String> get headers;
}
Extends
Stream<List<int>> > MimeMultipart
Properties
final Future<T> first #
Returns the first element of the stream.
Stops listening to the stream after the first element has been received.
Internally the method cancels its subscription after the first element. This means that single-subscription (non-broadcast) streams are closed and cannot be reused after a call to this getter.
If an error event occurs before the first data event, the resulting future is completed with that error.
If this stream is empty (a done event occurs before the first data event), the resulting future completes with a StateError.
Except for the type of the error, this method is equivalent to this.elementAt(0).
Future<T> get first { _Future<T> future = new _Future<T>(); StreamSubscription subscription; subscription = this.listen( (T value) { _cancelAndValue(subscription, future, value); }, onError: future._completeError, onDone: () { future._completeError(new StateError("No elements")); }, cancelOnError: true); return future; }
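The note about single-subscription streams can be seen in a small sketch. It assumes a recent null-safe Dart SDK and uses a plain StreamController of byte chunks as a stand-in for a part; the helper name secondListenFails is hypothetical.

```dart
import 'dart:async';

/// Reads the first chunk, then reports whether a second listen was rejected.
/// Hypothetical helper; the controller stands in for a single-subscription part.
Future<bool> secondListenFails(StreamController<List<int>> controller) async {
  final firstChunk = await controller.stream.first;
  print('first chunk: $firstChunk');
  try {
    controller.stream.listen((_) {});
    return false;
  } on StateError {
    // The subscription made by `first` used up the stream.
    return true;
  }
}

Future<void> main() async {
  final controller = StreamController<List<int>>()
    ..add([1, 2])
    ..add([3]);
  print(await secondListenFails(controller));
}
```

If more than the first chunk is needed, one listen call that handles every event is usually the safer choice.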
final bool isBroadcast #
Reports whether this stream is a broadcast stream.
bool get isBroadcast => false;
final Future<bool> isEmpty #
Reports whether this stream contains any elements.
Stops listening to the stream after the first element has been received.
Internally the method cancels its subscription after the first element. This means that single-subscription (non-broadcast) streams are closed and cannot be reused after a call to this getter.
Future<bool> get isEmpty { _Future<bool> future = new _Future<bool>(); StreamSubscription subscription; subscription = this.listen( (_) { _cancelAndValue(subscription, future, false); }, onError: future._completeError, onDone: () { future._complete(true); }, cancelOnError: true); return future; }
final Future<T> last #
Returns the last element of the stream.
If an error event occurs before the first data event, the resulting future is completed with that error.
If this stream is empty (a done event occurs before the first data event), the resulting future completes with a StateError.
Future<T> get last { _Future<T> future = new _Future<T>(); T result = null; bool foundResult = false; StreamSubscription subscription; subscription = this.listen( (T value) { foundResult = true; result = value; }, onError: future._completeError, onDone: () { if (foundResult) { future._complete(result); return; } future._completeError(new StateError("No elements")); }, cancelOnError: true); return future; }
final Future<int> length #
Counts the elements in the stream.
Future<int> get length { _Future<int> future = new _Future<int>(); int count = 0; this.listen( (_) { count++; }, onError: future._completeError, onDone: () { future._complete(count); }, cancelOnError: true); return future; }
final Future<T> single #
Returns the single element.
If an error event occurs before or after the first data event, the resulting future is completed with that error.
If this stream is empty or has more than one element, the resulting future completes with a StateError.
Future<T> get single { _Future<T> future = new _Future<T>(); T result = null; bool foundResult = false; StreamSubscription subscription; subscription = this.listen( (T value) { if (foundResult) { // This is the second element we get. Error error = new StateError("More than one element"); _cancelAndError(subscription, future, error, null); return; } foundResult = true; result = value; }, onError: future._completeError, onDone: () { if (foundResult) { future._complete(result); return; } future._completeError(new StateError("No elements")); }, cancelOnError: true); return future; }
Methods
Future<bool> any(bool test(T element)) #
Checks whether test accepts any element provided by this stream.
Completes the Future when the answer is known.
If this stream reports an error, the Future reports that error.
Stops listening to the stream after the first matching element has been found.
Internally the method cancels its subscription after this element. This means that single-subscription (non-broadcast) streams are closed and cannot be reused after a call to this method.
Future<bool> any(bool test(T element)) { _Future<bool> future = new _Future<bool>(); StreamSubscription subscription; subscription = this.listen( (T element) { _runUserCode( () => test(element), (bool isMatch) { if (isMatch) { _cancelAndValue(subscription, future, true); } }, _cancelAndErrorClosure(subscription, future) ); }, onError: future._completeError, onDone: () { future._complete(false); }, cancelOnError: true); return future; }
Stream<T> asBroadcastStream({void onListen(StreamSubscription<T> subscription), void onCancel(StreamSubscription<T> subscription)}) #
Returns a multi-subscription stream that produces the same events as this.
If this stream is already a broadcast stream, it is returned unmodified.
If this stream is single-subscription, return a new stream that allows multiple subscribers. It will subscribe to this stream when its first subscriber is added, and will stay subscribed until this stream ends, or a callback cancels the subscription.
If onListen is provided, it is called with a subscription-like object that represents the underlying subscription to this stream. It is possible to pause, resume or cancel the subscription during the call to onListen. It is not possible to change the event handlers, including using StreamSubscription.asFuture.
If onCancel is provided, it is called in a similar way to onListen when the returned stream stops having listeners. If it later gets a new listener, the onListen function is called again.
Use the callbacks, for example, for pausing the underlying subscription while having no subscribers to prevent losing events, or canceling the subscription when there are no listeners.
Stream<T> asBroadcastStream({ void onListen(StreamSubscription<T> subscription), void onCancel(StreamSubscription<T> subscription) }) { if (isBroadcast) return this; return new _AsBroadcastStream<T>(this, onListen, onCancel); }
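As an illustration of sharing one source between several listeners, here is a minimal sketch assuming a recent null-safe Dart SDK. The function name sumAndCount is hypothetical.

```dart
import 'dart:async';

/// Computes the sum and the count of [source] with two concurrent listeners.
Future<List<int>> sumAndCount(Stream<int> source) {
  final shared = source.asBroadcastStream();
  // Both listeners are attached before the first event is delivered,
  // so neither of them misses any data.
  final sum = shared.fold<int>(0, (total, value) => total + value);
  final count = shared.length;
  return Future.wait([sum, count]);
}

Future<void> main() async {
  print(await sumAndCount(Stream.fromIterable([1, 2, 3])));
}
```

A listener added after events have already been delivered would not see those earlier events, which is why both are attached up front here.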
Future<bool> contains(Object needle) #
Checks whether needle occurs in the elements provided by this stream.
Completes the Future when the answer is known. If this stream reports an error, the Future will report that error.
Future<bool> contains(Object needle) { _Future<bool> future = new _Future<bool>(); StreamSubscription subscription; subscription = this.listen( (T element) { _runUserCode( () => (element == needle), (bool isMatch) { if (isMatch) { _cancelAndValue(subscription, future, true); } }, _cancelAndErrorClosure(subscription, future) ); }, onError: future._completeError, onDone: () { future._complete(false); }, cancelOnError: true); return future; }
Stream<T> distinct([bool equals(T previous, T next)]) #
Skips data events if they are equal to the previous data event.
The returned stream provides the same events as this stream, except that it never provides two consecutive data events that are equal.
Equality is determined by the provided equals method. If that is omitted, the '==' operator on the last provided data element is used.
The returned stream is not a broadcast stream, even if this stream is.
Stream<T> distinct([bool equals(T previous, T next)]) { return new _DistinctStream(this, equals); }
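A short sketch of distinct with a custom equals function, assuming a recent null-safe Dart SDK. The helper collapseIgnoringCase is a hypothetical name chosen for illustration.

```dart
import 'dart:async';

/// Drops each word that equals the previous word when compared
/// case-insensitively. Only consecutive repeats are removed.
Future<List<String>> collapseIgnoringCase(Stream<String> words) {
  return words
      .distinct((previous, next) => previous.toLowerCase() == next.toLowerCase())
      .toList();
}

Future<void> main() async {
  final words = Stream.fromIterable(['a', 'A', 'b', 'a']);
  print(await collapseIgnoringCase(words));
}
```

The trailing 'a' is kept because it does not directly follow another 'a'.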
Future drain([futureValue]) #
Discards all data on the stream, but signals when it's done or an error occurred.
When subscribing using drain, cancelOnError will be true. This means that the future will complete with the first error on the stream and then cancel the subscription.
In case of a done event the future completes with the given futureValue.
Future drain([var futureValue]) => listen(null, cancelOnError: true) .asFuture(futureValue);
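For a part whose body is not needed, drain gives a way to consume it and still learn when it has ended. The following is a minimal sketch assuming a recent null-safe Dart SDK, where drain is generic; discardBody is a hypothetical helper name.

```dart
import 'dart:async';

/// Throws away every chunk of [part] and completes with a marker string
/// once the done event arrives.
Future<String> discardBody(Stream<List<int>> part) {
  return part.drain<String>('discarded');
}

Future<void> main() async {
  final part = Stream<List<int>>.fromIterable([
    [1, 2],
    [3],
  ]);
  print(await discardBody(part));
}
```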
Future<T> elementAt(int index) #
Returns the value of the indexth data event of this stream.
Stops listening to the stream after the indexth data event has been received.
Internally the method cancels its subscription after these elements. This means that single-subscription (non-broadcast) streams are closed and cannot be reused after a call to this method.
If an error event occurs before the value is found, the future completes with this error.
If a done event occurs before the value is found, the future completes with a RangeError.
Future<T> elementAt(int index) { if (index is! int || index < 0) throw new ArgumentError(index); _Future<T> future = new _Future<T>(); StreamSubscription subscription; subscription = this.listen( (T value) { if (index == 0) { _cancelAndValue(subscription, future, value); return; } index -= 1; }, onError: future._completeError, onDone: () { future._completeError(new RangeError.value(index)); }, cancelOnError: true); return future; }
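The two outcomes described above (a value, or a RangeError when the stream ends early) might be handled as in this sketch. It assumes a recent null-safe Dart SDK; describeChunk is a hypothetical helper.

```dart
import 'dart:async';

/// Describes the chunk at [index], or reports that the stream was too short.
Future<String> describeChunk(Stream<List<int>> part, int index) async {
  try {
    final chunk = await part.elementAt(index);
    return 'chunk $index has ${chunk.length} bytes';
  } on RangeError {
    return 'stream ended before chunk $index';
  }
}

/// Builds a fresh two-chunk stream, since a stream cannot be listened to twice.
Stream<List<int>> sample() => Stream<List<int>>.fromIterable([
      [1, 2, 3],
      [4],
    ]);

Future<void> main() async {
  print(await describeChunk(sample(), 1));
  print(await describeChunk(sample(), 5));
}
```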
Future<bool> every(bool test(T element)) #
Checks whether test accepts all elements provided by this stream.
Completes the Future when the answer is known. If this stream reports an error, the Future will report that error.
Future<bool> every(bool test(T element)) { _Future<bool> future = new _Future<bool>(); StreamSubscription subscription; subscription = this.listen( (T element) { _runUserCode( () => test(element), (bool isMatch) { if (!isMatch) { _cancelAndValue(subscription, future, false); } }, _cancelAndErrorClosure(subscription, future) ); }, onError: future._completeError, onDone: () { future._complete(true); }, cancelOnError: true); return future; }
Stream expand(Iterable convert(T value)) #
Creates a new stream from this stream that converts each element into zero or more events.
Each incoming event is converted to an Iterable of new events, and each of these new events is then sent by the returned stream in order.
The returned stream is not a broadcast stream, even if this stream is.
Stream expand(Iterable convert(T value)) { return new _ExpandStream<T, dynamic>(this, convert); }
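Because a part delivers its body as chunks of bytes, expand can flatten those chunks into a stream of single bytes. A minimal sketch, assuming a recent null-safe Dart SDK; bytesOf is a hypothetical name.

```dart
import 'dart:async';

/// Turns a stream of byte chunks into a stream of individual bytes.
/// An empty chunk contributes zero events.
Stream<int> bytesOf(Stream<List<int>> part) {
  return part.expand((chunk) => chunk);
}

Future<void> main() async {
  final part = Stream<List<int>>.fromIterable([
    [1, 2],
    <int>[],
    [3],
  ]);
  print(await bytesOf(part).toList());
}
```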
Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) #
Finds the first element of this stream matching test.
Returns a future that is filled with the first element of this stream that test returns true for.
If no such element is found before this stream is done, and a defaultValue function is provided, the result of calling defaultValue becomes the value of the future.
Stops listening to the stream after the first matching element has been received.
Internally the method cancels its subscription after the first element that matches the predicate. This means that single-subscription (non-broadcast) streams are closed and cannot be reused after a call to this method.
If an error occurs, or if this stream ends without finding a match and with no defaultValue function provided, the future will receive an error.
Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { _Future<dynamic> future = new _Future(); StreamSubscription subscription; subscription = this.listen( (T value) { _runUserCode( () => test(value), (bool isMatch) { if (isMatch) { _cancelAndValue(subscription, future, value); } }, _cancelAndErrorClosure(subscription, future) ); }, onError: future._completeError, onDone: () { if (defaultValue != null) { _runUserCode(defaultValue, future._complete, future._completeError); return; } future._completeError(new StateError("firstMatch ended without match")); }, cancelOnError: true); return future; }
Future fold(initialValue, combine(previous, T element)) #
Reduces a sequence of values by repeatedly applying combine.
Future fold(var initialValue, combine(var previous, T element)) { _Future result = new _Future(); var value = initialValue; StreamSubscription subscription; subscription = this.listen( (T element) { _runUserCode( () => combine(value, element), (newValue) { value = newValue; }, _cancelAndErrorClosure(subscription, result) ); }, onError: (e, st) { result._completeError(e, st); }, onDone: () { result._complete(value); }, cancelOnError: true); return result; }
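One common use of fold on a byte stream is to gather the whole body into a single list. The sketch below assumes a recent null-safe Dart SDK, where fold is generic; collectBody is a hypothetical helper and the plain stream stands in for a part.

```dart
import 'dart:async';
import 'dart:convert';

/// Collects every chunk of [part] into one growable list of bytes.
Future<List<int>> collectBody(Stream<List<int>> part) {
  return part.fold<List<int>>(
      <int>[], (bytes, chunk) => bytes..addAll(chunk));
}

Future<void> main() async {
  // A hypothetical body delivered in two chunks.
  final part = Stream<List<int>>.fromIterable([
    utf8.encode('hello, '),
    utf8.encode('world'),
  ]);
  print(utf8.decode(await collectBody(part)));
}
```

This buffers the entire body in memory, so it only suits parts that are known to be small.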
Future forEach(void action(T element)) #
Executes action on each data event of the stream.
Completes the returned Future when all events of the stream have been processed. Completes the future with an error if the stream has an error event, or if action throws.
Future forEach(void action(T element)) { _Future future = new _Future(); StreamSubscription subscription; subscription = this.listen( (T element) { _runUserCode( () => action(element), (_) {}, _cancelAndErrorClosure(subscription, future) ); }, onError: future._completeError, onDone: () { future._complete(null); }, cancelOnError: true); return future; }
Stream<T> handleError(Function onError, {bool test(error)}) #
Creates a wrapper Stream that intercepts some errors from this stream.
If this stream sends an error that matches test, then it is intercepted by the onError function.
The onError callback must be of type void onError(error) or void onError(error, StackTrace stackTrace). Depending on the function type, the stream invokes onError either with or without a stack trace. The stack trace argument might be null if the stream itself received an error without a stack trace.
An asynchronous error e is matched by a test function if test(e) returns true. If test is omitted, every error is considered matching.
If the error is intercepted, the onError function can decide what to do with it. It can throw if it wants to raise a new (or the same) error, or simply return to make the stream forget the error.
If you need to transform an error into a data event, use the more generic Stream.transform to handle the event by writing a data event to the output sink.
The returned stream is not a broadcast stream, even if this stream is.
Stream<T> handleError(Function onError, { bool test(error) }) { return new _HandleErrorStream<T>(this, onError, test); }
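The interplay of onError and test can be sketched as follows, assuming a recent null-safe Dart SDK. The helper collectIgnoringFormatErrors and the choice of FormatException are illustrative assumptions.

```dart
import 'dart:async';

/// Collects data events while recording, and then dropping, format errors.
/// Errors that do not match the test would still reach the caller.
Future<List<int>> collectIgnoringFormatErrors(
    Stream<int> source, List<Object> dropped) {
  return source
      .handleError((Object error) => dropped.add(error),
          test: (error) => error is FormatException)
      .toList();
}

Future<void> main() async {
  final controller = StreamController<int>()
    ..add(1)
    ..addError(const FormatException('bad chunk'))
    ..add(2)
    ..close();
  final dropped = <Object>[];
  print(await collectIgnoringFormatErrors(controller.stream, dropped));
  print('dropped ${dropped.length} error(s)');
}
```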
Future<String> join([String separator = ""]) #
Collects the string representations of the data events into a single string.
If separator is provided, it is inserted between any two elements.
Any error in the stream causes the future to complete with that error. Otherwise it completes with the collected string when the "done" event arrives.
Future<String> join([String separator = ""]) { _Future<String> result = new _Future<String>(); StringBuffer buffer = new StringBuffer(); StreamSubscription subscription; bool first = true; subscription = this.listen( (T element) { if (!first) { buffer.write(separator); } first = false; try { buffer.write(element); } catch (e, s) { _cancelAndError(subscription, result, e, s); } }, onError: (e) { result._completeError(e); }, onDone: () { result._complete(buffer.toString()); }, cancelOnError: true); return result; }
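A brief sketch of join with a separator, assuming a recent null-safe Dart SDK; joinWithCommas is a hypothetical helper.

```dart
import 'dart:async';

/// Joins the string form of every event, separated by ", ".
Future<String> joinWithCommas(Stream<Object> events) {
  return events.join(', ');
}

Future<void> main() async {
  print(await joinWithCommas(Stream<Object>.fromIterable(['a', 2, 'c'])));
}
```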
Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) #
Finds the last element in this stream matching test.
Like firstWhere, except that the last matching element is found. That means that the result cannot be provided before this stream is done.
Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) { _Future<dynamic> future = new _Future(); T result = null; bool foundResult = false; StreamSubscription subscription; subscription = this.listen( (T value) { _runUserCode( () => true == test(value), (bool isMatch) { if (isMatch) { foundResult = true; result = value; } }, _cancelAndErrorClosure(subscription, future) ); }, onError: future._completeError, onDone: () { if (foundResult) { future._complete(result); return; } if (defaultValue != null) { _runUserCode(defaultValue, future._complete, future._completeError); return; } future._completeError(new StateError("lastMatch ended without match")); }, cancelOnError: true); return future; }
abstract StreamSubscription<T> listen(void onData(T event), {Function onError, void onDone(), bool cancelOnError}) #
Adds a subscription to this stream.
On each data event from this stream, the subscriber's onData handler is called. If onData is null, nothing happens.
On errors from this stream, the onError handler is given an object describing the error.
The onError callback must be of type void onError(error) or void onError(error, StackTrace stackTrace). If onError accepts two arguments it is called with the stack trace (which could be null if the stream itself received an error without a stack trace). Otherwise it is called with just the error object.
If this stream closes, the onDone handler is called.
If cancelOnError is true, the subscription is ended when the first error is reported. The default is false.
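The handlers described above can be wired together as in this sketch, which assumes a recent null-safe Dart SDK. The helper logEvents is hypothetical; it uses the two-argument form of onError and leaves cancelOnError at false so that data after an error is still seen.

```dart
import 'dart:async';

/// Records every event of [stream] as a line of text and completes
/// with the log once the done event arrives.
Future<List<String>> logEvents(Stream<int> stream) {
  final log = <String>[];
  final finished = Completer<List<String>>();
  stream.listen(
    (value) => log.add('data: $value'),
    onError: (Object error, StackTrace stackTrace) => log.add('error: $error'),
    onDone: () {
      log.add('done');
      finished.complete(log);
    },
    cancelOnError: false,
  );
  return finished.future;
}

Future<void> main() async {
  final controller = StreamController<int>()
    ..add(1)
    ..addError('boom')
    ..add(2)
    ..close();
  print(await logEvents(controller.stream));
}
```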
Stream map(convert(T event)) #
Creates a new stream that converts each element of this stream to a new value using the convert function.
The returned stream is not a broadcast stream, even if this stream is.
Stream map(convert(T event)) { return new _MapStream<T, dynamic>(this, convert); }
Future pipe(StreamConsumer<T> streamConsumer) #
Binds this stream as the input of the provided StreamConsumer.
Future pipe(StreamConsumer<T> streamConsumer) { return streamConsumer.addStream(this).then((_) => streamConsumer.close()); }
Future<T> reduce(T combine(T previous, T element)) #
Reduces a sequence of values by repeatedly applying combine.
Future<T> reduce(T combine(T previous, T element)) { _Future<T> result = new _Future<T>(); bool seenFirst = false; T value; StreamSubscription subscription; subscription = this.listen( (T element) { if (seenFirst) { _runUserCode(() => combine(value, element), (T newValue) { value = newValue; }, _cancelAndErrorClosure(subscription, result)); } else { value = element; seenFirst = true; } }, onError: result._completeError, onDone: () { if (!seenFirst) { result._completeError(new StateError("No elements")); } else { result._complete(value); } }, cancelOnError: true ); return result; }
Future<T> singleWhere(bool test(T element)) #
Finds the single element in this stream matching test.
Like lastWhere, except that it is an error if more than one matching element occurs in the stream.
Future<T> singleWhere(bool test(T element)) { _Future<T> future = new _Future<T>(); T result = null; bool foundResult = false; StreamSubscription subscription; subscription = this.listen( (T value) { _runUserCode( () => true == test(value), (bool isMatch) { if (isMatch) { if (foundResult) { _cancelAndError( subscription, future, new StateError('Multiple matches for "single"'), null); return; } foundResult = true; result = value; } }, _cancelAndErrorClosure(subscription, future) ); }, onError: future._completeError, onDone: () { if (foundResult) { future._complete(result); return; } future._completeError(new StateError("single ended without match")); }, cancelOnError: true); return future; }
Stream<T> skip(int count) #
Skips the first count data events from this stream.
The returned stream is not a broadcast stream, even if this stream is.
Stream<T> skip(int count) { return new _SkipStream(this, count); }
Stream<T> skipWhile(bool test(T element)) #
Skips data events from this stream while they are matched by test.
Error and done events are provided by the returned stream unmodified.
Starting with the first data event where test returns false for the event data, the returned stream will have the same events as this stream.
The returned stream is not a broadcast stream, even if this stream is.
Stream<T> skipWhile(bool test(T element)) { return new _SkipWhileStream(this, test); }
Stream<T> take(int count) #
Provides at most the first count values of this stream.
Forwards the first count data events of this stream, and all error events, to the returned stream, and ends with a done event.
If this stream produces fewer than count values before it's done, so will the returned stream.
Stops listening to the stream after the first count elements have been received.
Internally the method cancels its subscription after these elements. This means that single-subscription (non-broadcast) streams are closed and cannot be reused after a call to this method.
The returned stream is not a broadcast stream, even if this stream is.
Stream<T> take(int count) { return new _TakeStream(this, count); }
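A small sketch of take on both a long and a short stream, assuming a recent null-safe Dart SDK; firstTwo is a hypothetical helper.

```dart
import 'dart:async';

/// Returns at most the first two values of [source].
Future<List<int>> firstTwo(Stream<int> source) {
  return source.take(2).toList();
}

Future<void> main() async {
  print(await firstTwo(Stream.fromIterable([10, 20, 30])));
  // A stream with fewer than two values simply yields what it has.
  print(await firstTwo(Stream.fromIterable([10])));
}
```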
Stream<T> takeWhile(bool test(T element)) #
Forwards data events while test is successful.
The returned stream provides the same events as this stream as long as test returns true for the event data. The stream is done when either this stream is done, or when this stream first provides a value that test doesn't accept.
Stops listening to the stream after the accepted elements.
Internally the method cancels its subscription after these elements. This means that single-subscription (non-broadcast) streams are closed and cannot be reused after a call to this method.
The returned stream is not a broadcast stream, even if this stream is.
Stream<T> takeWhile(bool test(T element)) { return new _TakeWhileStream(this, test); }
Stream timeout(Duration timeLimit, {void onTimeout(EventSink sink)}) #
Creates a new stream with the same events as this stream.
Whenever more than timeLimit passes between two events from this stream, the onTimeout function is called.
The countdown doesn't start until the returned stream is listened to. The countdown is reset every time an event is forwarded from this stream, or when the stream is paused and resumed.
The onTimeout function is called with one argument: an EventSink that allows putting events into the returned stream. This EventSink is only valid during the call to onTimeout.
If onTimeout is omitted, a timeout will just put a TimeoutException into the error channel of the returned stream.
The returned stream is not a broadcast stream, even if this stream is.
Stream timeout(Duration timeLimit, {void onTimeout(EventSink sink)}) { StreamSubscription<T> subscription; _StreamController controller; // The following variables are set on listen. Timer timer; Zone zone; Function timeout; void onData(T event) { timer.cancel(); controller.add(event); timer = zone.createTimer(timeLimit, timeout); } void onError(error, StackTrace stackTrace) { timer.cancel(); controller.addError(error, stackTrace); timer = zone.createTimer(timeLimit, timeout); } void onDone() { timer.cancel(); controller.close(); } controller = new _SyncStreamController( () { // This is the onListen callback for of controller. // It runs in the same zone that the subscription was created in. // Use that zone for creating timers and running the onTimeout // callback. zone = Zone.current; if (onTimeout == null) { timeout = () { controller.addError(new TimeoutException("No stream event", timeLimit)); }; } else { onTimeout = zone.registerUnaryCallback(onTimeout); _ControllerEventSinkWrapper wrapper = new _ControllerEventSinkWrapper(null); timeout = () { wrapper._sink = controller; // Only valid during call. zone.runUnaryGuarded(onTimeout, wrapper); wrapper._sink = null; }; } subscription = this.listen(onData, onError: onError, onDone: onDone); timer = zone.createTimer(timeLimit, timeout); }, () { timer.cancel(); subscription.pause(); }, () { subscription.resume(); timer = zone.createTimer(timeLimit, timeout); }, () { timer.cancel(); Future result = subscription.cancel(); subscription = null; return result; }); return controller.stream; }
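The onTimeout callback can be used to end a stalled stream gracefully instead of raising a TimeoutException. The sketch below assumes a recent null-safe Dart SDK; the helper withFallback, the 50 ms limit, and the fallback value are illustrative choices.

```dart
import 'dart:async';

/// Ends a stalled stream with a fallback value instead of an error.
Stream<int> withFallback(Stream<int> source, int fallback) {
  return source.timeout(const Duration(milliseconds: 50), onTimeout: (sink) {
    // The sink is only usable inside this callback.
    sink.add(fallback);
    sink.close();
  });
}

Future<void> main() async {
  // A source that never emits anything, so the timeout fires.
  final silent = StreamController<int>();
  print(await withFallback(silent.stream, -1).toList());
}
```

Closing the sink ends the returned stream; without the close call the timer would restart and the callback would fire again after the next quiet period.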
Future<List<T>> toList() #
Collects the data of this stream in a List.
Future<List<T>> toList() { List<T> result = <T>[]; _Future<List<T>> future = new _Future<List<T>>(); this.listen( (T data) { result.add(data); }, onError: future._completeError, onDone: () { future._complete(result); }, cancelOnError: true); return future; }
Future<Set<T>> toSet() #
Collects the data of this stream in a Set.
Future<Set<T>> toSet() { Set<T> result = new Set<T>(); _Future<Set<T>> future = new _Future<Set<T>>(); this.listen( (T data) { result.add(data); }, onError: future._completeError, onDone: () { future._complete(result); }, cancelOnError: true); return future; }
Stream transform(StreamTransformer<T, dynamic> streamTransformer) #
Chains this stream as the input of the provided StreamTransformer.
Returns the result of calling streamTransformer.bind with this stream.
Stream transform(StreamTransformer<T, dynamic> streamTransformer) { return streamTransformer.bind(this); }
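For a text part, transform is a natural way to decode the byte chunks. A minimal sketch, assuming a recent null-safe Dart SDK and a UTF-8 encoded body; readText is a hypothetical helper, and the charset of a real part would have to be taken from its headers.

```dart
import 'dart:async';
import 'dart:convert';

/// Decodes the chunks of [part] as UTF-8 and concatenates the result.
Future<String> readText(Stream<List<int>> part) {
  return part.transform(utf8.decoder).join();
}

Future<void> main() async {
  // The two bytes of "é" (0xC3 0xA9) are split across chunks on purpose.
  final part = Stream<List<int>>.fromIterable([
    [0x63, 0x61, 0x66, 0xC3],
    [0xA9],
  ]);
  print(await readText(part));
}
```

Decoding through the transformer, rather than decoding each chunk on its own, keeps multi-byte characters intact when they straddle a chunk boundary.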
Stream<T> where(bool test(T event)) #
Creates a new stream from this stream that discards some data events.
The new stream sends the same error and done events as this stream, but it only sends the data events that satisfy the test.
The returned stream is not a broadcast stream, even if this stream is.
Stream<T> where(bool test(T event)) { return new _WhereStream<T>(this, test); }
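As a sketch of filtering data events, the following drops empty chunks from a byte stream. It assumes a recent null-safe Dart SDK; nonEmptyChunks is a hypothetical helper.

```dart
import 'dart:async';

/// Forwards only the chunks that contain at least one byte.
Stream<List<int>> nonEmptyChunks(Stream<List<int>> part) {
  return part.where((chunk) => chunk.isNotEmpty);
}

Future<void> main() async {
  final part = Stream<List<int>>.fromIterable([
    [1],
    <int>[],
    [2, 3],
  ]);
  print(await nonEmptyChunks(part).toList());
}
```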