I've been thinking for a while about what a native Haskell middleware would look like. One thing seemed obvious: it would have to be event-based. So I started thinking about event streams.
import Data.Time (UTCTime)

data Event a = Event {
    eventTime :: UTCTime,
    eventValue :: a
  } deriving (Show, Eq)

instance Functor Event where
  fmap f (Event t v) = Event t (f v)

-- | True if the first event occurs strictly before the second.
isBefore :: Event a -> Event b -> Bool
isBefore ev1 ev2 = eventTime ev1 < eventTime ev2
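To experiment with these definitions it helps to build events by hand. The sketch below repeats the corrected types and adds two hypothetical helpers, epoch (an arbitrary reference time) and at (an event a given number of seconds after it); neither is part of the original design.

```haskell
import Data.Time (NominalDiffTime, UTCTime (..), addUTCTime, fromGregorian)

data Event a = Event
  { eventTime  :: UTCTime
  , eventValue :: a
  } deriving (Show, Eq)

instance Functor Event where
  fmap f (Event t v) = Event t (f v)

-- | True if the first event occurs strictly before the second.
isBefore :: Event a -> Event b -> Bool
isBefore ev1 ev2 = eventTime ev1 < eventTime ev2

-- | Hypothetical reference point for examples: 2000-01-01 00:00:00 UTC.
epoch :: UTCTime
epoch = UTCTime (fromGregorian 2000 1 1) 0

-- | Hypothetical helper: an event carrying v, secs seconds after 'epoch'.
at :: NominalDiffTime -> a -> Event a
at secs v = Event (addUTCTime secs epoch) v
```

In GHCi, isBefore (at 1 'a') (at 2 'b') is True, whereas isBefore (at 2 'a') (at 2 'b') is False because the comparison is strict, and fmap (+1) (at 0 41) keeps the time while changing the value to 42.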
The simplest representation of a stream of events is just an infinite list, so you can define an event processor as a function from one stream to another:
type EventProcessor a = [Event a] -> [Event a]
Unfortunately this won't work. Suppose that your program is going to emit an event e1 at time t1, but an incoming event before t1 would make it do something else. To decide whether the next item in the output list is e1, you need to look at the time of the next input event. But since that event hasn't happened yet, you have to wait for it, and if it arrives after t1 you have already missed the deadline.
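A small, hypothetical example makes the problem concrete. watchdog below is an EventProcessor-style function (its name and parameters are invented for illustration) that echoes each input, but emits a marker event whenever gap seconds pass with no input. On a list that already exists it works fine; the trouble is the pattern match on the next input, which a live program could only perform by waiting for that input to arrive.

```haskell
import Data.Time (NominalDiffTime, UTCTime (..), addUTCTime, fromGregorian)

data Event a = Event { eventTime :: UTCTime, eventValue :: a }
  deriving (Show, Eq)

type EventProcessor a = [Event a] -> [Event a]

-- | Hypothetical list-based watchdog: echo every input, and emit marker
-- whenever gap seconds pass after start (or the last event) with no input.
watchdog :: NominalDiffTime -> a -> UTCTime -> EventProcessor a
watchdog gap marker start inputs =
  case inputs of
    -- Choosing this branch requires already knowing the next input event.
    -- With a live input list that means blocking until it arrives,
    -- possibly long after the deadline has passed.
    ev : rest
      | eventTime ev <= deadline -> ev : watchdog gap marker (eventTime ev) rest
    _ -> Event deadline marker : watchdog gap marker deadline inputs
  where
    deadline = addUTCTime gap start

-- | Hypothetical example helpers.
epoch :: UTCTime
epoch = UTCTime (fromGregorian 2000 1 1) 0

at :: NominalDiffTime -> a -> Event a
at secs v = Event (addUTCTime secs epoch) v
```

For example, take 4 (watchdog 2 '!' epoch [at 1 'a', at 5 'b']) gives a at 1 s, ! at 3 s, b at 5 s and ! at 7 s. By contrast, take 1 (watchdog 2 '!' epoch undefined) fails: even the first timeout, due at 2 s, cannot be produced until the first cell of the input list is known.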
What is needed is a way to reify the two possibilities here: either the next input event is before t1 or it is after it. This leads to the following data structures:
-- No "deriving Show": an RTSP wraps a function, and functions have no Show instance.
data EventStream b c =
    Emit {esEmit :: Event c, esFuture :: EventStream b c, esHandle :: RTSP b c}
  | Wait {esHandle :: RTSP b c}

newtype RTSP b c = RTSP {runRTSP :: Event b -> EventStream b c}
The Emit case embodies these two possibilities. An interpreter function will process it as follows:
- If no input event intervenes, it emits the event esEmit and calls itself on esFuture.
- If instead an input event arrives before esEmit, the function in esHandle is called, generating a replacement EventStream that represents a different future.
- Finally, the Wait case allows the event stream to do nothing until an event arrives.
The name RTSP stands for "Real Time Stream Processor". It also solves the problem of what an event stream is supposed to do before the first event arrives.
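For offline testing, the interpreter described above can be sketched as a pure function over a finite, time-ordered input list. runStream, its horizon parameter and the echo processor are hypothetical names introduced here; a real-time interpreter would instead wait on a clock and an input channel. The horizon cut-off is an assumption added so that streams which keep emitting still terminate.

```haskell
import Data.Time (NominalDiffTime, UTCTime (..), addUTCTime, fromGregorian)

data Event a = Event { eventTime :: UTCTime, eventValue :: a }
  deriving (Show, Eq)

isBefore :: Event a -> Event b -> Bool
isBefore ev1 ev2 = eventTime ev1 < eventTime ev2

data EventStream b c
  = Emit { esEmit :: Event c, esFuture :: EventStream b c, esHandle :: RTSP b c }
  | Wait { esHandle :: RTSP b c }

newtype RTSP b c = RTSP { runRTSP :: Event b -> EventStream b c }

-- | Hypothetical offline interpreter. Inputs must be in time order; outputs
-- later than horizon are dropped so that endless streams still terminate.
runStream :: UTCTime -> EventStream b c -> [Event b] -> [Event c]
runStream horizon es inputs =
  case (es, inputs) of
    -- An input arrives strictly before the pending output: call the handler.
    (Emit e _ k, i : is) | i `isBefore` e -> runStream horizon (runRTSP k i) is
    -- Otherwise the pending output is emitted and we continue with its future.
    (Emit e future _, _)
      | eventTime e <= horizon -> e : runStream horizon future inputs
      | otherwise -> []
    -- Nothing pending: hand the next input (if any) to the RTSP.
    (Wait k, i : is) -> runStream horizon (runRTSP k i) is
    (Wait _, []) -> []

-- | Hypothetical processor that re-emits every input unchanged.
echo :: RTSP a a
echo = RTSP $ \e -> Emit e (Wait echo) echo

-- | Hypothetical example helpers.
epoch :: UTCTime
epoch = UTCTime (fromGregorian 2000 1 1) 0

at :: NominalDiffTime -> a -> Event a
at secs v = Event (addUTCTime secs epoch) v
```

With echo, runStream horizon (Emit (at 5 'x') (Wait echo) echo) [at 3 'a'] returns only the echoed a at 3 s, because the early input invokes esHandle and x is never emitted; with the input at 7 s instead, x is emitted at 5 s and a at 7 s.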
Interestingly, both EventStream and RTSP are Categories. The composition operator for EventStream is simple in principle, although in practice all the different combinations of values and timings make it rather fiddly. Here it is, along with a block comment that explains the logic if you are really interested.
{-
The (.) operator for EventStream has to deal with several scenarios:
1: (Wait k2)∘(Wait k1). This is simple because the only possible event
   is an input that is piped through the two RTSPs.
2: (Wait k2)∘es1@(Emit e1 es1a k1). In this case there are two timelines:
   a) e1 ev: (runRTSP k2 e1)∘es1a -- Event e1 is passed to k2 and the result composed with es1a.
   b) ev e1: (Wait k2)∘(runRTSP k1 ev) -- e1 never happens. Instead the input is passed to k1,
      which generates a new event stream to be composed with (Wait k2).
   Both timelines are handled by the "k" value in the "let" clause.
3: es2@(Emit e2 es2a k2)∘es1@(Emit e1 es1a k1). The logic here is similar to scenario
   2, except that the timing of e2 has to be taken into account as well. There are five basic
   timelines. Let ev = the next input event and es2b = runRTSP k2 e1.
   a) e2 e1 : Emit e2 (es2a∘es1) -- e2 is emitted while e1 is still pending in es1.
   b) e1 e2 : es2b∘es1a -- e2 never happens because it is overridden by e1.
   c) ev ... : es2∘(runRTSP k1 ev) -- ev overrides e1, and the new output is fed to es2.
   d) e1 ev e2 : esProcess (es2b∘es1a) ev -- e1 overrides e2, giving es1a and es2b to process ev.
   e) e2 ev e1 : Emit e2 (es2a∘(runRTSP k1 ev)) -- e2 is emitted, then ev overrides e1.
4: es2@(Emit e2 es2a k2)∘(Wait k1). e2 is emitted unless an input ev arrives first, in which
   case the result is es2∘(runRTSP k1 ev).
-}
-- These imports belong at the top of the module:
import Prelude hiding (id, (.))
import Control.Category

instance Category EventStream where
  -- id :: EventStream b b
  id = Wait $ RTSP $ \e -> Emit e id id

  -- (.) :: EventStream c d -> EventStream b c -> EventStream b d
  (Wait k2) . (Wait k1) =
    Wait $ k2 . k1

  es2@(Wait k2) . (Emit e1 es1a k1) =
    let es = runRTSP k2 e1 . es1a
        k = RTSP $ \ev ->
          if ev `isBefore` e1
            then es2 . runRTSP k1 ev         -- Timeline 2b
            else runRTSP (esProcess es) ev   -- Timeline 2a
    in case es of
         Emit e2 es2b _ -> Emit e2 es2b k
         Wait _ -> Wait k

  es2@(Emit e2 es2a _) . es1@(Wait k1) =
    Emit e2 (es2a . es1) (RTSP $ \ev -> es2 . runRTSP k1 ev)

  es2@(Emit e2 es2a k2) . es1@(Emit e1 es1a k1) =
    let k = RTSP $ \ev ->
          if ev `isBefore` e1
            then es2 . runRTSP k1 ev         -- Timeline 3c
            else runRTSP (esProcess es) ev   -- Timeline 3d
        es = runRTSP k2 e1 . es1a
    in if e1 `isBefore` e2
         then -- Timelines 3b, 3c and 3d: e2 never happens.
           case es of
             Emit e3 es3 _ -> Emit e3 es3 k
             Wait _ -> Wait k
         else -- Timelines 3a, 3c and 3e. An input arriving before e2 also precedes e1,
              -- so es2 is kept and composed with the stream that k1 produces (3c).
           Emit e2 (es2a . es1) (RTSP $ \ev -> es2 . runRTSP k1 ev)

-- | Given a new input event to an existing event stream, this returns the
-- modified event stream.
esProcess :: EventStream b c -> RTSP b c
esProcess (Wait k) = k
esProcess (Emit eOut rest k) = RTSP $ \eIn ->
  if eIn `isBefore` eOut
    then runRTSP k eIn
    else Emit eOut (runRTSP (esProcess rest) eIn) k
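esProcess can also be exercised on its own. The sketch below repeats the types and esProcess, and adds two hypothetical definitions: echo, which re-emits each input, and pending, a stream whose next output x is due at 5 s and whose handler echoes any input that beats it.

```haskell
import Data.Time (NominalDiffTime, UTCTime (..), addUTCTime, fromGregorian)

data Event a = Event { eventTime :: UTCTime, eventValue :: a }
  deriving (Show, Eq)

isBefore :: Event a -> Event b -> Bool
isBefore ev1 ev2 = eventTime ev1 < eventTime ev2

data EventStream b c
  = Emit { esEmit :: Event c, esFuture :: EventStream b c, esHandle :: RTSP b c }
  | Wait { esHandle :: RTSP b c }

newtype RTSP b c = RTSP { runRTSP :: Event b -> EventStream b c }

-- | As in the text: feed one input event to an existing stream.
esProcess :: EventStream b c -> RTSP b c
esProcess (Wait k) = k
esProcess (Emit eOut rest k) = RTSP $ \eIn ->
  if eIn `isBefore` eOut
    then runRTSP k eIn                                -- input overrides eOut
    else Emit eOut (runRTSP (esProcess rest) eIn) k   -- eOut happens first

-- | Hypothetical processor that re-emits every input unchanged.
echo :: RTSP a a
echo = RTSP $ \e -> Emit e (Wait echo) echo

-- | Hypothetical example: output 'x' is pending at 5 s; an earlier input
-- replaces it with an echo of that input.
pending :: EventStream Char Char
pending = Emit (at 5 'x') (Wait echo) echo

-- | Hypothetical example helpers.
epoch :: UTCTime
epoch = UTCTime (fromGregorian 2000 1 1) 0

at :: NominalDiffTime -> a -> Event a
at secs v = Event (addUTCTime secs epoch) v
```

An input at 3 s arrives before x, so runRTSP (esProcess pending) (at 3 'a') yields a stream whose next output is a at 3 s. An input at 7 s arrives too late to override x, so the resulting stream still emits x at 5 s, and the input is handled in esFuture, whose next output is a at 7 s.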
And here is the matching RTSP instance:
{-
The (.) operator for RTSP has to deal with a similar range of scenarios to that of EventStream.
The event that fires off the composed stream is e0. This is passed to r1, giving es1, and the
first event from es1 (e1) is passed to r2, giving es2. Hence these events must occur in order.
However, ev is a second input event that can arrive at any time after e0.
The possible timelines are therefore:
1: e0 e1 e2 ev: Emit e2 (es2a∘es1a) _ -- The basic case. ev is passed to (es2a∘es1a).
2: e0 e1 ev e2: es2∘es1a -- e1 has happened, but ev occurs before e2 is emitted.
                            The code handles this by running r2∘esProcess es1 on ev.
3: e0 ev e1 e2: r2∘k1a -- e1 is overridden by ev.
-}
instance Category RTSP where
  -- id :: RTSP b b
  id = RTSP $ \ev -> Emit ev id id

  -- (.) :: RTSP c d -> RTSP b c -> RTSP b d
  r2 . r1 = RTSP $ \e0 ->
    let es1 = runRTSP r1 e0
    in case es1 of
         Emit e1 es1a k1a ->
           let es2 = runRTSP r2 e1
               k = RTSP $ \ev ->
                 runRTSP (if ev `isBefore` e1 then r2 . k1a else r2 . esProcess es1) ev
           in case es2 of
                Emit e2 es2a _ ->
                  Emit e2 (es2a . es1a) k
                Wait _ ->
                  Wait k
         Wait k1a ->
           Wait (r2 . k1a)
Now that we have shown that these two things are both Categories, can we make them Arrows? The Arrow class requires arr plus a definition of first (or ***), which here would have the following types:
first :: EventStream b c -> EventStream (b, d) (c, d)
first :: RTSP b c -> RTSP (b, d) (c, d)
This is where I run into problems. EventStream can't be an Arrow, because the stream passed to first might emit a value before any input has been received. At that point first is supposed to pair the output with some value of type d, but no such value exists yet.
RTSP doesn't have this problem, because a value of type d must have been passed in with the first event, and this lets us define first. However, from a semantic point of view it is not clear how first should deal with delays and subsequent events.
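To check that the types really do line up for RTSP, here is one candidate definition, written as standalone functions rather than an Arrow instance so that it does not need the Category instances. arrR, firstR and pairWith are hypothetical names, and the semantics chosen (every output is paired with the d from the most recent input) is an assumption, not a settled answer.

```haskell
import Data.Time (NominalDiffTime, UTCTime (..), addUTCTime, fromGregorian)

data Event a = Event { eventTime :: UTCTime, eventValue :: a }
  deriving (Show, Eq)

instance Functor Event where
  fmap f (Event t v) = Event t (f v)

data EventStream b c
  = Emit { esEmit :: Event c, esFuture :: EventStream b c, esHandle :: RTSP b c }
  | Wait { esHandle :: RTSP b c }

newtype RTSP b c = RTSP { runRTSP :: Event b -> EventStream b c }

-- | Hypothetical analogue of arr: apply a pure function to every input.
arrR :: (b -> c) -> RTSP b c
arrR f = RTSP $ \e -> Emit (fmap f e) (Wait (arrR f)) (arrR f)

-- | Hypothetical analogue of first. ASSUMPTION: each output is paired with
-- the d that arrived with the most recent input.
firstR :: RTSP b c -> RTSP (b, d) (c, d)
firstR r = RTSP $ \(Event t (b, d)) -> pairWith d (runRTSP r (Event t b))

-- | Tag every output of a stream with d; a new input brings a new d.
pairWith :: d -> EventStream b c -> EventStream (b, d) (c, d)
pairWith _ (Wait k) = Wait (firstR k)
pairWith d (Emit e rest k) =
  Emit (fmap (\c -> (c, d)) e) (pairWith d rest) (firstR k)

-- | Hypothetical example helpers.
epoch :: UTCTime
epoch = UTCTime (fromGregorian 2000 1 1) 0

at :: NominalDiffTime -> a -> Event a
at secs v = Event (addUTCTime secs epoch) v
```

With this choice the d component simply travels with whatever the wrapped processor emits, so firstR applied to a delaying processor would hold d back along with the delayed value instead of letting it bypass the processing.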
It's easy to write a delay arrow: it simply echoes every incoming event with a fixed delay added to the event time. Other variations on the theme are possible, such as a dup arrow that echoes every input immediately and then again after a fixed delay, or an arrow that echoes every input as a Right value but also emits a Left timeout value if no input is received within a given delay. There is clearly a whole family of combinators waiting to be written here. But first things first.
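The combinators mentioned above can be sketched directly with Emit and Wait. The names delay, dup and timeout follow the text, but these definitions are assumptions rather than the author's code, as are the helpers queued and shift and the offline runStream interpreter. Because several inputs can arrive while earlier outputs are still pending, delay and dup keep a time-ordered queue of outstanding outputs; timeout only starts counting after the first input, since an RTSP does nothing until then.

```haskell
import Data.Time (NominalDiffTime, UTCTime (..), addUTCTime, fromGregorian)

data Event a = Event { eventTime :: UTCTime, eventValue :: a }
  deriving (Show, Eq)

instance Functor Event where
  fmap f (Event t v) = Event t (f v)

isBefore :: Event a -> Event b -> Bool
isBefore ev1 ev2 = eventTime ev1 < eventTime ev2

data EventStream b c
  = Emit { esEmit :: Event c, esFuture :: EventStream b c, esHandle :: RTSP b c }
  | Wait { esHandle :: RTSP b c }

newtype RTSP b c = RTSP { runRTSP :: Event b -> EventStream b c }

-- | Hypothetical helper: move an event dt seconds later.
shift :: NominalDiffTime -> Event a -> Event a
shift dt (Event t v) = Event (addUTCTime dt t) v

-- | Hypothetical helper: emit a time-ordered queue of outstanding outputs.
-- When an input arrives before the head of the queue, onInput builds the
-- new queue from that input and the current queue.
queued :: (Event a -> [Event a] -> [Event a]) -> [Event a] -> EventStream a a
queued onInput = go
  where
    handler q = RTSP $ \e -> go (onInput e q)
    go [] = Wait (handler [])
    go q@(p : ps) = Emit p (go ps) (handler q)

-- | Echo every input dt seconds later (inputs assumed to be in time order).
delay :: NominalDiffTime -> RTSP a a
delay dt = esHandle (queued (\e q -> q ++ [shift dt e]) [])

-- | Echo every input immediately, and again dt seconds later.
dup :: NominalDiffTime -> RTSP a a
dup dt = esHandle (queued (\e q -> e : q ++ [shift dt e]) [])

-- | Echo inputs as Right; emit Left () if no new input arrives within dt.
-- After a timeout it waits for the next input to restart the clock.
timeout :: NominalDiffTime -> RTSP a (Either () a)
timeout dt = RTSP $ \e ->
  Emit (fmap Right e)
       (Emit (shift dt (fmap (const (Left ())) e)) (Wait (timeout dt)) (timeout dt))
       (timeout dt)

-- | Hypothetical offline interpreter (inputs in time order; outputs after
-- horizon are dropped).
runStream :: UTCTime -> EventStream b c -> [Event b] -> [Event c]
runStream horizon es inputs =
  case (es, inputs) of
    (Emit e _ k, i : is) | i `isBefore` e -> runStream horizon (runRTSP k i) is
    (Emit e future _, _)
      | eventTime e <= horizon -> e : runStream horizon future inputs
      | otherwise -> []
    (Wait k, i : is) -> runStream horizon (runRTSP k i) is
    (Wait _, []) -> []

-- | Hypothetical example helpers.
epoch :: UTCTime
epoch = UTCTime (fromGregorian 2000 1 1) 0

at :: NominalDiffTime -> a -> Event a
at secs v = Event (addUTCTime secs epoch) v
```

Fed inputs a at 1 s and b at 2 s, Wait (delay 2) emits a at 3 s and b at 4 s, and Wait (dup 2) emits a, b, a, b at 1, 2, 3 and 4 s. The queue in delay relies on inputs arriving in time order; with a fixed delay the shifted outputs then stay in order as well.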
What should first do if its argument is delay 1? Intuitively, first puts the first item of the pair through its argument, while the second item bypasses this processing. However, this would mean emitting one part of the result before the other. What should it do with dup 1? And what do the answers imply for this expression?
delay 4 &&& delay 5
I don't have any good answers. Maybe RTSP isn't an arrow, but that is a real nuisance. Any ideas?