sporadic.streams
Reactive streams on top of Promises.
This submodule provides an implementation glitch-free (i.e, no lost events) and leak-free (i.e, garbage collection is possible) for reactive programming. The minimal abstraction is called stream, used to associate many producers to many consumers.
Although not real-time (a.k.a, the synchronous / instantaneous axiom /
assumption found on Synchronous Imperative languages, such as Lucid), this
sporadic.streams
submodule offers asynchronous reactivity without glitches.
Future plans include composable stream operations interacting with Channels,
Promises, etc, in the same sense of Perl 6’s Supply class.
The API revolves around an async
/ await
style, and thus, a promise / future
concurrency model. Consumers read deterministically published values (the same
stream point yields the same event), while producers may non-deterministically
race / dispute the order of events arrival. Streams are closed for further writes
and reads by breaking / rejecting the stream in a promise-style through the
close operation.
Garbage collection comes for free by implementing a linked list of streamed /
promisified next
nodes. That is, if the current node is not referenced anymore
and the next node is resolved (by sending some value into the current one), so
the current node becomes prone to garbage collection. So, in short terms, pay
attention on your stream point references, and discard them whenever you can and
whenever you don’t need them anymore.
API Usage
To load this submodule:
const sporadicStreams = require('sporadic').streams
If you’re using the browser build from UNPKG, the sporadic
module will be
available on the global scope of your page. In this context, just replace the
sporadicStreams
variable by the projection/expression sporadic.streams
in
all the examples below.
Creates a new reactive stream:
const stream = await sporadicStreams.open()
Write on a reactive stream:
const next = await sporadicStreams.push(stream, value)
Where next
is the next stream to send values onto. This operation may fail due
a sent close signal. You can reuse many times the stream
reference instead of
tracking next
, for example:
await sporadicStreams.push(stream, 'Hello, Mike!')
await sporadicStreams.push(stream, 'Are you fine?')
await sporadicStreams.push(stream, 'See you later!')
Although it’s a discouraged pattern, ‘cause it forces you to keep all the stream point references in-memory, without discarding any of them, a huge risk of memory leaking for long-running applications. Don’t rely on that except for short running tests. You have read this warning. The good solution, so, is a threading of references (the example below show how it’s possible):
const stream1 = await sporadicStreams.push(stream0, 'Hello, Mike!')
const stream2 = await sporadicStreams.push(stream1, 'Are you fine?')
const stream3 = await sporadicStreams.push(stream2, 'See you later!')
// ...
Through non-determinism on writes, clients can get the next stream point in unpredictable ways (only if there’s more than one writer). The threading above seems deterministic cause there’s no race. It allows garbage collection, only if further references aren’t part of the same scope – the best solution here, tho, would be to iterate the generated stream points through a loop, or use them together with generators+promises (as a task scheduler).
To consume / read a value:
// may throw reason
const { current, next } = await sporadicStreams.pull(stream)
Where next
is the next stream point to read future pushed values, and
current
is the current pushed value at this stream point. This operation may
fail due a sent close signal. The same stream point here would yield the same
current value and the same next stream point.
To destroy an active stream:
// throws reason
await sporadicStreams.close(stream)
Previous clients might not be up-to-date (due late computations), so they will
keep reading values until an Error
is available, then they will break /
fail with that. Further calls on close
are ignored, so close is
idempotent no matter how many races occur (that is, multiple API clients calling
that operation). The Error
message will be the same for every thrown error,
but don’t rely on that message content, it may be prone to future changes.
A ticker is also provided. It fires true
in some given interval. To create
such stream which ticks periodical signals, use the every
operation:
const tickerStream = await sporadicStreams.every(milliseconds)
The interval argument is under milliseconds basis to comply with the well-known
setTimeout
and setInterval
JavaScript functions. To stop the ticker stream
from firing further events, just call close
on such stream (it will also
dispose the internal interval timer associated within).
‘Cause streams resemble quite well lists, there’s also some combinators provided
here. They’re map
and filter
:
const succ = number => number + 1
const even = number => (number % 2) === 0
const producer = await sporadicStreams.open()
const consumer = await sporadicStreams.map(producer, succ)
const filtered = await sporadicStreams.filter(consumer, even)
// fire values/events from producer stream to follow down
// them on filtered stream
// ...
Whenever the parent/origin stream is closed, the children/result streams are
closed as well. In the case above, if producer
is closed, both consumer
and
filtered
will be closed too. And if just consumer
is closed, filtered
will
be closed but not producer
– here, the fired values within producer
will be
ignored for both closed result streams.
To merge two streams in a non-deterministic way, use the merge
operation:
const leftStream = await sporadicStreams.open()
const rightStream = await sporadicStreams.open()
const mergedStream = await sporadicStreams.merge(leftStream, rightStream)
// ...
Here, mergedStream
will contain all the values/events from both leftStream
and
rightStream
(in an unpredictable order, clearly). The mergedStream
is automatically
closed when both input streams are closed beforehand (but nothing prevents us from closing
that thing manually). Pay attention that the merge
operation only listen from events since
this actual stream point onwards. That is, past events are lost - this is a property of stream
points as a whole for every client coming up with an already ongoing stream point. You can
persist the whole thing by using the initial stream point here, tho.
To import all operations on the current scope, you can use the following pattern (from modern JS):
const {
open, push, close, pull,
every, map, filter, merge
} = require('sporadic').streams