Tuesday, October 26, 2010

Is this an Arrow?

I've been thinking for a while about what a native Haskell middleware would look like. One thing seemed obvious: it was going to have to be event-based. So I started thinking about events:

import Data.Time.Clock (UTCTime)

data Event a = Event {
   eventTime :: UTCTime,
   eventValue :: a
   } deriving (Show, Eq)

instance Functor Event where
   fmap f (Event t v) = Event t (f v)

-- | True if the first event occurs strictly before the second.
isBefore :: Event a -> Event b -> Bool
isBefore ev1 ev2 = eventTime ev1 < eventTime ev2

The simplest representation of a stream of events is just an infinite list, so you can define an event processor as a function from one stream to another:

type EventProcessor a = [Event a] -> [Event a]

Unfortunately this won't work. Suppose that your program is going to emit an event e1 at time t1, but an incoming event before t1 would make your program do something else. In order to decide whether the next item in the output list is e1 you need to look at the time of the next input event. But since that event hasn't happened yet you have to wait for it. If it arrives after t1 then you have missed the deadline.

What is needed is a way to reify the two possibilities here: either the next input event is before t1 or it is after it. This leads to the following data structures:

data EventStream b c =
   Emit {esEmit :: Event c, esFuture :: EventStream b c, esHandle :: RTSP b c}
   | Wait {esHandle :: RTSP b c} deriving Show

newtype RTSP b c = RTSP {runRTSP :: Event b -> EventStream b c}

The "Emit" case embodies these two possibilities. An interpreter function will process this as follows:

  1. If no input event intervenes then it will emit the event esEmit and call
    itself on esFuture.

  2. On the other hand if an input event arrives before esEmit then the function
    in esHandle gets called instead, generating a replacement EventStream
    representing a different future.

  3. Finally the Wait case allows the event stream to do nothing until an event
    arrives.

The RTSP stands for "Real Time Stream Processor". It also handles the problem of what an event stream is supposed to do before the first event arrives.
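The three interpreter cases above can be sketched as a pure function. This is only an illustration under stated assumptions, not the interpreter from the original code: `Time` is an `Integer` stand-in for `UTCTime`, and `interpret`, its `horizon` parameter and the `watchdog` example are hypothetical names introduced here.

```haskell
-- Assumption: Integer timestamps stand in for UTCTime to keep the sketch small.
type Time = Integer

data Event a = Event { eventTime :: Time, eventValue :: a }
  deriving (Show, Eq)

data EventStream b c
  = Emit { esEmit :: Event c, esFuture :: EventStream b c, esHandle :: RTSP b c }
  | Wait { esHandle :: RTSP b c }

newtype RTSP b c = RTSP { runRTSP :: Event b -> EventStream b c }

-- | Hypothetical pure interpreter. It feeds a finite, time-ordered list of
-- inputs to a stream and collects the outputs scheduled up to the horizon.
interpret :: Time -> EventStream b c -> [Event b] -> [Event c]
interpret horizon = go
  where
    -- Case 3: nothing is scheduled, so only an input can make progress.
    go (Wait k) (i:is) = go (runRTSP k i) is
    go (Wait _) []     = []
    go (Emit e next k) inputs = case inputs of
      -- Case 2: an input arrives strictly before the scheduled output,
      -- so the handler supplies a replacement future.
      (i:is) | eventTime i < eventTime e -> go (runRTSP k i) is
      -- Case 1: no input intervenes, so emit and carry on with esFuture.
      _ | eventTime e <= horizon -> e : go next inputs
        | otherwise              -> []

-- | Hypothetical example: emit "timeout" 5 ticks after the latest input,
-- unless another input arrives first.
watchdog :: RTSP String String
watchdog = RTSP $ \(Event t _) ->
  Emit (Event (t + 5) "timeout") (Wait watchdog) watchdog
```

With inputs at times 0, 3 and 20, the input at 3 overrides the timeout scheduled for 5, so the first timeout comes out at 8 and the second at 25.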

Interestingly, both EventStream and RTSP are Categories. The composition operator for EventStream is simple in principle, although in practice all the different combinations of values and timings make it rather fiddly. Here it is, along with a block comment that explains the logic if you are really interested.

{-
The (.) operator for EventStream has to deal with several scenarios:

1: (Wait k2) . (Wait k1). This is simple because the only possible event
   is an input that is piped through the two RTSPs.

2: (Wait k2) . es1@(Emit e1 es1a k1). In this case there are three timelines:
   a) e1    : (k2 e1) . es1a     -- Event e1 is passed to k2 and the result composed with es1a.
   b) ev e1 : (Wait k2) . (k1 ev) -- e1 never happens. Instead the input is passed to k1, which
                                     generates a new event stream to be composed with k2.

   The code for case b is the "k" value in the "let" clause.

3: es2@(Emit e2 es2a k2) . es1@(Emit e1 es1a k1). The logic here is similar to scenario
   2, except that the timing of e2 has to be taken into account as well. There are five basic
   timelines. Let ev = the next input event and es2b = runRTSP k2 e1.

   a) e2 e1    : Emit e2 (es2a . es1a) (k2 . k1)
   b) e1 e2    : k2 e1 . es1a                 -- e2 never happens because it is overridden by e1
   c) ev ...   : es2 . (k1 ev)                -- ev overrides e1, and the new output is fed to es2.
   d) e1 ev e2 : esProcess (es2b . es1a) ev   -- e1 overrides e2, giving es1a and es2b to process ev.
   e) e2 ev e1 : Emit e2 (es2a . (k1 ev))     -- e2 is emitted, then ev overrides e1.
-}

-- Assumes "import Prelude hiding (id, (.))" and "import Control.Category".
instance Category EventStream where
   -- id :: EventStream b b
   id = Wait $ RTSP $ \e -> Emit e id id

   -- (.) :: EventStream c d -> EventStream b c -> EventStream b d
   (Wait k2) . (Wait k1) =
      Wait $ k2 . k1
   es2@(Wait k2) . es1@(Emit e1 es1a k1) =
      let es = (runRTSP k2 e1) . es1a
          k = RTSP $ \ev ->
             if ev `isBefore` e1
                then es2 . runRTSP k1 ev         -- Timeline 2b
                else runRTSP (esProcess es) ev   -- Timeline 2a
      in case es of
         Emit e2 es2b _ -> Emit e2 es2b k
         Wait _ -> Wait k
   es2@(Emit e2 es2a k2) . es1@(Wait k1) =
      Emit e2 (es2a . es1) (RTSP $ \ev -> es2 . runRTSP k1 ev)
   es2@(Emit e2 es2a k2) . es1@(Emit e1 es1a k1) =
      let k = RTSP $ \ev ->
             if ev `isBefore` e1
                then es2 . runRTSP k1 ev         -- Timeline 3c
                else runRTSP (esProcess es) ev   -- Timeline 3d
          es = (runRTSP k2 e1) . es1a
      in if e1 `isBefore` e2
         then -- Timelines 3b, 3c and 3d. e2 never happens.
            case es of
               Emit e3 es3 _ -> Emit e3 es3 k
               Wait _ -> Wait k
         else -- Timelines 3a, 3c and 3e.
            Emit e2 (es2a . es1) (esProcess es2 . k1)

-- | Given a new input event to an existing event stream, this returns the
-- modified event stream.
esProcess :: EventStream b c -> RTSP b c
esProcess (Wait k) = k
esProcess (Emit eOut rest k) = RTSP $ \eIn ->
   if eIn `isBefore` eOut
      then runRTSP k eIn
      else Emit eOut (runRTSP (esProcess rest) eIn) k

And here is the matching RTSP instance:

{-
The (.) operator for RTSP has to deal with a similar range of scenarios to that of EventStream.
The event that fires off the composed stream is e0. This is passed to r1, giving es1, and the
first event from es1 is passed to r2, giving es2. Hence these events must occur in order. However
ev is a second input event that can arrive at any time after e0.

The possible timelines are therefore:

1: e0 e1 e2 ev: Emit e2 (es2a . es1a) _   -- The basic case. ev is passed to (es2a . es1a)
2: e0 e1 ev e2: es2 . es1a                -- e1 has happened, but ev occurs before e2 is emitted.
3: e0 ev e1 e2: r2 . esProcess es1        -- e1 is overridden by ev.
-}

instance Category RTSP where
   -- id :: RTSP b b
   id = RTSP $ \ev -> Emit ev id id

   -- (.) :: RTSP c d -> RTSP b c -> RTSP b d
   r2 . r1 = RTSP $ \e0 ->
      let es1 = runRTSP r1 e0
      in case es1 of
         Emit e1 es1a k1a ->
            let es2 = runRTSP r2 e1
                k = RTSP $ \ev ->
                   runRTSP (if ev `isBefore` e1 then r2 . k1a else r2 . esProcess es1) ev
            in case es2 of
               Emit e2 es2a k2a ->
                  Emit e2 (es2a . es1a) k
               Wait k2a ->
                  Wait k
         Wait k1a ->
            Wait (r2 . k1a)

So now we have shown that these two things are both Categories, can we make them Arrows? The Arrow class requires a definition of first with the following types:

  first :: EventStream b c -> EventStream (b, d) (c, d)
  first :: RTSP b c -> RTSP (b, d) (c, d)

This is where I run into problems. EventStream can't be an Arrow because the argument to first might emit a value before any input is received. At this point first is supposed to pair up the output with some other value of type d, but no such value exists.

RTSP doesn't have this problem because some value of type d must have been passed in at the start, so this lets us define first. However from a semantic point of view it is not clear how this should deal with delays and subsequent events.
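As a sketch of one possible definition (an assumption, not a settled answer to the semantic question): every output is paired with the d value carried by the most recent input. The names `firstRTSP`, `firstES` and `dupSketch` are hypothetical, and `Time` is an `Integer` stand-in for `UTCTime`.

```haskell
-- Assumption: Integer timestamps stand in for UTCTime to keep the sketch small.
type Time = Integer

data Event a = Event { eventTime :: Time, eventValue :: a }
  deriving (Show, Eq)

data EventStream b c
  = Emit { esEmit :: Event c, esFuture :: EventStream b c, esHandle :: RTSP b c }
  | Wait { esHandle :: RTSP b c }

newtype RTSP b c = RTSP { runRTSP :: Event b -> EventStream b c }

-- | Hypothetical 'first' for RTSP: split the incoming pair, run the wrapped
-- processor on the first component and remember the second.
firstRTSP :: RTSP b c -> RTSP (b, d) (c, d)
firstRTSP r = RTSP $ \(Event t (b, d)) -> firstES d (runRTSP r (Event t b))

-- | Attach the remembered value to every scheduled output. A later input
-- goes back through 'firstRTSP', which replaces the remembered value.
firstES :: d -> EventStream b c -> EventStream (b, d) (c, d)
firstES _ (Wait k) = Wait (firstRTSP k)
firstES d (Emit (Event t c) rest k) =
  Emit (Event t (c, d)) (firstES d rest) (firstRTSP k)

-- | Simplified stand-in for a "dup"-style processor: echo the input now and
-- again after dt. Simplification: a new input cancels a pending echo.
dupSketch :: Time -> RTSP a a
dupSketch dt = RTSP $ \(Event t v) ->
  Emit (Event t v)
       (Emit (Event (t + dt) v) (Wait (dupSketch dt)) (dupSketch dt))
       (dupSketch dt)
```

Under this choice the d component is emitted at the same time as the processed component, so nothing bypasses the delay. Whether that is the right behaviour is exactly the open question.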

It's easy to write a delay arrow: it simply echoes every incoming event with a fixed delay added to the event time. Other variations on the theme are possible, such as a dup arrow that echoes every input immediately and then again after a fixed delay, or an arrow that echoes every input as a Right value, but also emits a Left timeout value if no input is received after a defined delay. There is clearly a whole family of combinators waiting to be written here. But first things first.
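A minimal sketch of such a delay processor, assuming `Integer` timestamps in place of `UTCTime` and inputs that arrive in time order. The queue-based helper `pending` is an illustrative choice, not necessarily how the original was written.

```haskell
-- Assumption: Integer timestamps stand in for UTCTime to keep the sketch small.
type Time = Integer

data Event a = Event { eventTime :: Time, eventValue :: a }
  deriving (Show, Eq)

data EventStream b c
  = Emit { esEmit :: Event c, esFuture :: EventStream b c, esHandle :: RTSP b c }
  | Wait { esHandle :: RTSP b c }

newtype RTSP b c = RTSP { runRTSP :: Event b -> EventStream b c }

-- | Echo every incoming event with a fixed delay added to its time.
delay :: Time -> RTSP a a
delay d = RTSP $ \e -> pending [shift e]
  where
    -- Move an event d ticks into the future.
    shift (Event t v) = Event (t + d) v

    -- Outputs still to be emitted, oldest first. Inputs arrive in time
    -- order and all get the same delay, so the queue stays sorted.
    pending [] = Wait (delay d)
    pending queue@(p:ps) =
      Emit p (pending ps) (RTSP $ \e -> pending (queue ++ [shift e]))
```

An input that arrives while an output is still pending does not cancel it. The handler just appends the newly delayed event to the queue, so `delay 4` given inputs at times 0 and 1 schedules outputs at 4 and 5.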

What should first do if its argument is delay 1? Intuitively, first puts the first item of the pair through its argument, while the second item bypasses this processing. However this would mean emitting one part of the result before the other. What should it do with dup 1? And what do the answers imply for this:

  delay 4 &&& delay 5

I don't have any good answers. Maybe RTSP isn't an arrow, but that is a real nuisance. Any ideas?


Jason Hart Priestley said...

Hi Paul,

I have done some thinking about arrows in the context of concurrent operation. The conclusion I came to was that the &&& operation corresponds to two threads of operation joining (waiting for each other) to create a single value. This doesn't agree with the 'intuitive' meaning of &&&, but it does agree with the categorical idea of a product, so in this case our intuitions are wrong IMO.

I'm not quite sure how your representation is intended to interact with time, but I wonder if you could get where you want to be by using something like a ReaderArrow Time over an Automaton (Maybe a) (Maybe b). Each input to the arrow would be a (time, Maybe event) pair, with the event as Nothing if there is no input at that time. If the arrow decides to send an output, based on either time or signal input, it sets its output to (Just x).

This representation requires the arrow to be run for every 'tick', which is certainly impossible if you require fine-grained timing. In that case you might mark when the next 'interesting' time was, for example by adding a WriterArrow (Min Time), and only call the arrow again once that tick had occurred.

Paul Johnson said...

Right, that makes a lot of sense. &&& still doesn't do the Right Thing because

delay 4 &&& delay 5

will create a delay of 9 rather than 5. However that can be documented. For parallel computations something like this would be better:

fork :: RTSP a (Either a a)

which will turn one event into two, a Left and a Right. Then you can use |||.

The next question is: what happens with "first (dup 1)"? "dup" emits two events for each input, with the second delayed by the argument time. At present the output from "dup" is paired with whatever came in on the last event. But that makes the output sensitive to the timing. The Right Thing would be to have the output from "dup" paired with whatever event "caused" the output. However that needs an extra parameter, with a separate type, to carry this information across arrow composition. Still, looks doable.

Thanks for the help.

beroal said...

So now we have shown that these two things are both Categories
Can you actually prove it? E.g. take 'RTSP'. r2∘id==r2. So you go to the "Emit e1 es1a k1a ->" branch with e1 == e0. Then you should return @runRTSP r2 e1@. But you analyze the result and e.g. in the 'Wait' branch replace 'k2a' with 'k'. So if @'k2a' /= 'k'@, then the axiom is not satisfied. Therefore 'RTSP' is not a category. If it is not a category, everything else does not make sense. Please prove your claims.