This tutorial will give you an overview over the stream processing language TeSSLa. You will learn everything you need to know in order to write your own TeSSLa specifications. We will start with the basic TeSSLa operators and their usage in potentially recursive equations. Then we will have a look at TeSSLa's macro expansion and type system as well as complex data types and how to use those in TeSSLa.
TeSSLa can be seen as a stream processing engine, which translates input streams into output streams. This stream transformation process is described as an equation system which derives new streams from the given ones. Derived streams can either be internal streams which are translated further or output streams.
Consider the following very simple example of a TeSSLa specification:
in temperature: Events[Int]
def low = temperature < 3
def high = temperature > 8
def unsafe = low || high
out unsafe
This specification consists of an input stream temperature
of integer events and the derived streams low
, high
and unsafe
of which unsafe
is marked as an output stream. For every new event on the input stream temperature
a new event on the derived streams high
and low
is computed. Based on these computed events now a new event on the output stream unsafe
can be computed:
stream temperature: bubbles
stream low: bubbles
stream high: bubbles
stream unsafe: bubbles
---
1: temperature = 6
1: low = false
1: high = false
1: unsafe = false
2: temperature = 2
2: low = true
2: high = false
2: unsafe = true
3: temperature = 1
3: low = true
3: high = false
3: unsafe = true
4: temperature = 5
4: low = false
4: high = false
4: unsafe = false
5: temperature = 9
5: low = false
5: high = true
5: unsafe = true
Note, that the order in which the intermediate streams are defined is completely up to you. TeSSLa will always figure out the dependency chain and derive dependent events in the correct order.
Order of definitions doesn't matter. TeSSLa will figure out dependency chain.
TeSSLa has the one stream type Events[T]
where T
is a value type. Value types can be primitive types such as Bool
, Int
, Float
, String
and Unit
, which is the special type with the only value ()
. TeSSLa also supports complex data types which are discussed later in this tutorial.
Input streams must always be declared with an explicit type annotation. Types of intermediate streams which are directly or indirectly derived from input streams will be inferred. You still may add type annotations to get better error messages.
The expression temperature < 3
is an expression on value types which is automatically lifted to an expression on stream types such that low
is of type Events[Bool]
. This automatic lifting is discussed in detail later in this section.
For the primitive types TeSSLa supports many common operations. Every type can be checked for equality with the operators ==
and !=
, which take two operands of the same type and return a Bool
. On Bool
one can use &&
for logical and, ||
for logical or, !
for negation, and so on. On Int
TeSSLa supports the common comparison operators <
, >
, <=
and >=
which compare two Int
and return a Bool
, as well as the common arithmetic operators such as +
, -
, *
, /
and so on. The same operators are available for Float
, too, but in order to syntactically distinguish the operators from the ones on Int
they are suffixed with a dot, e.g. +.
and >.
instead of +
and >
. This allows automatic type inference for arithmetic expressions. The same holds for literals: 1
is an Int
literal and 1.0
is a Float
literal. See the documentation of the standard library for a comprehensive list of all available operators and types.
Value Type | Operator / Constant | Return Type |
---|---|---|
Any | == , != | Bool |
Bool | true , false | Bool |
Bool | && , || , ! , … | Bool |
Int | 42 , -23 , … | Int |
Int | < , > , <= , >= , … | Bool |
Int | + , - , * , / , … | Int |
Float | 42.0 , -.23.7 , … | Float |
Float | <. , >. , <=. , >=. , … | Bool |
Float | +. , -. , *. , /. , … | Float |
To follow along the examples in this tutorial one can either use the TeSSLa Web IDE or download TeSSLa and use it locally.
The web IDE consists of four text editors:
Copy the temperature example above into the Specification editor on the upper right. Copy the following input trace into the Trace editor in the upper left:
1: temperature = 6
2: temperature = 2
3: temperature = 1
4: temperature = 5
5: temperature = 9
Press the green Run button in the title bar to execute TeSSLa. Notice the output trace in the lower right. Try clicking on TeSSLa Visualization to get a visual representation of the output trace instead.
Try to adjust the specification. You can change the numbers or the definition of unsafe
. You can declare more streams as output streams by adding output declarations, e.g. out low
. If you want to see all defined streams in the output you can add out *
.
Note, that the output is not by accident in the same trace format as the input: TeSSLa can be seen as a stream transformation engine, which derives intermediate and output streams by applying the specification to the input streams.
The output is in the same trace format as the input.
A TeSSLa trace is a text file containing one event per line. Every line
The timestamps must be
Like in this example trace:
# This is a comment
1: a = 2
1: b = 3
2: a = 4
2: b = 2
Multiple events with the same stream name and the same timestamp are not allowed.
You can download TeSSLa and use it locally. The TeSSLa compiler and interpreter are bundled into a single jar file. The executable takes a TeSSLa specification and an input trace, compiles the TeSSLa specification and generates an output trace by applying the compiled specification to the input trace.
Create a file trace.input
and copy the input trace shown in the previous exercise in that file. Create another file specification.tessla
and copy the specification shown above into that file. To apply this TeSSLa specification to this trace you can run the following line. Remember that your tessla.jar
might have a another name.
java -jar tessla.jar interpreter specification.tessla trace.input
For the rest of this tutorial you can either use the web IDE or the command line tool. You can java -jar tessla.jar -h
for more information on the usage of the TeSSLa command line tool.
One of TeSSLa's key feature is the handling of non-synchronized streams. Consider this specification:
in a: Events[Int]
in b: Events[Int]
def c = a + b
out c
If our inputs are nicely synchronized as in the temperature example above, then we can compute a new value for c
in every step, because in every step we get a new value for a
and a new value for b
:
stream a: bubbles
stream b: bubbles
stream c: bubbles
---
1: a = 2
1: b = 3
1: c = 5
2: a = 4
2: b = 3
2: c = 7
3: a = 8
3: b = 1
3: c = 9
But in the reality input streams are not always that nicely arranged. In TeSSLa every event consists of a timestamp and a value. Hence, TeSSLa supports both sparse and high-frequency streams with arbitrary precise timestamps. As an example consider the following slightly adjusted input:
TeSSLa supports both sparse and high-frequency streams with arbitrary precise timestamps.
option axis: true
option timeDomain: [9,38]
stream a: bubbles
stream b: bubbles
---
10: a = 2
10: b = 3
17: a = 4
20: b = 3
30: b = 1
35: a = 8
How can we lift the operation a+b
to such inputs?
TeSSLa supports many possible interpretations, but its default is the signal semantics. We understand the events as those points in time where a piece-wise constant signal changes its value. With that interpretation we get the following diagram, where it becomes obvious what a+b
should mean:
option axis: true
option timeDomain: [9,38]
stream a: signal
stream b: signal
stream c: signal
---
10: a = 2
10: b = 3
10: c = 5
17: a = 4
17: c = 7
20: b = 3
20: c = 7
30: b = 1
30: c = 5
35: a = 8
35: c = 9
Note that the signal semantics is only a different interpretation of discrete events in TeSSLa. The piece-wise constant signals are still represented as sequence of events.
You can easily combine both worlds in TeSSLa, e.g. if your input is a sequence of discrete events, you can count those events and get a piece-wise constant signal:
in x: Events[Unit]
out count(x) as c
option timeDomain: [0,38]
stream x: unit events
stream c: signal
---
0: c = 0
7: c = 1
7: x = ()
17: c = 2
17: x = ()
20: c = 3
20: x = ()
30: c = 4
30: x = ()
35: c = 5
35: x = ()
Note how we used the syntax out count(x) as c
as a shortcut for
def c = count(x)
out c
r
of type Events[Unit]
and use events on that stream to reset the the counter.in x: Events[Unit]
in r: Events[Unit]
def c = resetCount(x, r)
out c
option timeDomain: [0,38]
stream x: unit events
stream r: unit events
stream c: signal
---
0: c = 0
7: c = 1
7: x = ()
17: c = 2
17: x = ()
20: c = 3
20: x = ()
24: r = ()
24: c = 0
30: c = 1
30: x = ()
35: c = 2
35: x = ()
As you've already seen, many operations on values such as +
are automatically interpreted with the signal semantics on streams. You could also do this manually by explicitly using slift
as follows:
in a: Events[Int]
in b: Events[Int]
def c = slift(a, b, Operators.add)
out c
Operators.add
is the function on Int
which is implicitly used when you use the +
operator.
You can also define your own functions on values and pass those to slift
:
in a: Events[Int]
in b: Events[Int]
def f(x: Int, y: Int) = if x > y then x - y else x + y
def c = slift(a, b, f)
out c
option axis: true
option timeDomain: [9,38]
stream a: signal
stream b: signal
stream c: signal
---
10: a = 2
10: b = 3
10: c = 5
17: a = 4
17: c = 1
20: b = 3
20: c = 1
30: b = 1
30: c = 3
35: a = 8
35: c = 7
With TeSSLa's lambda syntax you can specify a function as anonymous lambda inline. You specify the parameter list including their types followed by a rocket and the function's expression: (x: Int) => x + 1
. Rewrite the example above using a lambda instead of the named function f
.
in a: Events[Int]
in b: Events[Int]
def c = slift(a, b, (x, y) => if x > y then x - y else x + y)
out c
As you've seen the signal lift always passes the last known values on both streams to the function and creates a new event on the output stream for every event on any of the input streams. If you do not want the signal semantics you can use the more basic lift
which does not store any seen values on its own. The given function is still called for every event on the input streams, but if the other input stream does not have an event with the exact same timestamp the lift
passes None
to the given function instead of any value for the corresponding argument.
In order to have a type-safe encoding of getting either a value or None
we use the Option
type. If we stick with streams over integers we now need to define a function taking two arguments of type Option[Int]
and returning an Option[Int]
. If the function returns None
, then no event is generated. This can be used for example to filter event streams.
As an example we use merge
from the standard library which takes two event streams and creates an event stream containing events from both given streams:
slift
implements signal semantics f
lift
is the most basic TeSSLa operator None
to f
if there is no event on other streamsin a: Events[Int]
in b: Events[Int]
out merge(a,b) as c
option timeDomain: [7,38]
stream a: bubbles
stream b: bubbles
stream c: bubbles
---
10: a = 2
10: b = 3
10: c = 2
17: a = 4
17: c = 4
20: b = 3
20: c = 3
30: b = 1
30: c = 1
35: a = 8
35: c = 8
merge
is defined in the standard library as follows:
def merge[T](stream1: Events[T], stream2: Events[T]): Events[T] =
lift(stream1, stream2,
(x: Option[T], y: Option[T]) => if isNone(x) then y else x)
Note how merge prefers the first argument if both streams have an event at the same timestamp. Write your own merge which takes the event from the second argument in that case.
def merge[T](stream1: Events[T], stream2: Events[T]): Events[T] =
lift(stream1, stream2,
(x: Option[T], y: Option[T]) => if isNone(y) then x else y)
Until now we only derived streams from existing streams by computing new events from simultaneous events. (The signal semantics is an exception which will be discussed in the next section.) In many specifications you want to access previous events, too. As a first example let's compute the delta between successive events:
in x: Events[Int]
def delta = x - last(x,x)
out delta
stream x: events
stream delta: events
---
10: x = 2
17: delta = 2
17: x = 4
20: delta = -1
20: x = 3
30: delta = -2
30: x = 1
35: delta = 7
35: x = 8
The last
operator takes two arguments: The first argument is the value stream, whose values are accessed at a later time. The second argument is the trigger stream. For every event on the trigger stream last
produces the last known value on the value stream. Note that the last known value does explicitly not include simultaneous events.
in value: Events[Int]
in trigger: Events[Unit]
def lst = last(value, trigger)
out lst
stream value: events
stream trigger: unit events
stream lst: events
---
3: trigger = ()
5: value = 4
7: lst = 4
7: trigger = ()
9: lst = 4
9: trigger = ()
11: value = 7
14: lst = 7
14: trigger = ()
18: lst = 7
18: trigger = ()
18: value = 3
22: lst = 3
22: trigger = ()
Timestamps of events can be accessed with the time operator.
The time operator produces an event stream where all events carry their timestamps as their values, too. The timestamps can then be processed as any other event value without the need of any further explicit syntax. By using time we can adjust the previous delta example so that we get the time passed between two successive events:
in x: Events[Unit]
def t = time(x)
def lst = last(t,x)
def delta = time(x) - lst
out delta
stream x: unit events
stream t: events
stream lst: events
stream delta: events
option axis: true
---
10: t = 10
10: x = ()
17: delta = 7
17: lst = 10
17: t = 17
17: x = ()
20: delta = 3
20: lst = 17
20: t = 20
20: x = ()
30: delta = 10
30: lst = 20
30: t = 30
30: x = ()
35: delta = 5
35: lst = 30
35: t = 35
35: x = ()
The above example contains
def delta = time(x) - last(time(x), x)
What would happen if we use the following instead? Why?
def delta = time(x) - time(last(x, x))
We would take the timestamp of the trigger of last, which would be the timestamp of x. Hence, the delta would always be 0.
Let's assume that we have the following effect cause chain of events a, b, c and d:
option axis: true
stream a: unit events
stream b: unit events
stream c: unit events
stream d: unit events
---
10: a = ()
11: b = ()
12: c = ()
13: d = ()
20: a = ()
22: b = ()
24: c = ()
27: d = ()
30: a = ()
32: b = ()
34: a = ()
34: c = ()
36: b = ()
36: d = ()
37: c = ()
39: d = ()
Write a specification which computes the length of the event chain for every event on d. You can do this by triggering three nested lasts with the events on b, c and d in order to get the timestamp of the starting event on a for the chain. Then compute the delta between the timestamps of start and end of the chain.
in a: Events[Unit]
in b: Events[Unit]
in c: Events[Unit]
in d: Events[Unit]
def t = time(a)
def x = last(last(last(t, b), c), d)
def delta = time(d) - x
out delta
option axis: true
stream a: unit events
stream b: unit events
stream c: unit events
stream d: unit events
stream x: events
stream delta: events
---
10: a = ()
11: b = ()
12: c = ()
13: delta = 3
13: x = 10
13: d = ()
20: a = ()
22: b = ()
24: c = ()
27: delta = 7
27: x = 20
27: d = ()
30: a = ()
32: b = ()
34: a = ()
34: c = ()
36: b = ()
36: delta = 6
36: x = 30
36: d = ()
37: c = ()
39: delta = 5
39: x = 34
39: d = ()
Assume that we get events for every call and return of a specific function:
option axis: true
stream call: unit events
stream ret: unit events
---
10: call = ()
17: ret = ()
25: call = ()
35: ret = ()
57: call = ()
69: ret = ()
Compute the average runtime of the function. You might want to use sum and count from the standard library in order to compute an average value.
in call: Events[Unit]
in ret: Events[Unit]
def x = last(time(call), ret)
def duration = time(ret) - x
def avg = average(duration)
out avg
option axis: true
stream call: unit events
stream ret: unit events
stream x: events
stream duration: events
stream avg: events
---
10: call = ()
17: avg = 7
17: duration = 7
17: x = 10
17: ret = ()
25: call = ()
35: avg = 8
35: duration = 10
35: x = 25
35: ret = ()
57: call = ()
69: avg = 9
69: duration = 12
69: x = 57
69: ret = ()
Another solution using sum and count:
in call: Events[Unit]
in ret: Events[Unit]
def x = last(time(call),ret)
def duration = time(ret) - x
def avg = if duration == 0 then 0 else sum(duration)/count(duration)
out avg
Actually the slift from the standard library is defined using lift, last and merge:
def slift[T,U,V](a: Events[T], b: Events[U], f: (T,U) => V): Events[V] = {
def aa = merge(a, last(a, b))
def bb = merge(b, last(b, a))
lift(aa, bb, (x: Option[T], y: Option[U]) =>
if isNone(x) || isNone(y)
then None[V]
else Some(f(getSome(x), getSome(y))))
}
in a: Events[Int]
in b: Events[Int]
def c = a + b
out c
stream a: signal
stream b: signal
stream aa: bubbles
stream bb: bubbles
stream c: bubbles
---
1: a = 1
1: b = 2
1: aa = 1
1: bb = 2
1: c = 3
2: b = 4
2: aa = 1
2: bb = 4
2: c = 5
3: a = 3
3: aa = 3
3: bb = 4
3: c = 7
The filter defined in the standard library can be used to filter event streams based on a boolean condition stream. Filtering applies the signal semantics only to the condition stream and not the stream of events which is going to filtered. Otherwise filter could introduce new events in some edge cases which would be counter intuitive.
in a: Events[Int]
in cond: Events[Bool]
def f = filter(a, cond)
out f
def filter[A](events: Events[A], condition: Events[Bool]) = {
def cc = merge(condition, last(condition, events))
lift(events, cc, (e: Option[A], c: Option[Bool]) =>
if isSome(c) && getSome(c) then e else None[A])
}
stream a: bubbles
stream cond: signal
stream cc: bubbles
stream f: bubbles
---
0: cond = true
1: cc = true
1: f = 1
1: a = 1
2: cond = false
2: cc = false
3: a = 2
3: cc = false
4: a = 3
4: cc = false
5: cond = true
5: cc = true
5: f = 4
5: a = 4
6: cc = true
6: f = 5
6: a = 5
The equations in a TeSSLa specification can be recursive as long as there is at least one last
guarding every recursive definition.
Only the value argument of last can be recursive, the trigger argument – which is the second argument – must be non-recursive.
Such recursive definitions can be used to aggregate data over time. As an example consider the following definition which sums up the values of all events on x:
in x: Events[Int]
def lastValue: Events[Int] = last(withDefault, x)
def withCurrent = lastValue + x
def withDefault = merge(withCurrent, 0)
out withDefault
option timeDomain: [-1,10]
stream x: bubbles
stream lastValue: bubbles
stream withCurrent: bubbles
stream withDefault: bubbles
---
0: withDefault = 0
3: x = 2
3: lastValue = 0
3: withCurrent = 2
3: withDefault = 2
5: x = 1
5: lastValue = 2
5: withCurrent = 3
5: withDefault = 3
8: x = 3
8: lastValue = 3
8: withCurrent = 6
8: withDefault = 6
Note, that every recursive definition needs at least on explicit type annotation.
We take the events on x
as trigger for the last and the later defined stream withDefault
as value. Since last
only refers to value events which have been strictly before the trigger, this definition can be unrolled over time. In the next line we take the lastValue
of the aggregation and add the current value of x
. Then we merge a default value of 0 into the stream to get a starting point for the recursion. The same recursive definition could be written in one line as
def s: Events[Int] = merge(last(s, x) + x, 0)
Update the above specification so that we count the number of events instead of summing up their values.
in x: Events[Int]
def s: Events[Int] = merge(last(s, x) + 1, 0)
out s
option timeDomain: [-1,10]
stream x: unit events
stream lastValue: bubbles
stream newValue: bubbles
stream withDefault: bubbles
---
0: withDefault = 0
3: x = ()
3: lastValue = 0
3: newValue = 1
3: withDefault = 1
5: x = ()
5: lastValue = 1
5: newValue = 2
5: withDefault = 2
8: x = ()
8: lastValue = 2
8: newValue = 3
8: withDefault = 3
In the above specification we sum up the values or count the number of events. In both specifications we start with a default value of 0 at timestamp 0. Rewrite the specifications so that they do not produce an event at timestamp 0, but start with the first input event.
def s = last(merge(s, 0), x) + x
and
def s = last(merge(s, 0), x) + 1
The standard library provides a generic fold which can be used to express count and sum, too:
in x: Events[Int]
# summing up the event's values
def s = fold(x, 0, Operators.add)
# counting the events
def c = fold(x, 0, (acc, v) => acc + 1)
out s
out c
Assume that there are two input streams x
and y
. Build a specification which aggregates their event's values as follows:
x
add the event's value.y
multiply the event's value.in x: Events[Int]
in y: Events[Int]
def s = merge3(last(s, x) + x, last(s, y) * y, 0)
out s
An equivalent solution with fold can be found below. Therefore it is necessary to zip the values of the two streams together to a single stream. For this we use the stdlib function zip
:
in a: Events[Int]
in b: Events[Int]
def specialSum(acc: Int, v: (Option[Int], Option[Int])): Int = {
if isSome(v._1) then
acc + getSome(v._1)
else
acc * getSome(v._2)
}
out fold(zip(a,b), 0, specialSum)
option timeDomain: [0,20]
stream x: events
stream y: events
stream s: signal
---
0: s = 0
7: x = 4
7: s = 4
9: s = 8
9: y = 2
12: x = 1
12: s = 9
15: s = 27
15: y = 3
17: x = 3
17: s = 30
So far all the TeSSLa operators that we discussed in this tutorial have been timestamp-conservative, i.e. they can only generate an event at a specific timestamp if there has been an event with that timestamp on any of their input streams. The delay
operator is the only operator that can create events with new timestamps.
(See the TeSSLa paper for a more detailed discussion of the expressiveness of different TeSSLa fragments.)
The delay
takes two arguments:
delay
generates an event if that delay is over.There is one additional rule: New delays are only activated if they are set together
delay
.This additional restriction ensures that there is always only one active delay. Hence, TeSSLa engines only keep track of at most one active delay per delay
operator in the specification. Without this restriction they would need to realize a queue of active delays for every delay
operator in the specification.
in amount: Events[Int]
in reset: Events[Unit]
def x = delay(amount, reset)
out x
option axis: true
stream amount: events
stream reset: unit events
stream x: unit events
---
5: amount = 2
9: reset = ()
9: amount = 2
11: x = ()
11: amount = 3
14: x = ()
14: amount = 2
15: reset = ()
17: reset = ()
17: amount = 3
20: x = ()
20: amount = 3
22: reset = ()
22: amount = 2
24: x = ()
26: reset = ()
The delay operator can be used in recursive equations if its second argument – the reset stream – is not recursive.
In its simplest form such a recursion will generate an event in a fixed period, as shown in the example below.
Note that the TeSSLa software interpreter has a synchronized internal progress, i.e. no output events are generated after the last input event. Without input events, no output events are computed, even if they do not depend on the inputs. On the other hand, if the interpreter receives an input event at a certain timestamp, all output events up to that timestamp (exclusively) are computed. In the next examples we will therefore use an explicit input stream progress
to specify how long we want to generate events.
in progress: Events[Unit]
def x: Events[Unit] = merge(delay(const(3, x), ()), ())
out x
option axis: true
option timeDomain: [-0.9,20.9]
stream progress: unit events
stream x: unit events
---
0: x = ()
3: x = ()
6: x = ()
9: x = ()
12: x = ()
15: x = ()
18: x = ()
20: progress = ()
Note the similarities to recursive equations with last
:
We need to merge the recursive stream with a default value to have a starting point for the recursion and we have to arguments to the default operator:
The next canonical example for a recursive equation with the delay operator is a period with a variable frequency:
in progress: Events[Unit]
in f: Events[Int]
def x = merge(f, last(f, delay(x, f)))
out x
option axis: true
stream progress: unit events
stream f: events
stream x: events
---
3: f = 2
3: x = 2
5: x = 2
7: f = 3
7: x = 3
10: x = 3
13: x = 3
14: f = 2
14: x = 2
16: x = 2
18: x = 2
20: x = 2
21: progress = ()
Note how we used the output of the delay
to trigger the last
. It is allowed to trigger a last
with a recursive stream as long as there is at least one operator on the recursive loop that breaks the recursion. In this case the delay prevents the recursion from becoming infinite.
What happens in the above TeSSLa specification if an event on f
happens simultaneously with a generated event on x
? Is the old frequency used for another round or do we immediately use the new delay? How could we change this behavior?
The merge operator prefers events on its first argument. Hence, in the above specification f
would win over last(f, delay(x, f))
. If we exchange these two arguments of the merge operator, we would get a different behavior, where the new frequency would not be used immediately.
At the end of this tutorial we will see an elaborated example where we combine a complex data structure and the delay operator.
You know all the basic TeSSLa operators now. In this section we will shortly discuss some advanced language features.
Remember how we defined functions on primitive values:
def f(x: Int, y: Int) = if x > y then x - y else x + y
In the same way we can also define macros on streams:
def prev(x: Events[Int]) = last(x,x)
The fundamental difference between functions on primitive values and macros on streams is that macros are expanded at compile time and functions are evaluated at runtime.
To use macros in your code add them to your specification. In the following example we defined a macro to add the value five to the last input value.
def add5ToLast(x: Events[Int]) = last(x,x) + 5
in x: Events[Int]
out add5ToLast(x)
Here is a simple trace to use with the example:
10: x = 1
20: x = 2
30: x = 3
40: x = 4
Functions and macros can have multi-line bodies and temporary stream definitions:
def sum(x: Events[Int]) = {
def s = merge(last(s, x) + x, 0)
s
}
Note how we defined a temporary stream s
inside that macro and use that stream as the macros value. A block always consists of a sequence of definitions and a final expression. The block evaluates to that final expression.
Define a macro that can be used to measure the latency of cause effect chains based on the earlier exercise. It should take the four events of the event chain as inputs and generate an event stream of the durations of all seen chains.
def eventChain(a: Events[Unit], b: Events[Unit], c: Events[Unit], d: Events[Unit]) = {
def t = time(a)
def x = last(last(last(t, b), c), d)
def delta = time(d) - x
delta
}
Define a macro that can be used to measure the runtime of a function based on the earlier exercise. It should take the call and return events of the function as inputs and generate an event stream of the current runtimes.
def runtime(call: Events[Unit], ret: Events[Unit]) = {
def x = last(time(call), ret)
time(ret) - x
}
Coming back to the function f
: For many functions on primitive values one wants a corresponding macro which applies the function on a stream using the signal semantics implemented by slift
:
def f(x: Int, y: Int) = if x > y then x - y else x + y
def f2(x: Events[Int], y: Events[Int]) = slift(x, y, f)
Such a macro can be generated automatically using the keyword liftable
:
liftable
def f(x: Int, y: Int) = if x > y then x - y else x + y
Now the slift
will be wrapped automatically around f
if you use f
on streams.
We have already seen the primitive types which are available in TeSSLa. Together with the stream type Events[T]
they form the basic available types.
More complex data structures are discussed in the next sections.
We already used the lift
and slift
macro which take functions as an argument. Functions are first-class citizens in TeSSLa and can be used like any other primitive type. For example the following function
def f(x: Int, y: Int) = x + 10 * y
has the type
(Int, Int) => Int
Types in TeSSLa are inferred wherever possible, but can be explicitly annotated in many places in order to get better error messages from the type checker.
Currently the type inference requires you to annotate
Literals, e.g. 5
or false
, are automatically converted to streams with one event at timestamp 0 with that value.
In the context of the signal semantics this allows you to write x+5
for a stream x
and you end up with x + const(5, ())
which is slift(x, const(5, ()), Operators.add)
which is usually what you want.
TeSSLa supports generic types, so for example instead of
def prev(x: Events[Int]) = last(x,x)
we could also define the more general
def prev[A](x: Events[A]) = last(x,x)
TeSSLa has no subtyping.
Apart from the primitive data types, TeSSLa support different kinds of complex data structures. TeSSLa is defined agnostically with respect to any time or data domain.
Different data structures can be used to represent time and data. Especially the TeSSLa engine written in Scala used in this tutorial supports complex data structures like lists, trees and maps. We first look at the internal data structures, i.e. options, objects and tuples, and then the external data structures, which are provided by the TeSSLa engine.
We already know the Option[T]
type from the lift
operation, but you can use the option type for other purposes, too. There are the usual constructors None
and Some
as well as the usual accessors isNone
, isSome
, getSome
, getSomeOrElse
. Have a look at the documentation of the standard library on how to use them.
Type | Option[T] |
Constructors | None , Some |
Accessors | isNone , isSome , getSome , getSomeOrElse |
Write a function checkedDivision(dividend: Int, divisor: Int): Option[Int]
which returns the result of the division if the divisor is not 0 and None otherwise. Use that function with getSomeOrElse
to realize a division which returns 0 if the divisor is 0.
def checkedDivision(dividend: Int, divisor: Int) =
if divisor == 0 then None[Int]
else Some(dividend / divisor)
out getSomeOrElse(checkedDivision(27, 0), 0)
TeSSLa natively supports objects. Objects are created using the following object literal syntax:
def o = {a = 5, b = false}
This object has the type {a: Int, b: Bool}
. Attributes of an object with that type can be accessed using the dot operator:
def x = if o.b then o.a else - o.a
Tuples are objects with the implicit attribute names _1
, _2
and so on. They can be created using the following tuple literal syntax:
def t = (5, false)
This tuple has the type (Int, Bool)
, which is equivalent to {_1: Int, _2: Bool}
. Elements of a tuple with that type can be accessed using the dot operator:
def x = if o._2 then o._1 else - o._1
It is possible to have objects (and tuples) of streams and even mixed objects, too:
in a: Events[Int]
def mixed = {a = 5, s = (a + 7, a - 7)}
out mixed.s._1
External data structures are provided by the TeSSLa engine.
We can use sets to keep track of all the values that we have seen so far:
in x: Events[Int]
def seen: Events[Set[Int]] = merge(Set.add(last(seen, x), x), Set.empty[Int])
def new = !Set.contains(last(seen, x), x)
out new
stream x: bubbles
stream new: bubbles
---
1: new = true
1: x = 1
2: new = true
2: x = 2
3: new = true
3: x = 3
4: new = false
4: x = 1
5: new = false
5: x = 1
6: new = false
6: x = 1
7: new = false
7: x = 3
8: new = true
8: x = 4
9: new = false
9: x = 2
If we use a map instead of a set we can even keep track how often we have seen that value so far:
in x: Events[Int]
def old = last(seen, x)
def seen: Events[Map[Int, Int]] =
merge(Map.add(old, x,
if Map.contains(old, x) then Map.get(old, x) + 1 else 1),
Map.empty[Int, Int])
def c = Map.get(seen, x)
def new = c == 1
out new
out c
stream x: bubbles
stream new: bubbles
stream c: bubbles
---
1: c = 1
1: new = true
1: x = 1
2: c = 1
2: new = true
2: x = 2
3: c = 1
3: new = true
3: x = 3
4: c = 2
4: new = false
4: x = 1
5: c = 3
5: new = false
5: x = 1
6: c = 4
6: new = false
6: x = 1
7: c = 2
7: new = false
7: x = 3
8: c = 1
8: new = true
8: x = 4
9: c = 2
9: new = false
9: x = 2
With a list we can compute the average over all the values that we have seen so far:
in x: Events[Int]
def old = last(seen, x)
def seen: Events[List[Int]] = merge(List.append(old, x), List.empty[Int])
def sums = List.fold(seen, 0, (a: Int, b: Int) => a + b)
def avg = if List.size(seen) > 0 then sums / List.size(seen) else 0
out avg
stream x: plot
stream avg: dots
---
0: avg = 0
1: avg = 10
1: x = 10
2: avg = 15
2: x = 20
3: avg = 20
3: x = 30
4: avg = 17
4: x = 10
5: avg = 16
5: x = 10
6: avg = 15
6: x = 10
7: avg = 17
7: x = 30
8: avg = 20
8: x = 40
9: avg = 20
9: x = 20
With a few changes we can adopt this specification so that we only keep track of the last three elements:
in x: Events[Int]
def old = last(seen, x)
def rem = if List.size(old) > 3 then List.tail(old) else old
def seen: Events[List[Int]] = merge(List.append(rem, x), List.empty[Int])
def sum_seen = List.fold(seen, 0, (a: Int, b: Int) => a + b)
def avg = sum_seen / 3
out avg
stream x: plot
stream avg: dots
---
0: avg = 0
1: avg = 3
1: x = 10
2: avg = 10
2: x = 20
3: avg = 20
3: x = 30
4: avg = 23
4: x = 10
5: avg = 23
5: x = 10
6: avg = 20
6: x = 10
7: avg = 20
7: x = 30
8: avg = 30
8: x = 40
9: avg = 33
9: x = 20
We can use the data structures presented in this section to define custom data structures in TeSSLa. In this example we want to compute the average load of a processor in the last five time units. Other than in the last example where we always kept three elements in our list, in this scenario the amount of data points that we have to keep track of heavily depends on the timestamps of the incoming events. In theory there is not even an upper bound for the number of events that might have happened in the last five time units. Thus, we realize this moving window using a queue storing all events that happened in the time window. The stream load
contains an event every time the input load changes.
Such a queue with corresponding operators can be defined as follows in TeSSLa
type Queue[D] = List[(Int, D)]
def enq[D](t: Int, d: D, q: Queue[D]) =
List.append(q, (t, d))
def Queue_empty[D] = List.empty[(Int, D)]
def Queue_fold[Acc,D](q: Queue[D], zero: Acc, until: Int, f: (Acc, Int, Int, D) => Acc) = {
def result = List.fold(q, {acc = zero, last = None[(Int,D)]},
(accum: {acc: Acc, last: Option[(Int, D)]}, el: (Int, D)) =>
if isNone(accum.last)
then {acc = accum.acc, last = Some(el)}
else {acc = f(accum.acc, getSome(accum.last)._1, el._1, getSome(accum.last)._2), last = Some(el)})
if isNone(result.last) then zero
else f(result.acc, getSome(result.last)._1, until, getSome(result.last)._2)
}
def remOlder[D](t: Int, q: Queue[D]) =
Queue_fold(q, Queue_empty[D], -1, (acc: Queue[D], from: Int, to: Int, d: D) =>
if 0 < to && to <= t then acc
else enq(max(t, from), d, acc))
Using these operators we can define the following specification:
in load: Events[Float]
def qLen = 5
def stripped: Events[Queue[Float]] =
slift(time(load), merge(last(queue, load), Queue_empty[Float]),
(t: Int, q: Queue[Float]) => remOlder(t - qLen, q))
def queue: Events[Queue[Float]] =
slift3(time(load), load, stripped, (t: Int, d: Float, q: Queue[Float]) => enq(t,d,q))
def Queue_integral(queue: Queue[Float], until: Int) =
Queue_fold(queue, 0.0, until,
(acc: Float, from: Int, to: Int, d: Float) => acc +. d *. intToFloat(to - from) /. intToFloat(qLen))
def avg = slift(queue, time(load), Queue_integral)
out avg
option axis: true
stream load: plot
stream avg: graph
---
1: avg = 0.00
1: load = 0.30
3: avg = 0.12
3: load = 0.50
7: avg = 0.46
7: load = 0.70
10: avg = 0.62
10: load = 0.40
13: avg = 0.52
13: load = 0.30
16: avg = 0.34
16: load = 0.80
Note how the results are slightly inaccurate if an event should be dropped out of the queue, because we only update the generated average with every change in the load. We can fix that using a delay
which will remove outdated elements from the queue automatically. First we need a new operator on queues that can compute the timeout, i.e. the timestamp when the next element is going to drop out of the queue.
def Queue_timeout[D](q: Queue[D]) =
if List.size(q) < 2 then 0
else List.head(List.tail(q))._1
Now we can add a stream timeout
to the specification which contains the delays used to trigger an update of the queue:
in load: Events[Float]
in progress: Events[Unit]
def qLen = 5
def stripped: Events[Queue[Float]] =
slift(time(load), merge(last(queue, load), Queue_empty[Float]),
(t: Int, q: Queue[Float]) => remOlder(t - qLen, q))
def queue: Events[Queue[Float]] =
slift3(time(load), load, stripped,
(t: Int, d: Float, q: Queue[Float]) => enq(t,d,q))
def to = slift(time(updated), updated,
(t: Int, q: Queue[Float]) => Queue_timeout(q) - t + qLen)
def timeout = filter(to, to > 0)
def merged = merge(queue, last(updated, delay(timeout, load)))
def updated: Events[Queue[Float]] = slift(time(merged), merged,
(t: Int, q: Queue[Float]) => remOlder(t - qLen, q))
def Queue_integral(queue: Queue[Float], until: Int) =
Queue_fold(queue, 0.0, until,
(acc: Float, from: Int, to: Int, d: Float) => acc +. d *. intToFloat(to - from) /. intToFloat(qLen))
def avg = slift(updated, time(updated), Queue_integral)
out avg
option axis: true
stream progress: unit events
stream load: plot
stream timeout: events
stream avg: graph
---
1: avg = 0.0
1: load = 0.3
3: avg = 0.12
3: timeout = 5
3: load = 0.5
7: avg = 0.46
7: timeout = 1
7: load = 0.7
8: avg = 0.54
8: timeout = 4
10: avg = 0.62
10: timeout = 2
10: load = 0.4
12: avg = 0.58
12: timeout = 3
13: avg = 0.52
13: timeout = 2
13: load = 0.3
15: avg = 0.36
15: timeout = 3
16: avg = 0.34
16: timeout = 2
16: load = 0.8
18: avg = 0.5
18: timeout = 3
21: avg = 0.8
22: progress = ()
In TeSSLa it is possible to define macros, streams and variables in dedicated modules. Outside of these modules the functions can be accessed via Module.Submodule.identifier
like shown in the following example:
in x: Events[Int]
module Mod {
module SMod {
import StreamFunctions
def myCount[A](x: Events[A]) = fold(x, 0, (curr: Int, _: A) => curr + 1)
}
}
out Mod.SMod.myCount(x)
To access module members directly one can use the import keyword to make these names accessible in the current scope:
in x: Events[Int]
module Mod {
module SMod {
import StreamFunctions
def myCount[A](x: Events[A]) = fold(x, 0, (curr: Int, _: A) => curr + 1)
}
}
import Mod
out SMod.myCount(x)
Note that imported namespaces are not accessible from submodules of the importing modules. If you want to use them there you have to import them again. This is why in the example we explicitly have to import the stdlib module StreamFunctions
in Mod.SMod
, which contains for example the fold
macro. While the StreamFunctions
is imported in the outermost scope automatically by the standard library, it is not automatically available in modules.
It is also possible to access definitions from other TeSSLa files. This can be done by the include
keyword followed by the relative path of the file which shall be included as string.
include.tessla:
def myCount[A](x: Events[A]) = fold(x, 0, (curr: Int, _: A) => curr + 1)
spec.tessla:
include "include.tessla"
in x: Events[Int]
out myCount(x)
Since version 2.1.0 the TeSSLa compiler also includes a package manager. With the keywords include lib
a whole library (which may consist of several TeSSLa files) can be automatically retrieved from the central TeSSLa library repository and included in the current specification. Therefore one provides the name and version of the library in the format "
include lib "tddl:1.0.1"
import TDDL
in x: Events[String]
def p1 = O("a", (Some(2), Some(4)))
def monitor = Monitor(p1)
def res = monitor(x)
out res
When executing this code the compiler first checks if the desired version (1.0.1) of the TeSSLa TDDL library is already available in a folder called dependencies
at the same location as the compiled TeSSLa specification. If this is not the case the package manager is trying to download the library from the central TeSSLa library repository. Further it checks if the library is compatible with the current TeSSLa compiler version and then includes the TeSSLa sources of the library as specified in the library.json
of the corresponding library.
A list of available TeSSLa libraries can be found here.
Now you learned how to write complex TeSSLa specifications. You can find more practical information on how to use TeSSLa in the documentation of TeSSLa's standard library. More information on the language TeSSLa can be found in the TeSSLa language specification. The theoretical background and expressiveness of TeSSLa is discussed in several publications on TeSSLa.
You can now continue with the tutorial on doing runtime verification of C code with TeSSLa.