Module Lwt_stream
Data streams
type 'a t
A stream holding values of type 'a.
Naming convention: in this module, all functions applying a function to each element of a stream are suffixed by:
- _s when the function returns a thread and calls are serialised
- _p when the function returns a thread and calls are parallelised
Construction
val from : (unit -> 'a option Lwt.t) -> 'a t
from f creates a stream from the given input function. f is called each time more input is needed, and the stream ends when f returns None.
If f, or the thread produced by f, raises an exception, that exception is forwarded to the consumer of the stream (for example, a caller of get). Note that this does not end the stream. A subsequent attempt to read from the stream will cause another call to f, which may succeed with a value.
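As a minimal sketch of from, the following builds a stream from a counter and collects it. It assumes the lwt and lwt.unix libraries are available; Lwt_main.run (from lwt.unix) and Lwt_stream.to_list (part of the module, but not described in this section) are used only to drive the example, and counter_stream is a hypothetical name.

```ocaml
(* Assumes the lwt and lwt.unix libraries; Lwt_main.run comes from lwt.unix. *)

(* Hypothetical helper: a stream that yields 1, 2, 3 and then ends. *)
let counter_stream () : int Lwt_stream.t =
  let n = ref 0 in
  Lwt_stream.from (fun () ->
      incr n;
      (* Returning None signals the end of the stream. *)
      if !n <= 3 then Lwt.return (Some !n) else Lwt.return None)

(* Drain the stream into a list. *)
let collected : int list =
  Lwt_main.run (Lwt_stream.to_list (counter_stream ()))
```

The input function is only called when the consumer asks for more input, so nothing is computed until the stream is read.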
val from_direct : (unit -> 'a option) -> 'a t
from_direct f does the same as from, but with a function that does not return a thread. Prefer this function to wrapping f in a function that returns a thread.
The behavior when f raises an exception is the same as for from, except that f does not produce a thread.
exception Closed
Exception raised by the push function of a push-stream when pushing an element after the end of stream (= None) has been pushed.
val create : unit -> 'a t * ('a option -> unit)
create () returns a new stream and a push function.
To notify the stream's consumer of errors, either use a separate communication channel, or use a result stream. There is no way to push an exception into a push-stream.
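The sketch below illustrates the result-stream approach mentioned above, along with the Closed exception. It assumes lwt and lwt.unix; Lwt_stream.to_list is part of the module but not described in this section, and the error message is made up for illustration.

```ocaml
(* Assumes the lwt and lwt.unix libraries; Lwt_main.run comes from lwt.unix. *)

(* A push-stream whose elements are results, so errors travel as values. *)
let stream, push = Lwt_stream.create ()

let () =
  push (Some (Ok 1));
  push (Some (Error "sensor offline"));  (* an error, sent as ordinary data *)
  push (Some (Ok 2));
  push None                              (* None marks the end of the stream *)

(* Pushing after the end of the stream raises Lwt_stream.Closed. *)
let late_push_rejected : bool =
  try push (Some (Ok 3)); false with Lwt_stream.Closed -> true

(* The consumer sees values and errors in the order they were pushed. *)
let items : (int, string) result list =
  Lwt_main.run (Lwt_stream.to_list stream)
```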
val create_with_reference : unit -> 'a t * ('a option -> unit) * ('b -> unit)
create_with_reference () returns a new stream and a push function. The last function allows a reference to be set to an external source. This prevents the external source from being garbage collected.
For example, to convert a reactive event to a stream:
  let stream, push, set_ref = Lwt_stream.create_with_reference () in
  set_ref (map_event push event)
exception Full
Exception raised by the push function of a bounded push-stream when the stream queue is full and a thread is already waiting to push an element.
class type 'a bounded_push = object ... end
Type of sources for bounded push-streams.
val create_bounded : int -> 'a t * 'a bounded_push
create_bounded size returns a new stream and a bounded push source. The stream can hold a maximum of size elements. When this limit is reached, pushing a new element will block until one is consumed.
Note that you cannot clone or parse (with parse) a bounded stream. These functions will raise Invalid_argument if you try to do so.
It raises Invalid_argument if size < 0.
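The following sketch shows the blocking behavior of a bounded push-stream. It assumes lwt and lwt.unix, and relies on the push and close methods of bounded_push, which belong to the class type in the Lwt library but are not listed in this section; run_bounded is a hypothetical name.

```ocaml
(* Assumes the lwt and lwt.unix libraries; Lwt_main.run comes from lwt.unix. *)

(* Hypothetical helper: push three elements through a queue of size 2. *)
let run_bounded () : int list =
  let open Lwt.Infix in
  let stream, source = Lwt_stream.create_bounded 2 in
  let producer =
    source#push 1 >>= fun () ->
    source#push 2 >>= fun () ->
    (* The queue is full here, so this push waits for the consumer. *)
    source#push 3 >>= fun () ->
    source#close;
    Lwt.return_unit
  in
  (* Starting the consumer frees queue slots and lets the producer finish. *)
  let consumer = Lwt_stream.to_list stream in
  Lwt_main.run (producer >>= fun () -> consumer)

let bounded_items : int list = run_bounded ()
```

Only one thread may wait on a full queue at a time; a second concurrent push would fail with Full, which is why the pushes above are chained sequentially.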
val of_seq : 'a Stdlib.Seq.t -> 'a t
of_seq s creates a stream returning all elements of s. The elements are evaluated from s and pushed onto the stream as the stream is consumed.
- since 4.2.0
val of_list : 'a list -> 'a t
of_list l creates a stream returning all elements of l. The elements are pushed into the stream immediately, resulting in a closed stream (in the sense of is_closed).
val of_array : 'a array -> 'a t
of_array a creates a stream returning all elements of a. The elements are pushed into the stream immediately, resulting in a closed stream (in the sense of is_closed).
val of_string : string -> char t
of_string str creates a stream returning all characters of str. The characters are pushed into the stream immediately, resulting in a closed stream (in the sense of is_closed).
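A short sketch of the container constructors, assuming lwt (4.2.0 or later for of_seq) and lwt.unix. It uses is_closed and get_available, described further down in this module, and Lwt_stream.to_list, which is part of the module but not described in this section.

```ocaml
(* Assumes the lwt and lwt.unix libraries; Lwt_main.run comes from lwt.unix. *)

let from_list = Lwt_stream.of_list [1; 2; 3]

(* Elements were pushed immediately, so the stream is already closed. *)
let closed_immediately : bool = Lwt_stream.is_closed from_list

(* All characters are available without blocking. *)
let chars : char list =
  Lwt_stream.get_available (Lwt_stream.of_string "hi")

(* of_seq evaluates the sequence lazily, as the stream is consumed. *)
let seq_items : string list =
  Lwt_main.run
    (Lwt_stream.to_list (Lwt_stream.of_seq (List.to_seq ["a"; "b"])))
```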
val clone : 'a t -> 'a t
clone st clones the given stream. Operations on each stream will not affect the other.
For example:
  # let st1 = Lwt_stream.of_list [1; 2; 3];;
  val st1 : int Lwt_stream.t = <abstr>
  # let st2 = Lwt_stream.clone st1;;
  val st2 : int Lwt_stream.t = <abstr>
  # let x = Lwt_main.run (Lwt_stream.next st1);;
  val x : int = 1
  # let y = Lwt_main.run (Lwt_stream.next st2);;
  val y : int = 1
It raises Invalid_argument if st is a bounded push-stream.
Destruction
Data retrieval
val peek : 'a t -> 'a option Lwt.t
peek st returns the first element of the stream, if any, without removing it.
val npeek : int -> 'a t -> 'a list Lwt.t
npeek n st returns at most the first n elements of st, without removing them.
val get : 'a t -> 'a option Lwt.t
get st removes and returns the first element of the stream, if any.
val nget : int -> 'a t -> 'a list Lwt.t
nget n st removes and returns at most the first n elements of st.
val get_while : ('a -> bool) -> 'a t -> 'a list Lwt.t
val get_while_s : ('a -> bool Lwt.t) -> 'a t -> 'a list Lwt.t
get_while f st returns the longest prefix of st where all elements satisfy f.
val next : 'a t -> 'a Lwt.t
next st removes and returns the next element of the stream, or fails with Empty if the stream is empty.
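The difference between peeking, getting, and next can be sketched as follows, assuming lwt and lwt.unix. The helper name retrieval_demo is hypothetical.

```ocaml
(* Assumes the lwt and lwt.unix libraries; Lwt_main.run comes from lwt.unix. *)

(* Hypothetical helper exercising the retrieval functions on one stream. *)
let retrieval_demo () =
  let open Lwt.Infix in
  let st = Lwt_stream.of_list [10; 20; 30] in
  Lwt_main.run
    ( Lwt_stream.npeek 2 st >>= fun peeked ->   (* looks, does not remove *)
      Lwt_stream.get st >>= fun first ->        (* removes the first element *)
      Lwt_stream.next st >>= fun second ->      (* removes the next element *)
      Lwt_stream.nget 5 st >>= fun rest ->      (* at most 5; only one is left *)
      (* The stream is now empty, so next fails with Lwt_stream.Empty. *)
      Lwt.catch
        (fun () -> Lwt_stream.next st >|= fun _ -> false)
        (function Lwt_stream.Empty -> Lwt.return true | e -> Lwt.fail e)
      >>= fun next_failed ->
      (* get reports the end of the stream as None instead of failing. *)
      Lwt_stream.get st >|= fun after_end ->
      (peeked, first, second, rest, next_failed, after_end) )
```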
val last_new : 'a t -> 'a Lwt.t
last_new st returns the last element that can be obtained without sleeping, or waits for one if none is available.
It fails with Empty if the stream has no more elements.
val junk_while : ('a -> bool) -> 'a t -> unit Lwt.t
val junk_while_s : ('a -> bool Lwt.t) -> 'a t -> unit Lwt.t
junk_while f st removes all elements at the beginning of the stream that satisfy f.
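As a sketch of the prefix-based functions, the example below takes a prefix with get_while and then skips elements with junk_while. It assumes lwt and lwt.unix; Lwt_stream.to_list is part of the module but not described in this section, and split_prefix is a hypothetical name.

```ocaml
(* Assumes the lwt and lwt.unix libraries; Lwt_main.run comes from lwt.unix. *)

let split_prefix () : int list * int list =
  let open Lwt.Infix in
  let st = Lwt_stream.of_list [1; 2; 3; 10; 4] in
  Lwt_main.run
    ( (* Take the leading elements below 5; 10 ends the prefix and stays. *)
      Lwt_stream.get_while (fun x -> x < 5) st >>= fun small ->
      (* Drop the leading elements that are 10 or more. *)
      Lwt_stream.junk_while (fun x -> x >= 10) st >>= fun () ->
      Lwt_stream.to_list st >|= fun rest ->
      (small, rest) )
```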
val junk_old : 'a t -> unit Lwt.t
junk_old st removes all elements that are ready to be read without yielding from st.
For example, the read_password function of Lwt_read_line uses it to flush keys previously typed by the user.
val get_available : 'a t -> 'a list
get_available st returns all available elements of st without blocking.
val get_available_up_to : int -> 'a t -> 'a list
get_available_up_to n st returns up to n elements of st without blocking.
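The non-blocking readers return plain lists rather than threads, so they can be called outside the Lwt event loop. A minimal sketch, assuming only the lwt library:

```ocaml
(* Assumes the lwt library. No event loop is needed for these calls. *)

let stream, push = Lwt_stream.create ()

(* Five elements are queued; the stream is left open. *)
let () = List.iter (fun x -> push (Some x)) [1; 2; 3; 4; 5]

(* Take at most two of the queued elements. *)
let first_two : int list = Lwt_stream.get_available_up_to 2 stream

(* Take everything else that is already queued. *)
let remaining : int list = Lwt_stream.get_available stream

(* Nothing is queued now; the call returns [] instead of waiting. *)
let nothing_left : int list = Lwt_stream.get_available stream
```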
val is_closed : 'a t -> bool
is_closed st returns whether the given stream has been closed. A closed stream is not necessarily empty. It may still contain unread elements. If is_closed st = true, then all subsequent reads until the end of the stream are guaranteed not to block.
- since 2.6.0
val closed : 'a t -> unit Lwt.t
closed st returns a thread that will sleep until the stream has been closed.
- since 2.6.0
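The sketch below illustrates that a closed stream may still hold unread elements. It assumes lwt (2.6.0 or later) and lwt.unix.

```ocaml
(* Assumes the lwt and lwt.unix libraries; Lwt_main.run comes from lwt.unix. *)

let stream, push = Lwt_stream.create ()

let () = push (Some 1)

(* The end of the stream has not been pushed yet. *)
let closed_before : bool = Lwt_stream.is_closed stream

let () = push None

(* Closed now, although the element 1 has not been read. *)
let closed_after : bool = Lwt_stream.is_closed stream

(* The thread returned by closed is already resolved, so this returns. *)
let () = Lwt_main.run (Lwt_stream.closed stream)

(* Reads on a closed stream do not block. *)
let leftover : int list = Lwt_stream.get_available stream
```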
val on_termination : 'a t -> (unit -> unit) -> unit
on_termination st f executes f when the end of the stream st is reached. Note that the stream may still contain elements if peek or similar was used.
- deprecated Use closed.
val on_terminate : 'a t -> (unit -> unit) -> unit
Same as on_termination.
- deprecated Use closed.
Stream traversal
val choose : 'a t list -> 'a t
choose l creates a stream from a list of streams. The resulting stream will return elements returned by any stream of l in an unspecified order.
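A small sketch of choose, assuming lwt and lwt.unix. Because the interleaving is unspecified, the example sorts the result before using it. Lwt_stream.to_list is part of the module but not described in this section.

```ocaml
(* Assumes the lwt and lwt.unix libraries; Lwt_main.run comes from lwt.unix. *)

let merged_sorted : int list =
  let odds = Lwt_stream.of_list [1; 3] in
  let evens = Lwt_stream.of_list [2; 4] in
  let merged = Lwt_stream.choose [odds; evens] in
  (* The order of the merged elements is unspecified, so sort them. *)
  Lwt_main.run (Lwt_stream.to_list merged) |> List.sort compare
```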
val map : ('a -> 'b) -> 'a t -> 'b t
val map_s : ('a -> 'b Lwt.t) -> 'a t -> 'b t
map f st maps the values returned by st with f.
val filter : ('a -> bool) -> 'a t -> 'a t
val filter_s : ('a -> bool Lwt.t) -> 'a t -> 'a t
filter f st keeps only the values x such that f x is true.
val filter_map : ('a -> 'b option) -> 'a t -> 'b t
val filter_map_s : ('a -> 'b option Lwt.t) -> 'a t -> 'b t
filter_map f st filters and maps st at the same time.
val map_list : ('a -> 'b list) -> 'a t -> 'b t
val map_list_s : ('a -> 'b list Lwt.t) -> 'a t -> 'b t
map_list f st applies f to each element of st and flattens the returned lists.
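These transformers compose into pipelines. The sketch below chains them on a small stream, assuming lwt and lwt.unix; Lwt_stream.to_list is part of the module but not described in this section.

```ocaml
(* Assumes the lwt and lwt.unix libraries; Lwt_main.run comes from lwt.unix. *)

let pipeline_result : string list =
  Lwt_stream.of_list [1; 2; 3; 4; 5; 6]
  |> Lwt_stream.filter (fun x -> x mod 2 = 0)   (* 2, 4, 6 *)
  |> Lwt_stream.map (fun x -> x * 10)           (* 20, 40, 60 *)
  |> Lwt_stream.filter_map (fun x ->
         (* Drop 20, convert the others to strings. *)
         if x > 20 then Some (string_of_int x) else None)
  |> Lwt_stream.map_list (fun s -> [s; s ^ "!"]) (* two outputs per input *)
  |> Lwt_stream.to_list
  |> Lwt_main.run

(* The _s variant takes a function that returns a thread. *)
let incremented : int list =
  Lwt_stream.of_list [1; 2]
  |> Lwt_stream.map_s (fun x -> Lwt.return (x + 1))
  |> Lwt_stream.to_list
  |> Lwt_main.run
```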
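val fold : ('a -> 'b -> 'b) -> 'a t -> 'b -> 'b Lwt.t
val fold_s : ('a -> 'b -> 'b Lwt.t) -> 'a t -> 'b -> 'b Lwt.t
fold f s x is a fold-like function for streams: it applies f to each element of s in order, threading an accumulator that starts at x, and returns the final accumulator.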
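A brief sketch of fold, assuming lwt and lwt.unix. Note the argument order: the stream comes before the initial accumulator, and the folded function receives the element first and the accumulator second.

```ocaml
(* Assumes the lwt and lwt.unix libraries; Lwt_main.run comes from lwt.unix. *)

(* Sum the elements, starting from 0. *)
let sum : int =
  Lwt_main.run
    (Lwt_stream.fold (fun x acc -> x + acc) (Lwt_stream.of_list [1; 2; 3; 4]) 0)

(* Consing each element onto the accumulator reverses the stream. *)
let reversed : int list =
  Lwt_main.run
    (Lwt_stream.fold (fun x acc -> x :: acc) (Lwt_stream.of_list [1; 2; 3]) [])
```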
val iter : ('a -> unit) -> 'a t -> unit Lwt.t
val iter_p : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t
val iter_s : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t
iter f s iterates over all elements of the stream.
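As a sketch of serialised iteration, the example below uses iter_s to append each character of a stream to a buffer. It assumes lwt and lwt.unix. Following the naming convention at the top of the module, iter_s runs the calls one after another, so the output order matches the stream order.

```ocaml
(* Assumes the lwt and lwt.unix libraries; Lwt_main.run comes from lwt.unix. *)

let log = Buffer.create 16

let () =
  Lwt_main.run
    (Lwt_stream.iter_s
       (fun c ->
         (* Each call finishes before the next one starts. *)
         Buffer.add_char log (Char.uppercase_ascii c);
         Lwt.return_unit)
       (Lwt_stream.of_string "abc"))

let shouted : string = Buffer.contents log
```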
val iter_n : ?max_concurrency:int -> ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t
iter_n ?max_concurrency f s iterates over all elements of the stream s. Iteration is performed concurrently with up to max_concurrency concurrent instances of f.
Iteration is not guaranteed to be in order as this function will attempt to always process max_concurrency elements from s at once.
- parameter max_concurrency: defaults to 1.
- raises Invalid_argument if max_concurrency < 1.
- since 3.3.0
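A minimal sketch of iter_n, assuming lwt (3.3.0 or later) and lwt.unix. Since processing order is not guaranteed, the example sorts what it collected.

```ocaml
(* Assumes the lwt and lwt.unix libraries; Lwt_main.run comes from lwt.unix. *)

let processed : int list ref = ref []

let () =
  Lwt_main.run
    (Lwt_stream.iter_n ~max_concurrency:2
       (fun x ->
         (* Up to two instances of this function may be in flight at once. *)
         processed := x :: !processed;
         Lwt.return_unit)
       (Lwt_stream.of_list [1; 2; 3; 4]))

(* Sort because the processing order is not guaranteed. *)
let processed_sorted : int list = List.sort compare !processed
```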
val find : ('a -> bool) -> 'a t -> 'a option Lwt.t
val find_s : ('a -> bool Lwt.t) -> 'a t -> 'a option Lwt.t
find f s finds an element of the stream s that satisfies f.
val find_map : ('a -> 'b option) -> 'a t -> 'b option Lwt.t
val find_map_s : ('a -> 'b option Lwt.t) -> 'a t -> 'b option Lwt.t
find_map f s finds and maps at the same time.
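A short sketch of find and find_map, assuming lwt and lwt.unix (and OCaml 4.05 or later for int_of_string_opt):

```ocaml
(* Assumes the lwt and lwt.unix libraries; Lwt_main.run comes from lwt.unix. *)

(* An element greater than 2 exists in the stream. *)
let first_big : int option =
  Lwt_main.run
    (Lwt_stream.find (fun x -> x > 2) (Lwt_stream.of_list [1; 2; 3; 4]))

(* No element matches, so the result is None once the stream ends. *)
let no_match : int option =
  Lwt_main.run
    (Lwt_stream.find (fun x -> x > 10) (Lwt_stream.of_list [1; 2; 3; 4]))

(* find_map returns the first Some produced by the function. *)
let first_number : int option =
  Lwt_main.run
    (Lwt_stream.find_map int_of_string_opt (Lwt_stream.of_list ["a"; "7"; "8"]))
```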
val combine : 'a t -> 'b t -> ('a * 'b) t
combine s1 s2 combines two streams. The stream will end when either stream ends.
val append : 'a t -> 'a t -> 'a t
append s1 s2 returns a stream which returns all elements of s1, then all elements of s2.
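The sketch below contrasts combine and append, assuming lwt and lwt.unix. Lwt_stream.to_list is part of the module but not described in this section.

```ocaml
(* Assumes the lwt and lwt.unix libraries; Lwt_main.run comes from lwt.unix. *)

(* combine pairs elements up and stops at the shorter stream. *)
let pairs : (int * string) list =
  Lwt_main.run
    (Lwt_stream.to_list
       (Lwt_stream.combine
          (Lwt_stream.of_list [1; 2; 3])
          (Lwt_stream.of_list ["a"; "b"])))

(* append yields the first stream, then the second. *)
let joined : int list =
  Lwt_main.run
    (Lwt_stream.to_list
       (Lwt_stream.append (Lwt_stream.of_list [1; 2]) (Lwt_stream.of_list [3])))
```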
val wrap_exn : 'a t -> 'a Lwt.result t
wrap_exn s is a stream s' such that each time s yields a value v, s' yields Result.Ok v, and when the source of s raises an exception e, s' yields Result.Error e.
Note that push-streams (as returned by create) never raise exceptions.
If the stream source keeps raising the same exception e each time the stream is read, s' is unbounded. Reading it will produce Result.Error e indefinitely.
- since 2.7.0
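As a sketch of wrap_exn, the source below fails once and then recovers, which relies on the behavior documented for from (an exception does not end the stream). It assumes lwt (2.7.0 or later) and lwt.unix; the failure message is made up, and Lwt_stream.to_list is part of the module but not described in this section.

```ocaml
(* Assumes the lwt and lwt.unix libraries; Lwt_main.run comes from lwt.unix. *)

let wrapped : string Lwt.result list =
  let calls = ref 0 in
  (* A source that fails on its second call and recovers on the third. *)
  let source =
    Lwt_stream.from (fun () ->
        incr calls;
        match !calls with
        | 1 -> Lwt.return (Some "first")
        | 2 -> Lwt.fail (Failure "transient")
        | 3 -> Lwt.return (Some "third")
        | _ -> Lwt.return None)
  in
  (* Exceptions from the source become Error elements of the new stream. *)
  Lwt_main.run (Lwt_stream.to_list (Lwt_stream.wrap_exn source))
```

Because the source here stops raising, the wrapped stream is finite. A source that raised on every call would make the wrapped stream unbounded, as noted above.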
Parsing
Misc
Deprecated
type 'a result =
  | Value of 'a
  | Error of exn
A value or an error.
- deprecated Replaced by wrap_exn, which uses Lwt.result.
val map_exn : 'a t -> 'a result t
map_exn s returns a stream that captures all exceptions raised by the source of the stream (the function passed to from).
Note that for push-streams (as returned by create) all elements of the mapped streams are values.
If the stream source keeps raising the same exception e each time the stream is read, the stream produced by map_exn is unbounded. Reading it will produce Lwt_stream.Error e indefinitely.
- deprecated Use wrap_exn.