Returning now to the definition of "first", consider:
first ((*2) >>> dup 5)
where "dup 5" emits two copies of every input event, the second one delayed by 5 seconds. So if the input event was
[Event t (3, 3)]
you would want the output to be [Event t (6, 3), Event (t+5) (6, 3)].
But suppose that the input had two events like this:
[Event t (3,3), Event (t+1) (4,4)]
What should the output be now? It seems to me that the output of the argument to "first" should be paired with the second value of the events that "caused" those outputs. So the result should be:
[Event t (6,3),
 Event (t+1) (8,4),
 Event (t+5) (6,3),
 Event (t+6) (8,4)]
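To make the pairing rule concrete, here is a minimal list-based sketch. It is not the RTSP implementation. It assumes an Event type holding a timestamp and a value, models "(*2) >>> dup 5" as a hypothetical stateless function from one input event to its outputs (doubledDup), and uses a hypothetical helper (firstModel) that pairs each output with the second component of the input that caused it.

```haskell
import Data.List (sortOn)

-- Assumed shape of the Event type: a timestamp and a value.
data Event a = Event Double a deriving (Show, Eq)

instance Functor Event where
  fmap f (Event t a) = Event t (f a)

eventTime :: Event a -> Double
eventTime (Event t _) = t

-- Hypothetical stand-in for "(*2) >>> dup 5": double the value, then
-- emit it immediately and again 5 seconds later.
doubledDup :: Event Int -> [Event Int]
doubledDup (Event t x) = [Event t (2 * x), Event (t + 5) (2 * x)]

-- Reference behaviour wanted from "first": run the inner function on the
-- first component, pair every output with the second component of the
-- input that caused it, then merge all outputs in time order.
firstModel :: (Event b -> [Event c]) -> [Event (b, d)] -> [Event (c, d)]
firstModel f inputs =
  sortOn eventTime
    [ fmap (\c -> (c, d)) out
    | Event t (b, d) <- inputs
    , out <- f (Event t b)
    ]
```

With t = 0, firstModel doubledDup [Event 0 (3,3), Event 1 (4,4)] gives four events at times 0, 1, 5 and 6, each carrying the second component of the input that caused it. The sketch only works because the whole input list is available up front, which an incremental arrow does not have.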
However to get this you need to have every arrow track the "cause" of every output and tag the event with that information. So you get
type TagEvent t a = (Event a, t)
data EventStream t b c =
    Emit {esEmit :: TagEvent t c, esFuture :: EventStream t b c, esHandle :: RTSP t b c}
  | Wait {esHandle :: RTSP t b c} deriving Show
newtype RTSP t b c = RTSP {runRTSP :: TagEvent t b -> EventStream t b c}
The type "t" indicates the type of the tag for this kind of event. Whenever an arrow emits an event in response to an input, it puts the tag from that input on the output. In the case of "dup", the EventStream it returns from handling a new event will contain both the pending outputs (with the tags from previous input events) and the two new events with tags from the latest input.
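The tag-copying rule can be illustrated with a small list-based sketch. This is an assumption-laden stand-in, not the real "dup": the Event shape is assumed, the pair order (event first, tag second) follows the pattern matches used in this post's code, and dupTagged and runTagged are hypothetical names.

```haskell
import Data.List (sortOn)

-- Assumed shape of the Event type: a timestamp and a value.
data Event a = Event Double a deriving (Show, Eq)

-- An event together with the tag of the input that caused it.
type TagEvent t a = (Event a, t)

-- Hypothetical list-level "dup": both copies carry the tag of the
-- input event that produced them.
dupTagged :: Double -> TagEvent t a -> [TagEvent t a]
dupTagged delay (Event time v, tag) =
  [ (Event time v, tag)
  , (Event (time + delay) v, tag)
  ]

-- Feed several tagged inputs through a per-event function and merge
-- the results in time order, so pending and new outputs interleave.
runTagged :: (TagEvent t a -> [TagEvent t b]) -> [TagEvent t a] -> [TagEvent t b]
runTagged f = sortOn (\(Event time _, _) -> time) . concatMap f
```

Running runTagged (dupTagged 5) on two inputs one second apart interleaves the delayed copy of the first input after the immediate copy of the second, and each output still carries the tag of its own cause.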
So now I can define two functions:
esFirst :: EventStream (d, t) b c -> EventStream t (b, d) (c, d)
esFirst (Emit (Event t va, (vb, tag)) es k) =
    Emit (Event t (va, vb), tag) (esFirst es) (rtspFirst k)
esFirst (Wait k) = Wait $ rtspFirst k
rtspFirst :: RTSP (d, t) b c -> RTSP t (b, d) (c, d)
rtspFirst (RTSP r) = RTSP $ \(Event t (va,vb), tag) ->
    esFirst $ r (Event t va, (vb, tag))
The second item in the pair (of type "d") gets tacked on to the tag for the whole arrow and re-appended at the end. Problem solved? No.
instance Arrow (RTSP t) where
    arr f = RTSP $ \(e, tag) -> Emit (fmap f e, tag) (Wait $ arr f) (arr f)
    first = rtspFirst
GHC complains about the infinite type "t = (d, t)". The problem is that the argument to rtspFirst has the type "RTSP (d, t) b c" but it needs to have the type "RTSP t b c". So this version of RTSP isn't an arrow. I have to get rid of the type tag "t", but still have some way of having an arrow match causes with effects. Is there some kind of type hackery that will do this?
5 comments:
Out of curiosity, why are EventStream and RTSP separate data types? The fact that they're mutually recursive and both used in the other's Category instance suggests to me that what you really want is some single data type that's currently distributed between them.
For instance, consider what the "id" instance for EventStream is doing: It generates a Wait wrapper around an RTSP value that produces an EventStream which simply re-emits an event and has the aforementioned Wait wrapper as its future. This alternation seems very peculiar.
So, why not consider something like this: Make the decision between Wait/Emit part of the output of the function in RTSP, and replace the direct recursion and single Event values in EventStream with a separate stream type (perhaps even just a list). The output of the RTSP could also provide the information about when to stop waiting for new input and produce another stream instead.
This would leave you with two more workable types: a simple stream of events, and an incremental stream processor that receives an event and produces its own updated replacement, along with an optional output stream. Most usefully, the stream processor type will have very obvious instances for Arrow and its related type classes.
So, something like:
newtype RTSP b c = RTSP (Event b -> ([Event c], RTSP b c))
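A minimal sketch of how the Category and Arrow instances might look for a type of this shape. The Event type (timestamp plus value), the record field runRTSP and the helper feed are assumptions made for illustration. Composition here simply concatenates outputs and does not re-sort them by time.

```haskell
import Prelude hiding (id, (.))
import Control.Category
import Control.Arrow

-- Assumed shape of the Event type: a timestamp and a value.
data Event a = Event Double a deriving (Show, Eq)

instance Functor Event where
  fmap f (Event t a) = Event t (f a)

-- One input event yields a list of outputs and the replacement processor.
newtype RTSP b c = RTSP { runRTSP :: Event b -> ([Event c], RTSP b c) }

-- Hypothetical helper: push a list of events through a processor,
-- collecting the outputs and the final replacement processor.
feed :: RTSP b c -> [Event b] -> ([Event c], RTSP b c)
feed p [] = ([], p)
feed p (e : es) =
  let (outs, p')  = runRTSP p e
      (rest, p'') = feed p' es
  in (outs ++ rest, p'')

instance Category RTSP where
  id = RTSP $ \e -> ([e], id)
  g . f = RTSP $ \e ->
    let (mids, f') = runRTSP f e
        (outs, g') = feed g mids
    in (outs, g' . f')

instance Arrow RTSP where
  arr f = RTSP $ \e -> ([fmap f e], arr f)
  -- Every output is returned in the same step as the input that caused
  -- it, so the second component d is still in scope and needs no tag.
  first p = RTSP $ \(Event t (b, d)) ->
    let (outs, p') = runRTSP p (Event t b)
    in (map (fmap (\c -> (c, d))) outs, first p')
```

The pairing in "first" is easy here only because delayed outputs are handed back immediately with future timestamps, leaving the caller to merge them with whatever later inputs produce.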
The input event returns a list of output events and the RTSP that will handle the next event.
Looks good, except that if the handler is going to add a new event to the existing stream then it will do a "dropWhile (`isBefore` ev) oldOutput", which means it has to hold its entire history since the last event, which is a memory leak. The EventStream type doesn't do that, so the future does not need to retain the past.
newtype RTSP b c = RTSP (Event b -> [(Event c, RTSP b c)])
won't work either. What happens if an RTSP never emits an event? This is well on the way to re-inventing EventStream.
Or have I misunderstood your suggestion?
Nah, more likely I've misunderstood the details of what you're trying to do. In my own experiments with this sort of thing, I've generally avoided both explicit and absolute time, in part because of the zombie history issue. Instead, I'd have stream processors operating (by definition) in the "present" and only knowing how long it's been since the last event they saw, producing constant values or closed-form functions of time as output. A bit roundabout in some ways, but makes other things very tidy, including lots of straightforward type class instances.
And all this isn't breaking new ground, of course--I assume you're familiar with the existing work on FRP?
I'm fairly familiar with the various flavours of FRP, but they all seem to have continuous models of time (apart from Conal Elliott's "unambiguous choice", which I confess I didn't understand). I want to produce distributed systems, so continuous time won't work but an event-based system will. Hence the focus on events.