Questions for Streams

So you're a stream huh? How's that working out for you?

So you’ve encountered a stream! Congratulations, not everyone gets to work with streams. They are curious and divisive creatures, hiding in the shadows of domain specific models all across program land. They can be friendly or foey, so it’s best to understand them quickly. Here are some questions you should ask a stream.

Can I sample you?

Some streams can be sampled at any time. As in:

const x = new MysteryStream()
// unknown code
const currentVal = x.getCurrent()


currentVal <- getCurrent x

If the stream comes with an api like this, then you know the stream is storing a value we can unwrap it in an uncomplicated way.

Is it guaranteed?

Not all streams have a starting value. This means that a sample-able stream might guarantee a value or not.

// Do we need to do a null check?
// Or can we just assume there is a value in there?
const currentVal = x.getCurrent()

Of course, this can be enforced with types.

currentVal :: Maybe _ <- getCurrent x

Though you may need to ask:

Am I seeing a stream with a starting value that happens to be a Maybe? Or must it be a Maybe because there is a chance nothing is in there yet?

Can I subscribe to you?

Of course, you can hun. Actually maybe not. Not all streams can be subscribed to. For example, pull streams cannot be subscribed to since they don’t know about changes until asked (like some other creatures I know).

x.subscribe(y => {
    // unseen code

This does not mean there must be a subscribe function in the api. Other schemes for subscription exist like:

execute :: MysteryStream (IO ()) -> IO ()

If such a thing exists, where a stream containing thunks can be executed, then subscribe might be described in terms of map. For example:

x :: MysteryStream Int

f :: Int -> IO ()
f i = print $ "It was: " ++ show i ++ " I swear."

execute $ f <$> x


execute( => () => {
    // untold effects on the world

This technique is less common in languages without managed IO (almost all languages.) You could still do this in JavaScript as described, or with Ramda Fantasy’s IO Monad. If you are in JavaScript you should be doing this anyway (but you’re, not are you?)

Can I nest you?

Or better put, are you higher order?

const x = new MysteryStream()
const y = new MysteryStream(x)



join :: MysteryStream (MysteryStream a) -> MysteryStream a

The implication here is big. If it’s possible to nest a stream inside a stream, then you essentially have a switch. You could have two streams and decide which one we use based on a third stream!

x :: MysteryStream Int
y :: MysteryStream Int
z :: MysteryStream Bool

xOrY :: MysteryStream Int
xOrY = join $ (\z -> if z then x else y) <$> z

Can I push you?

Hey buddy, not like that. Some streams can be written to after the fact, others cannot.

const x = new MysteryStream()
x.subscribe(y => console.log(y))

// "3" will print to the console

It is not required that you can subscribe for push to work. The above is just an example. The question is, can this stream be written to in some way? I.E., is it actually a stream reference? Note; if your stream can be pushed to in this way, you are not doing functional reactive programming.

Are you Applicative?

Some streams can be combined with other streams using a function. For example

const x = new MysteryStream(1)
const y = new MysteryStream("foo")

const z = zipWith((a,b) => a + b)
x :: MysteryStream Int
y :: MysteryStream String

z :: MysteryStream String
z = (\a b -> show a ++ b) <$> x <*> y

Even if we can’t nest the streams we might be able to combine them with a function.

Are you pressurized?

If it’s a push stream we might need to handle back pressure. Relieving back pressure can take hours of expensive massage therapy, and should be avoided if possible. If you have untreated back pressure your stream may drop values. And no one likes that.

Are you Monoidal?

Can we combine streams of the same type?

x :: MysteryStream Int
y :: MysteryStream Int

z :: MysteryStream Int
z = x <> y
const z = x.merge(y)

Something like that. Can we take two streams of the same type and combine them into a new stream? Is that combinator associative? Is it possible to have an empty stream?

If the answer to all of these is true, then the stream is a Monoid.

What if they fire at the same time?

If your stream has such an operator, then what happens if two input streams fire at exactly the same time? Will one of them win? Will it be the left stream or the right stream?

Are you Foldable?

Some streams can be folded into a new stream encapsulating state. For example:

x :: MysteryStream Int

sumOfx :: MysteryStream Int
sumOfx :: foldp (+) 0 x

As x fires, sumOfx will fire with the running sum of all values that x has fired. This is stateful in nature, and not all streams can do this.

If you are a JavaScript person you might be thinking about Redux right now. And you are right!

Are you Traversable?

No. Streams are never traversable. What is wrong with you?

Do you have relatives?

Some streams are not alone in the world. Some streams are friends with other streams, and they have relationships. Sometimes those relationships go well, other times they end up staring at each other from across the dinner table in silent disdain.

One interesting abstraction is to combine a push stream with a pull stream, into a third stream type.

Or perhaps you can combine different types of streams in other ways. It’s worth finding out.

At this point

The stream should have gone to sleep. Tiring of all your questioning. This is the perfect time to capture the stream and add it to your toolkit for future use.

x :: MysteryStream a

annoy = do
    _ <- sleep 10000 x
    let toolkit' = x:toolkit
    -- unknown usages