TeSSLa Tutorial

This tutorial gives you an overview of the stream processing language TeSSLa. You will learn everything you need to know in order to write your own TeSSLa specifications. We start with the basic TeSSLa operators and their usage in potentially recursive equations. Then we look at TeSSLa's macro expansion and type system as well as complex data types and how to use them in TeSSLa.

Stream Processing with TeSSLa

TeSSLa can be seen as a stream processing engine, which translates input streams into output streams. This stream transformation process is described as an equation system which derives new streams from the given ones. Derived streams can either be internal streams which are translated further or output streams.

Consider the following very simple example of a TeSSLa specification:

in temperature: Events[Int]

def low = temperature < 3
def high = temperature > 8
def unsafe = low || high

out unsafe

This specification consists of an input stream temperature of integer events and the derived streams low, high and unsafe, of which unsafe is marked as an output stream. For every new event on the input stream temperature, a new event is computed on each of the derived streams low and high. From these two events, a new event on the output stream unsafe is then computed:

stream temperature: bubbles
stream low: bubbles
stream high: bubbles
stream unsafe: bubbles
---
1: temperature = 6
1: low = false
1: high = false
1: unsafe = false
2: temperature = 2
2: low = true
2: high = false
2: unsafe = true
3: temperature = 1
3: low = true
3: high = false
3: unsafe = true
4: temperature = 5
4: low = false
4: high = false
4: unsafe = false
5: temperature = 9
5: low = false
5: high = true
5: unsafe = true
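
The derivation above can be replayed with a small Python model. This is only a sketch of the semantics for synchronized streams, not how TeSSLa is implemented: a stream is modelled as a list of (timestamp, value) pairs, and the helper names map_events and zip_sync are made up for this illustration.

```python
# A stream is modelled as a list of (timestamp, value) pairs (an assumption
# of this sketch, not TeSSLa's internal representation).
temperature = [(1, 6), (2, 2), (3, 1), (4, 5), (5, 9)]


def map_events(stream, f):
    """Apply a value-level function to every event, keeping its timestamp."""
    return [(t, f(v)) for t, v in stream]


def zip_sync(a, b, f):
    """Combine two streams whose events occur at the same timestamps."""
    return [(ta, f(va, vb)) for (ta, va), (tb, vb) in zip(a, b) if ta == tb]


low = map_events(temperature, lambda v: v < 3)
high = map_events(temperature, lambda v: v > 8)
unsafe = zip_sync(low, high, lambda l, h: l or h)
# unsafe == [(1, False), (2, True), (3, True), (4, False), (5, True)]
```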

Note that the order in which the intermediate streams are defined is completely up to you. TeSSLa will always figure out the dependency chain and derive dependent events in the correct order.

TeSSLa has a single stream type, Events[T], where T is a value type. Value types can be primitive types such as Bool, Int, Float, String and Unit, the special type whose only value is (). TeSSLa also supports complex data types, which are discussed later in this tutorial.

Input streams must always be declared with an explicit type annotation. Types of intermediate streams which are directly or indirectly derived from input streams will be inferred. You may still add type annotations to get better error messages.

The expression temperature < 3 is an expression on value types which is automatically lifted to an expression on stream types such that low is of type Events[Bool]. This automatic lifting is discussed in detail later in this section.

For the primitive types TeSSLa supports many common operations. Every type can be checked for equality with the operators == and !=, which take two operands of the same type and return a Bool. On Bool one can use && for logical and, || for logical or, ! for negation, and so on. On Int TeSSLa supports the common comparison operators <, >, <= and >= which compare two Int and return a Bool, as well as the common arithmetic operators such as +, -, *, / and so on. The same operators are available for Float, too, but in order to syntactically distinguish the operators from the ones on Int they are suffixed with a dot, e.g. +. and >. instead of + and >. This allows automatic type inference for arithmetic expressions. The same holds for literals: 1 is an Int literal and 1.0 is a Float literal. See the documentation of the standard library for a comprehensive list of all available operators and types.

Value Type   Operator / Constant      Return Type
Any          ==, !=                   Bool
Bool         true, false              Bool
Bool         &&, ||, !, …             Bool
Int          42, -23, …               Int
Int          <, >, <=, >=, …          Bool
Int          +, -, *, /, …            Int
Float        42.0, -.23.7, …          Float
Float        <., >., <=., >=., …      Bool
Float        +., -., *., /., …        Float

Tools

To follow along with the examples in this tutorial, you can either use the TeSSLa Web IDE or download TeSSLa and use it locally.

Web IDE

The web IDE consists of four text editors:

Exercise. Usage of the web IDE

Copy the temperature example above into the Specification editor on the upper right. Copy the following input trace into the Trace editor in the upper left:

1: temperature = 6
2: temperature = 2
3: temperature = 1
4: temperature = 5
5: temperature = 9

Press the green Run button in the title bar to execute TeSSLa. Notice the output trace in the lower right. Try clicking on TeSSLa Visualization to get a visual representation of the output trace instead.

Try to adjust the specification. You can change the numbers or the definition of unsafe. You can declare more streams as output streams by adding output declarations, e.g. out low. If you want to see all defined streams in the output you can add out *.

Trace Format

Note that it is no accident that the output uses the same trace format as the input: TeSSLa can be seen as a stream transformation engine, which derives intermediate and output streams by applying the specification to the input streams.

A TeSSLa trace is a text file containing one event per line. Every line

The timestamps must be

Like in this example trace:

# This is a comment
1: a = 2
1: b = 3
2: a = 4
2: b = 2

Multiple events with the same stream name and the same timestamp are not allowed.
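
To make the format concrete, here is a minimal Python sketch of a trace reader. It assumes lines of the shape timestamp: stream = value as in the example above, integer timestamps, and # starting a comment; the function name parse_trace is hypothetical and the values are kept as uninterpreted strings. It is not the parser used by TeSSLa.

```python
def parse_trace(text):
    """Parse lines of the form 'timestamp: stream = value' into tuples.

    Assumptions of this sketch: integer timestamps, '#' starts a comment,
    and values are returned as raw strings without interpretation.
    """
    events = []
    seen = set()
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop comments and whitespace
        if not line:
            continue
        stamp, rest = line.split(":", 1)
        name, value = (part.strip() for part in rest.split("=", 1))
        key = (int(stamp), name)
        if key in seen:
            # Same stream name and same timestamp twice is not allowed.
            raise ValueError(f"duplicate event for {name} at {key[0]}")
        seen.add(key)
        events.append((key[0], name, value))
    return events


example = """# This is a comment
1: a = 2
1: b = 3
2: a = 4
2: b = 2
"""
events = parse_trace(example)
# events == [(1, 'a', '2'), (1, 'b', '3'), (2, 'a', '4'), (2, 'b', '2')]
```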

Java Command Line Tool

You can download TeSSLa and use it locally. The TeSSLa compiler and interpreter are bundled into a single jar file. The executable takes a TeSSLa specification and an input trace, compiles the TeSSLa specification and generates an output trace by applying the compiled specification to the input trace.

Exercise. Usage of the TeSSLa command line tool

Create a file trace.input and copy the input trace shown in the previous exercise into that file. Create another file specification.tessla and copy the specification shown above into that file. To apply this TeSSLa specification to this trace you can run the following line. Remember that your tessla.jar might have a different name.

java -jar tessla.jar interpreter specification.tessla trace.input

For the rest of this tutorial you can either use the web IDE or the command line tool. You can run java -jar tessla.jar -h for more information on the usage of the TeSSLa command line tool.

Non-Synchronized Streams

One of TeSSLa's key features is the handling of non-synchronized streams. Consider this specification:

in a: Events[Int]
in b: Events[Int]
def c = a + b
out c

If our inputs are nicely synchronized as in the temperature example above, then we can compute a new value for c in every step, because in every step we get a new value for a and a new value for b:

stream a: bubbles
stream b: bubbles
stream c: bubbles
---
1: a = 2
1: b = 3
1: c = 5
2: a = 4
2: b = 3
2: c = 7
3: a = 8
3: b = 1
3: c = 9

In reality, however, input streams are not always that nicely arranged. In TeSSLa every event consists of a timestamp and a value. Hence, TeSSLa supports both sparse and high-frequency streams with arbitrarily precise timestamps. As an example consider the following slightly adjusted input:

option axis: true
option timeDomain: [9,38]
stream a: bubbles
stream b: bubbles
---
10: a = 2
10: b = 3
17: a = 4
20: b = 3
30: b = 1
35: a = 8

How can we lift the operation a+b to such inputs?

TeSSLa supports many possible interpretations, but its default is the signal semantics. We understand the events as those points in time where a piece-wise constant signal changes its value. With that interpretation we get the following diagram, where it becomes obvious what a+b should mean:

option axis: true
option timeDomain: [9,38]
stream a: signal
stream b: signal
stream c: signal
---
10: a = 2
10: b = 3
10: c = 5
17: a = 4
17: c = 7
20: b = 3
20: c = 7
30: b = 1
30: c = 5
35: a = 8
35: c = 9

Note that the signal semantics is only a different interpretation of discrete events in TeSSLa. The piece-wise constant signals are still represented as a sequence of events.
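
The signal interpretation of a+b can be sketched in a few lines of Python. This is a model of the semantics described above, not TeSSLa code: streams are lists of (timestamp, value) pairs, and the function name slift is borrowed only to mirror the operator's role.

```python
def slift(a, b, f):
    """Signal-lift f over two streams of (timestamp, value) pairs.

    Every event on either input produces an output event computed from the
    last known value of both inputs, once both inputs have a value.
    """
    a_at, b_at = dict(a), dict(b)
    last_a = last_b = None
    have_a = have_b = False
    out = []
    for t in sorted(set(a_at) | set(b_at)):
        if t in a_at:
            last_a, have_a = a_at[t], True
        if t in b_at:
            last_b, have_b = b_at[t], True
        if have_a and have_b:  # no output until both signals are defined
            out.append((t, f(last_a, last_b)))
    return out


a = [(10, 2), (17, 4), (35, 8)]
b = [(10, 3), (20, 3), (30, 1)]
c = slift(a, b, lambda x, y: x + y)
# c == [(10, 5), (17, 7), (20, 7), (30, 5), (35, 9)]
```

The result matches the events on c in the diagram above.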

You can easily combine both worlds in TeSSLa, e.g. if your input is a sequence of discrete events, you can count those events and get a piece-wise constant signal:

in x: Events[Unit]
out count(x) as c
option timeDomain: [0,38]
stream x: unit events
stream c: signal
---
0: c = 0
7: c = 1
7: x = ()
17: c = 2
17: x = ()
20: c = 3
20: x = ()
30: c = 4
30: x = ()
35: c = 5
35: x = ()

Note how we used the syntax out count(x) as c as a shortcut for

def c = count(x)
out c

Exercise. Count With a Reset

  • Take the above specification and an appropriate input stream and execute it. You can double click on the stream diagrams to get a text representation of their input streams.
  • Besides count, the standard library also provides resetCount, which takes an additional argument. Events on that reset stream reset the counter to 0. Add another input stream r of type Events[Unit] and use events on that stream to reset the counter.

in x: Events[Unit]
in r: Events[Unit]
def c = resetCount(x, r)
out c
option timeDomain: [0,38]
stream x: unit events
stream r: unit events
stream c: signal
---
0: c = 0
7: c = 1
7: x = ()
17: c = 2
17: x = ()
20: c = 3
20: x = ()
24: r = ()
24: c = 0
30: c = 1
30: x = ()
35: c = 2
35: x = ()

Signal Lift and Lift

As you've already seen, many operations on values such as + are automatically interpreted with the signal semantics on streams. You could also do this manually by explicitly using slift as follows:

in a: Events[Int]
in b: Events[Int]
def c = slift(a, b, Operators.add)
out c

Operators.add is the function on Int which is implicitly used when you use the + operator.

You can also define your own functions on values and pass those to slift:

in a: Events[Int]
in b: Events[Int]
def f(x: Int, y: Int) = if x > y then x - y else x + y
def c = slift(a, b, f)
out c
option axis: true
option timeDomain: [9,38]
stream a: signal
stream b: signal
stream c: signal
---
10: a = 2
10: b = 3
10: c = 5
17: a = 4
17: c = 1
20: b = 3
20: c = 1
30: b = 1
30: c = 3
35: a = 8
35: c = 7

Exercise. Lambda Syntax

With TeSSLa's lambda syntax you can specify a function inline as an anonymous lambda. You write the parameter list, including the parameter types, followed by a rocket (=>) and the function's expression: (x: Int) => x + 1. Rewrite the example above using a lambda instead of the named function f.

in a: Events[Int]
in b: Events[Int]
def c = slift(a, b, (x, y) => if x > y then x - y else x + y)
out c

As you've seen the signal lift always passes the last known values on both streams to the function and creates a new event on the output stream for every event on any of the input streams. If you do not want the signal semantics you can use the more basic lift which does not store any seen values on its own. The given function is still called for every event on the input streams, but if the other input stream does not have an event with the exact same timestamp the lift passes None to the given function instead of any value for the corresponding argument.

In order to have a type-safe encoding of getting either a value or None we use the Option type. If we stick with streams over integers we now need to define a function taking two arguments of type Option[Int] and returning an Option[Int]. If the function returns None, then no event is generated. This can be used for example to filter event streams.

As an example we use merge from the standard library which takes two event streams and creates an event stream containing events from both given streams:

in a: Events[Int]
in b: Events[Int]
out merge(a,b) as c
option timeDomain: [7,38]
stream a: bubbles
stream b: bubbles
stream c: bubbles
---
10: a = 2
10: b = 3
10: c = 2
17: a = 4
17: c = 4
20: b = 3
20: c = 3
30: b = 1
30: c = 1
35: a = 8
35: c = 8

merge is defined in the standard library as follows:

def merge[T](stream1: Events[T], stream2: Events[T]): Events[T] =
  lift(stream1, stream2,
    (x: Option[T], y: Option[T]) => if isNone(x) then y else x)

Exercise. Merge Preferring the Second Argument

Note how merge prefers the first argument if both streams have an event at the same timestamp. Write your own merge which takes the event from the second argument in that case.

def merge[T](stream1: Events[T], stream2: Events[T]): Events[T] =
  lift(stream1, stream2,
    (x: Option[T], y: Option[T]) => if isNone(y) then x else y)
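
The behaviour of lift and the two merge variants can be modelled in Python as follows. This is a sketch under simplifying assumptions: streams are lists of (timestamp, value) pairs, Python's None stands in for the Option value None (so event values themselves must not be None), and merge_second is a made-up name for the exercise solution.

```python
def lift(a, b, f):
    """Call f for every timestamp with an event on a or b.

    A missing simultaneous event is passed as None; if f returns None,
    no output event is produced.
    """
    a_at, b_at = dict(a), dict(b)
    out = []
    for t in sorted(set(a_at) | set(b_at)):
        result = f(a_at.get(t), b_at.get(t))
        if result is not None:
            out.append((t, result))
    return out


def merge(s1, s2):
    """Prefer the event on the first stream if both have one."""
    return lift(s1, s2, lambda x, y: y if x is None else x)


def merge_second(s1, s2):
    """Prefer the event on the second stream if both have one."""
    return lift(s1, s2, lambda x, y: x if y is None else y)


a = [(10, 2), (17, 4), (35, 8)]
b = [(10, 3), (20, 3), (30, 1)]
first = merge(a, b)          # the event at 10 carries a's value 2
second = merge_second(a, b)  # the event at 10 carries b's value 3
```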

Previous Events and Time

Until now we only derived streams from existing streams by computing new events from simultaneous events. (The signal semantics is an exception which will be discussed in the next section.) In many specifications you want to access previous events, too. As a first example let's compute the delta between successive events:

in x: Events[Int]
def delta = x - last(x,x)
out delta
stream x: events
stream delta: events
---
10: x = 2
17: delta = 2
17: x = 4
20: delta = -1
20: x = 3
30: delta = -2
30: x = 1
35: delta = 7
35: x = 8

The last operator takes two arguments: The first argument is the value stream, whose values are accessed at a later time. The second argument is the trigger stream. For every event on the trigger stream, last produces the last known value on the value stream. Note that the last known value explicitly excludes simultaneous events.

in value: Events[Int]
in trigger: Events[Unit]
def lst = last(value, trigger)
out lst
stream value: events
stream trigger: unit events
stream lst: events
---
3: trigger = ()
5: value = 4
7: lst = 4
7: trigger = ()
9: lst = 4
9: trigger = ()
11: value = 7
14: lst = 7
14: trigger = ()
18: lst = 7
18: trigger = ()
18: value = 3
22: lst = 3
22: trigger = ()
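
The following Python sketch models last on lists of (timestamp, value) pairs and replays both examples of this section. It is an illustration of the semantics only; representing the trigger stream as a plain list of timestamps is an assumption of this sketch.

```python
def last(values, trigger_times):
    """For each trigger, emit the latest value strictly before the trigger."""
    out = []
    for t in trigger_times:
        earlier = [v for ts, v in values if ts < t]  # simultaneous excluded
        if earlier:
            out.append((t, earlier[-1]))
    return out


value = [(5, 4), (11, 7), (18, 3)]
lst = last(value, [3, 7, 9, 14, 18, 22])
# lst == [(7, 4), (9, 4), (14, 7), (18, 7), (22, 3)]

# The delta example: x - last(x, x)
x = [(10, 2), (17, 4), (20, 3), (30, 1), (35, 8)]
previous = dict(last(x, [t for t, _ in x]))
delta = [(t, v - previous[t]) for t, v in x if t in previous]
# delta == [(17, 2), (20, -1), (30, -2), (35, 7)]
```

Note that the trigger at 3 produces nothing because no value has been seen yet, and the trigger at 18 still sees 7 because the simultaneous value 3 does not count.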

Timestamps of events can be accessed with the time operator.

The time operator produces an event stream where all events carry their timestamps as their values, too. The timestamps can then be processed as any other event value without the need of any further explicit syntax. By using time we can adjust the previous delta example so that we get the time passed between two successive events:

in x: Events[Unit]
def t = time(x)
def lst = last(t,x)
def delta = time(x) - lst
out delta
stream x: unit events
stream t: events
stream lst: events
stream delta: events
option axis: true
---
10: t = 10
10: x = ()
17: delta = 7
17: lst = 10
17: t = 17
17: x = ()
20: delta = 3
20: lst = 17
20: t = 20
20: x = ()
30: delta = 10
30: lst = 20
30: t = 30
30: x = ()
35: delta = 5
35: lst = 30
35: t = 35
35: x = ()

Exercise. Combining time and last

The above example contains

def delta = time(x) - last(time(x), x)

What would happen if we use the following instead? Why?

def delta = time(x) - time(last(x, x))

We would take the timestamp of the trigger of last, which would be the timestamp of x. Hence, the delta would always be 0.

Exercise. Event Chain

Let's assume that we have the following cause-effect chain of events a, b, c and d:

option axis: true
stream a: unit events
stream b: unit events
stream c: unit events
stream d: unit events
---
10: a = ()
11: b = ()
12: c = ()
13: d = ()
20: a = ()
22: b = ()
24: c = ()
27: d = ()
30: a = ()
32: b = ()
34: a = ()
34: c = ()
36: b = ()
36: d = ()
37: c = ()
39: d = ()

Write a specification which computes the length of the event chain for every event on d. You can do this by triggering three nested lasts with the events on b, c and d in order to get the timestamp of the starting event on a for the chain. Then compute the delta between the timestamps of start and end of the chain.

in a: Events[Unit]
in b: Events[Unit]
in c: Events[Unit]
in d: Events[Unit]

def t = time(a)
def x = last(last(last(t, b), c), d)
def delta = time(d) - x
out delta
option axis: true
stream a: unit events
stream b: unit events
stream c: unit events
stream d: unit events
stream x: events
stream delta: events
---
10: a = ()
11: b = ()
12: c = ()
13: delta = 3
13: x = 10
13: d = ()
20: a = ()
22: b = ()
24: c = ()
27: delta = 7
27: x = 20
27: d = ()
30: a = ()
32: b = ()
34: a = ()
34: c = ()
36: b = ()
36: delta = 6
36: x = 30
36: d = ()
37: c = ()
39: delta = 5
39: x = 34
39: d = ()

Exercise. Runtime

Assume that we get events for every call and return of a specific function:

option axis: true
stream call: unit events
stream ret: unit events
---
10: call = ()
17: ret = ()
25: call = ()
35: ret = ()
57: call = ()
69: ret = ()

Compute the average runtime of the function. You might want to use sum and count from the standard library in order to compute an average value.

in call: Events[Unit]
in ret: Events[Unit]

def x = last(time(call), ret)
def duration = time(ret) - x

def avg = average(duration)

out avg
option axis: true
stream call: unit events
stream ret: unit events
stream x: events
stream duration: events
stream avg: events
---
10: call = ()
17: avg = 7
17: duration = 7
17: x = 10
17: ret = ()
25: call = ()
35: avg = 8
35: duration = 10
35: x = 25
35: ret = ()
57: call = ()
69: avg = 9
69: duration = 12
69: x = 57
69: ret = ()

Another solution using sum and count:

in call: Events[Unit]
in ret: Events[Unit]

def x = last(time(call),ret)
def duration = time(ret) - x

def avg = if duration == 0 then 0 else sum(duration)/count(duration)

out avg

Understanding Signal Lift and Filter

Actually the slift from the standard library is defined using lift, last and merge:

def slift[T,U,V](a: Events[T], b: Events[U], f: (T,U) => V): Events[V] = {
  def aa = merge(a, last(a, b))
  def bb = merge(b, last(b, a))
  lift(aa, bb, (x: Option[T], y: Option[U]) =>
    if isNone(x) || isNone(y)
    then None[V]
    else Some(f(getSome(x), getSome(y))))
}

in a: Events[Int]
in b: Events[Int]
def c = a + b
out c
stream a: signal
stream b: signal
stream aa: bubbles
stream bb: bubbles
stream c: bubbles
---
1: a = 1
1: b = 2
1: aa = 1
1: bb = 2
1: c = 3
2: b = 4
2: aa = 1
2: bb = 4
2: c = 5
3: a = 3
3: aa = 3
3: bb = 4
3: c = 7

The filter defined in the standard library can be used to filter event streams based on a boolean condition stream. Filtering applies the signal semantics only to the condition stream and not to the stream of events which is going to be filtered. Otherwise filter could introduce new events in some edge cases, which would be counterintuitive.

in a: Events[Int]
in cond: Events[Bool]
def f = filter(a, cond)
out f

def filter[A](events: Events[A], condition: Events[Bool]) = {
  def cc = merge(condition, last(condition, events))
  lift(events, cc, (e: Option[A], c: Option[Bool]) =>
    if isSome(c) && getSome(c) then e else None[A])
}
stream a: bubbles
stream cond: signal
stream cc: bubbles
stream f: bubbles
---
0: cond = true
1: cc = true
1: f = 1
1: a = 1
2: cond = false
2: cc = false
3: a = 2
3: cc = false
4: a = 3
4: cc = false
5: cond = true
5: cc = true
5: f = 4
5: a = 4
6: cc = true
6: f = 5
6: a = 5
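
A compact Python model of this filter behaviour, again on lists of (timestamp, value) pairs. The name filter_events is hypothetical; the sketch treats the condition as a signal (last known value, including a simultaneous event) and the filtered stream as plain events.

```python
def filter_events(events, condition):
    """Keep an event if the condition signal is true at its timestamp."""
    out = []
    for t, v in events:
        # Signal semantics on the condition: latest value at or before t.
        known = [c for tc, c in condition if tc <= t]
        if known and known[-1]:
            out.append((t, v))
    return out


a = [(1, 1), (3, 2), (4, 3), (5, 4), (6, 5)]
cond = [(0, True), (2, False), (5, True)]
f = filter_events(a, cond)
# f == [(1, 1), (5, 4), (6, 5)]
```

Output events only ever occur at timestamps of a, which is the point made above: a change of the condition alone never creates a new event.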

Recursive Equations

The equations in a TeSSLa specification can be recursive as long as there is at least one last guarding every recursive definition.

Only the value argument of last can be recursive, the trigger argument – which is the second argument – must be non-recursive.

Such recursive definitions can be used to aggregate data over time. As an example consider the following definition which sums up the values of all events on x:

in x: Events[Int]
def lastValue: Events[Int] = last(withDefault, x)
def withCurrent = lastValue + x
def withDefault = merge(withCurrent, 0)
out withDefault
option timeDomain: [-1,10]
stream x: bubbles
stream lastValue: bubbles
stream withCurrent: bubbles
stream withDefault: bubbles
---
0: withDefault = 0
3: x = 2
3: lastValue = 0
3: withCurrent = 2
3: withDefault = 2
5: x = 1
5: lastValue = 2
5: withCurrent = 3
5: withDefault = 3
8: x = 3
8: lastValue = 3
8: withCurrent = 6
8: withDefault = 6

Note that every recursive definition needs at least one explicit type annotation.

We take the events on x as trigger for the last and the later defined stream withDefault as value. Since last only refers to value events which have been strictly before the trigger, this definition can be unrolled over time. In the next line we take the lastValue of the aggregation and add the current value of x. Then we merge a default value of 0 into the stream to get a starting point for the recursion. The same recursive definition could be written in one line as

def s: Events[Int] = merge(last(s, x) + x, 0)
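
Unrolled over time, the recursive equation behaves like a simple loop. The Python sketch below models that unrolling on a list of (timestamp, value) pairs; it assumes all input timestamps are greater than 0, and the name running_sum is made up for this illustration.

```python
def running_sum(x):
    """Unroll s = merge(last(s, x) + x, 0) over the events of x.

    Assumes every event on x has a timestamp greater than 0.
    """
    s = [(0, 0)]  # the literal 0 contributes one event at timestamp 0
    for t, v in x:
        last_s = s[-1][1]  # last(s, x): value of s strictly before t
        s.append((t, last_s + v))
    return s


x = [(3, 2), (5, 1), (8, 3)]
with_default = running_sum(x)
# with_default == [(0, 0), (3, 2), (5, 3), (8, 6)]
```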

Exercise. Counting

Update the above specification so that we count the number of events instead of summing up their values.

in x: Events[Int]
def s: Events[Int] = merge(last(s, x) + 1, 0)
out s
option timeDomain: [-1,10]
stream x: unit events
stream lastValue: bubbles
stream newValue: bubbles
stream withDefault: bubbles
---
0: withDefault = 0
3: x = ()
3: lastValue = 0
3: newValue = 1
3: withDefault = 1
5: x = ()
5: lastValue = 1
5: newValue = 2
5: withDefault = 2
8: x = ()
8: lastValue = 2
8: newValue = 3
8: withDefault = 3

Exercise. Sum Without Default Value

In the above specification we sum up the values or count the number of events. In both specifications we start with a default value of 0 at timestamp 0. Rewrite the specifications so that they do not produce an event at timestamp 0, but start with the first input event.

def s = last(merge(s, 0), x) + x

and

def s = last(merge(s, 0), x) + 1

The standard library provides a generic fold which can be used to express count and sum, too:

in x: Events[Int]
# summing up the event's values
def s = fold(x, 0, Operators.add)
# counting the events
def c = fold(x, 0, (acc, v) => acc + 1)
out s
out c
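
The idea behind fold can be sketched in Python with itertools.accumulate. This is a model only: it assumes that the initial value appears as an event at timestamp 0 (as in the count and sum traces shown earlier) and that all input timestamps are greater than 0.

```python
from itertools import accumulate
from operator import add


def fold(stream, init, f):
    """Aggregate the values of a stream of (timestamp, value) pairs.

    Emits init at timestamp 0 and the updated accumulator at every event.
    """
    values = accumulate((v for _, v in stream), f, initial=init)
    times = [0] + [t for t, _ in stream]
    return list(zip(times, values))


x = [(3, 2), (5, 1), (8, 3)]
s = fold(x, 0, add)                      # summing up the events' values
c = fold(x, 0, lambda acc, v: acc + 1)   # counting the events
# s == [(0, 0), (3, 2), (5, 3), (8, 6)]
# c == [(0, 0), (3, 1), (5, 2), (8, 3)]
```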

Exercise. Recursion With Multiple Lasts

Assume that there are two input streams x and y. Build a specification which aggregates their events' values as follows:

  • Start with 0.
  • For every event on x add the event's value.
  • For every event on y multiply by the event's value.

in x: Events[Int]
in y: Events[Int]
def s = merge3(last(s, x) + x, last(s, y) * y, 0)
out s

An equivalent solution with fold is shown below. For this, the values of the two streams have to be zipped together into a single stream, for which we use the stdlib function zip:

in a: Events[Int]
in b: Events[Int]

def specialSum(acc: Int, v: (Option[Int], Option[Int])): Int = {
  if isSome(v._1) then 
    acc + getSome(v._1) 
  else
    acc * getSome(v._2) 
}

out fold(zip(a,b), 0, specialSum)
option timeDomain: [0,20]
stream x: events
stream y: events
stream s: signal
---
0: s = 0
7: x = 4
7: s = 4
9: s = 8
9: y = 2
12: x = 1
12: s = 9
15: s = 27
15: y = 3
17: x = 3
17: s = 30

Generating Events With Delay

So far all the TeSSLa operators that we discussed in this tutorial have been timestamp-conservative, i.e. they can only generate an event at a specific timestamp if there has been an event with that timestamp on any of their input streams. The delay operator is the only operator that can create events with new timestamps.

(See the TeSSLa paper for a more detailed discussion of the expressiveness of different TeSSLa fragments.)

The delay takes two arguments:

There is one additional rule: New delays are only activated if they are set together

This additional restriction ensures that there is always only one active delay. Hence, TeSSLa engines only keep track of at most one active delay per delay operator in the specification. Without this restriction they would need to realize a queue of active delays for every delay operator in the specification.

in amount: Events[Int]
in reset: Events[Unit]

def x = delay(amount, reset)

out x
option axis: true
stream amount: events
stream reset: unit events
stream x: unit events
---
5: amount = 2
9: reset = ()
9: amount = 2
11: x = ()
11: amount = 3
14: x = ()
14: amount = 2
15: reset = ()
17: reset = ()
17: amount = 3
20: x = ()
20: amount = 3
22: reset = ()
22: amount = 2
24: x = ()
26: reset = ()
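
The trace above can be reproduced with the following Python model. It is a sketch under stated assumptions, not the engine's implementation: a new delay is armed only when the amount event coincides with a reset event or with an event produced by the delay itself, a reset cancels the active delay, at most one delay is active at a time, and all amounts are positive. Streams are lists of (timestamp, value) pairs; resets are given as a list of timestamps.

```python
def delay(amount, resets):
    """Return the timestamps at which the modelled delay emits an event."""
    amount_at = dict(amount)
    reset_at = set(resets)
    inputs = sorted(set(amount_at) | reset_at)
    fired_at = []
    timer = None  # timestamp at which the single active delay fires
    i = 0
    while i < len(inputs) or timer is not None:
        next_input = inputs[i] if i < len(inputs) else None
        t = min(c for c in (next_input, timer) if c is not None)
        fired = timer == t
        if fired:
            fired_at.append(t)
            timer = None
        if t in reset_at:
            timer = None  # a reset cancels the active delay
        if t in amount_at and (fired or t in reset_at):
            timer = t + amount_at[t]  # arm a new delay
        if next_input == t:
            i += 1
    return fired_at


amount = [(5, 2), (9, 2), (11, 3), (14, 2), (17, 3), (20, 3), (22, 2)]
reset = [9, 15, 17, 22, 26]
x = delay(amount, reset)
# x == [11, 14, 20, 24]
```

The amount event at 5 is ignored because nothing arms it, and the delays set at 14 and 20 never fire because the resets at 15 and 22 cancel them.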

Recursive Equations with Delay

The delay operator can be used in recursive equations if its second argument – the reset stream – is not recursive.

In its simplest form such a recursion will generate an event in a fixed period, as shown in the example below.

Note that the TeSSLa software interpreter has a synchronized internal progress, i.e. no output events are generated after the last input event. Without input events, no output events are computed, even if they do not depend on the inputs. On the other hand, if the interpreter receives an input event at a certain timestamp, all output events up to that timestamp (exclusively) are computed. In the next examples we will therefore use an explicit input stream progress to specify how long we want to generate events.

in progress: Events[Unit]

def x: Events[Unit] = merge(delay(const(3, x), ()), ())

out x
option axis: true
option timeDomain: [-0.9,20.9]
stream progress: unit events
stream x: unit events
---
0: x = ()
3: x = ()
6: x = ()
9: x = ()
12: x = ()
15: x = ()
18: x = ()
20: progress = ()

Note the similarities to recursive equations with last:

We need to merge the recursive stream with a default value to have a starting point for the recursion and we have two arguments to the delay operator:

The next canonical example for a recursive equation with the delay operator is a period with a variable frequency:

in progress: Events[Unit]
in f: Events[Int]

def x = merge(f, last(f, delay(x, f)))

out x
option axis: true
stream progress: unit events
stream f: events
stream x: events
---
3: f = 2
3: x = 2
5: x = 2
7: f = 3
7: x = 3
10: x = 3
13: x = 3
14: f = 2
14: x = 2
16: x = 2
18: x = 2
20: x = 2
21: progress = ()

Note how we used the output of the delay to trigger the last. It is allowed to trigger a last with a recursive stream as long as there is at least one operator on the recursive loop that breaks the recursion. In this case the delay prevents the recursion from becoming infinite.

Exercise. Merge in Variable Frequency Delay

What happens in the above TeSSLa specification if an event on f happens simultaneously with a generated event on x? Is the old frequency used for another round or do we immediately use the new delay? How could we change this behavior?

The merge operator prefers events on its first argument. Hence, in the above specification f would win over last(f, delay(x, f)). If we exchange these two arguments of the merge operator, we would get a different behavior, where the new frequency would not be used immediately.

At the end of this tutorial we will see an elaborated example where we combine a complex data structure and the delay operator.

Language Features

You know all the basic TeSSLa operators now. In this section we will briefly discuss some advanced language features.

Macros

Remember how we defined functions on primitive values:

def f(x: Int, y: Int) = if x > y then x - y else x + y

In the same way we can also define macros on streams:

def prev(x: Events[Int]) = last(x,x)

The fundamental difference between functions on primitive values and macros on streams is that macros are expanded at compile time and functions are evaluated at runtime.

To use macros in your code add them to your specification. In the following example we defined a macro to add the value five to the last input value.

def add5ToLast(x: Events[Int]) = last(x,x) + 5
in x: Events[Int]
out add5ToLast(x)

Here is a simple trace to use with the example:

10: x = 1
20: x = 2
30: x = 3
40: x = 4

Functions and macros can have multi-line bodies and temporary stream definitions:

def sum(x: Events[Int]) = {
  def s = merge(last(s, x) + x, 0)
  s
}

Note how we defined a temporary stream s inside that macro and used that stream as the macro's value. A block always consists of a sequence of definitions and a final expression. The block evaluates to that final expression.

Exercise. Event Chain Macro

Define a macro that can be used to measure the latency of cause-effect chains based on the earlier exercise. It should take the four events of the event chain as inputs and generate an event stream of the durations of all seen chains.

def eventChain(a: Events[Unit], b: Events[Unit], c: Events[Unit], d: Events[Unit]) = {
  def t = time(a)
  def x = last(last(last(t, b), c), d)
  def delta = time(d) - x
  delta
}

Exercise. Runtime Macro

Define a macro that can be used to measure the runtime of a function based on the earlier exercise. It should take the call and return events of the function as inputs and generate an event stream of the current runtimes.

def runtime(call: Events[Unit], ret: Events[Unit]) = {
  def x = last(time(call), ret)
  time(ret) - x
}

Coming back to the function f: For many functions on primitive values one wants a corresponding macro which applies the function on a stream using the signal semantics implemented by slift:

def f(x: Int, y: Int) = if x > y then x - y else x + y
def f2(x: Events[Int], y: Events[Int]) = slift(x, y, f)

Such a macro can be generated automatically using the keyword liftable:

liftable
def f(x: Int, y: Int) = if x > y then x - y else x + y

Now the slift will be wrapped automatically around f if you use f on streams.

Type System

We have already seen the primitive types which are available in TeSSLa. Together with the stream type Events[T] they form the basic available types.

More complex data structures are discussed in the next sections.

Functions

We already used the lift and slift macros, which take a function as an argument. Functions are first-class citizens in TeSSLa and can be used like any other primitive type. For example, the following function

def f(x: Int, y: Int) = x + 10 * y

has the type

(Int, Int) => Int

Type Inference

Types in TeSSLa are inferred wherever possible, but can be explicitly annotated in many places in order to get better error messages from the type checker.

Currently the type inference requires you to annotate

Literals

Literals, e.g. 5 or false, are automatically converted to streams with one event at timestamp 0 with that value.

In the context of the signal semantics this allows you to write x+5 for a stream x and you end up with x + const(5, ()) which is slift(x, const(5, ()), Operators.add) which is usually what you want.

Generics

TeSSLa supports generic types, so for example instead of

def prev(x: Events[Int]) = last(x,x)

we could also define the more general

def prev[A](x: Events[A]) = last(x,x)

Subtyping

TeSSLa has no subtyping.

Complex Data Structures

Apart from the primitive data types, TeSSLa supports different kinds of complex data structures. TeSSLa is defined agnostically with respect to any time or data domain.

Different data structures can be used to represent time and data. In particular, the TeSSLa engine written in Scala and used in this tutorial supports complex data structures like lists, trees and maps. We first look at the internal data structures, i.e. options, objects and tuples, and then at the external data structures, which are provided by the TeSSLa engine.

Options

We already know the Option[T] type from the lift operation, but you can use the option type for other purposes, too. There are the usual constructors None and Some as well as the usual accessors isNone, isSome, getSome, getSomeOrElse. Have a look at the documentation of the standard library on how to use them.

Type          Option[T]
Constructors  None, Some
Accessors     isNone, isSome, getSome, getSomeOrElse

Exercise. Checked Division

Write a function checkedDivision(dividend: Int, divisor: Int): Option[Int] which returns the result of the division if the divisor is not 0 and None otherwise. Use that function with getSomeOrElse to realize a division which returns 0 if the divisor is 0.

def checkedDivision(dividend: Int, divisor: Int) =
  if divisor == 0 then None[Int]
  else Some(dividend / divisor)

out getSomeOrElse(checkedDivision(27, 0), 0)
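
For readers more familiar with general-purpose languages, the same pattern looks as follows in Python, where None plays the role of the empty option. The function names are made up for this sketch, and Python's // rounds towards negative infinity, which may differ from TeSSLa's integer division for negative operands.

```python
from typing import Optional


def checked_division(dividend: int, divisor: int) -> Optional[int]:
    """Return the quotient, or None if the divisor is 0."""
    if divisor == 0:
        return None
    return dividend // divisor  # floor division; see the note above


def get_some_or_else(option: Optional[int], default: int) -> int:
    """Unwrap the option, falling back to default when it is empty."""
    return default if option is None else option


safe = get_some_or_else(checked_division(27, 0), 0)    # falls back to 0
exact = get_some_or_else(checked_division(27, 3), 0)   # 9
```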

Objects and Tuples

TeSSLa natively supports objects. Objects are created using the following object literal syntax:

def o = {a = 5, b = false}

This object has the type {a: Int, b: Bool}. Attributes of an object with that type can be accessed using the dot operator:

def x = if o.b then o.a else - o.a

Tuples are objects with the implicit attribute names _1, _2 and so on. They can be created using the following tuple literal syntax:

def t = (5, false)

This tuple has the type (Int, Bool), which is equivalent to {_1: Int, _2: Bool}. Elements of a tuple with that type can be accessed using the dot operator:

def x = if t._2 then t._1 else - t._1

It is possible to have objects (and tuples) of streams and even mixed objects, too:

in a: Events[Int]
def mixed = {a = 5, s = (a + 7, a - 7)}
out mixed.s._1
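
Object types can also appear as parameter types of functions. The following is a small sketch under that assumption. The function area and the streams width, height and rectArea are hypothetical names, not part of the standard library:

```tessla
in width: Events[Int]
in height: Events[Int]

-- Value-level function taking an object; the parameter type lists the attributes.
def area(r: {w: Int, h: Int}) = r.w * r.h

-- Build an object of values inside the lifted function and pass it on.
def rectArea = slift(width, height,
  (w: Int, h: Int) => area({w = w, h = h}))

out rectArea
```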

External Data Structures

External data structures are provided by the TeSSLa engine.

Sets

We can use sets to keep track of all the values that we have seen so far:

in x: Events[Int]
def seen: Events[Set[Int]] = merge(Set.add(last(seen, x), x), Set.empty[Int])
def new = !Set.contains(last(seen, x), x)
out new

stream x: bubbles
stream new: bubbles
---
1: new = true
1: x = 1
2: new = true
2: x = 2
3: new = true
3: x = 3
4: new = false
4: x = 1
5: new = false
5: x = 1
6: new = false
6: x = 1
7: new = false
7: x = 3
8: new = true
8: x = 4
9: new = false
9: x = 2
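
The set also tells us how many distinct values have been seen so far. This sketch assumes that the standard library offers Set.size and that it can be applied to a stream of sets in the same way as List.size is applied to a stream of lists. The name distinct is chosen for this illustration only:

```tessla
in x: Events[Int]

def seen: Events[Set[Int]] = merge(Set.add(last(seen, x), x), Set.empty[Int])

-- Assumption: Set.size exists and is lifted to streams like List.size.
def distinct = Set.size(seen)

out distinct
```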

Map

If we use a map instead of a set, we can even keep track of how often we have seen each value so far:

in x: Events[Int]

def old = last(seen, x)
def seen: Events[Map[Int, Int]] =
  merge(Map.add(old, x,
    if Map.contains(old, x) then Map.get(old, x) + 1 else 1),
    Map.empty[Int, Int])
def c = Map.get(seen, x)
def new = c == 1

out new
out c

stream x: bubbles
stream new: bubbles
stream c: bubbles
---
1: c = 1
1: new = true
1: x = 1
2: c = 1
2: new = true
2: x = 2
3: c = 1
3: new = true
3: x = 3
4: c = 2
4: new = false
4: x = 1
5: c = 3
5: new = false
5: x = 1
6: c = 4
6: new = false
6: x = 1
7: c = 2
7: new = false
7: x = 3
8: c = 1
8: new = true
8: x = 4
9: c = 2
9: new = false
9: x = 2

List

With a list we can compute the average over all the values that we have seen so far:

in x: Events[Int]

def old = last(seen, x)
def seen: Events[List[Int]] = merge(List.append(old, x), List.empty[Int])
def sums = List.fold(seen, 0, (a: Int, b: Int) => a + b)
def avg = if List.size(seen) > 0 then sums / List.size(seen) else 0

out avg

stream x: plot
stream avg: dots
---
0: avg = 0
1: avg = 10
1: x = 10
2: avg = 15
2: x = 20
3: avg = 20
3: x = 30
4: avg = 17
4: x = 10
5: avg = 16
5: x = 10
6: avg = 15
6: x = 10
7: avg = 17
7: x = 30
8: avg = 20
8: x = 40
9: avg = 20
9: x = 20
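
List.fold is not limited to sums. As a minimal sketch, the same pattern computes the largest value seen so far. The name maxSeen is made up here, and the start value 0 assumes that x carries no negative values:

```tessla
in x: Events[Int]

def old = last(seen, x)
def seen: Events[List[Int]] = merge(List.append(old, x), List.empty[Int])

-- Same fold as for the sum, but the accumulator keeps the larger value.
-- The start value 0 is only correct for non-negative inputs.
def maxSeen = List.fold(seen, 0, (a: Int, b: Int) => if a > b then a else b)

out maxSeen
```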

List of the Last Three Elements

With a few changes we can adapt this specification so that we only keep track of the last three elements:

in x: Events[Int]

def old = last(seen, x)
def rem = if List.size(old) >= 3 then List.tail(old) else old
def seen: Events[List[Int]] = merge(List.append(rem, x), List.empty[Int])
def sum_seen = List.fold(seen, 0, (a: Int, b: Int) => a + b)
def avg = sum_seen / 3

out avg

stream x: plot
stream avg: dots
---
0: avg = 0
1: avg = 3
1: x = 10
2: avg = 10
2: x = 20
3: avg = 20
3: x = 30
4: avg = 20
4: x = 10
5: avg = 16
5: x = 10
6: avg = 10
6: x = 10
7: avg = 16
7: x = 30
8: avg = 26
8: x = 40
9: avg = 30
9: x = 20

Example With Complex Data Structures

We can use the data structures presented in this section to define custom data structures in TeSSLa. In this example we want to compute the average load of a processor in the last five time units. Unlike the last example, where we always kept three elements in our list, the number of data points that we have to keep track of in this scenario depends heavily on the timestamps of the incoming events. In theory there is not even an upper bound for the number of events that might have happened in the last five time units. Thus, we implement this moving window using a queue that stores all events that happened in the time window. The stream load contains an event every time the input load changes.

Such a queue with corresponding operators can be defined as follows in TeSSLa:

type Queue[D] = List[(Int, D)]

def enq[D](t: Int, d: D, q: Queue[D]) = 
  List.append(q, (t, d))

def Queue_empty[D] = List.empty[(Int, D)]

def Queue_fold[Acc,D](q: Queue[D], zero: Acc, until: Int, f: (Acc, Int, Int, D) => Acc) = {
  def result = List.fold(q, {acc = zero, last = None[(Int,D)]},
    (accum: {acc: Acc, last: Option[(Int, D)]}, el: (Int, D)) =>
      if isNone(accum.last)
      then {acc = accum.acc, last = Some(el)}
      else {acc = f(accum.acc, getSome(accum.last)._1, el._1, getSome(accum.last)._2), last = Some(el)})
  if isNone(result.last) then zero
  else f(result.acc, getSome(result.last)._1, until, getSome(result.last)._2)
}

def remOlder[D](t: Int, q: Queue[D]) =
  Queue_fold(q, Queue_empty[D], -1, (acc: Queue[D], from: Int, to: Int, d: D) =>
    if 0 < to && to <= t then acc
    else enq(max(t, from), d, acc))
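
To see how Queue_fold walks over the queue, consider the following sketch on constant values. It assumes that Queue, enq, Queue_empty and Queue_fold from above are in scope. The names q and weighted are made up for this illustration:

```tessla
-- Assumes Queue, enq, Queue_empty and Queue_fold from above are in scope.
def q = enq(7, 0.7, enq(3, 0.5, enq(1, 0.3, Queue_empty[Float])))

-- f is called once per segment: (from=1, to=3, d=0.3), then (3, 7, 0.5)
-- and finally (7, 10, 0.7), where the last segment is closed by until = 10.
def weighted = Queue_fold(q, 0.0, 10,
  (acc: Float, from: Int, to: Int, d: Float) =>
    acc +. d *. intToFloat(to - from))

out weighted
```

Each element is therefore weighted by the time span during which it was the current value, which is what the integral in the following specification relies on.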

Specification

Using these operators we can define the following specification:

in load: Events[Float]

def qLen = 5

def stripped: Events[Queue[Float]] =
  slift(time(load), merge(last(queue, load), Queue_empty[Float]),
    (t: Int, q: Queue[Float]) => remOlder(t - qLen, q))

def queue: Events[Queue[Float]] =
  slift3(time(load), load, stripped, (t: Int, d: Float, q: Queue[Float]) => enq(t,d,q))

def Queue_integral(queue: Queue[Float], until: Int) =
  Queue_fold(queue, 0.0, until,
    (acc: Float, from: Int, to: Int, d: Float) => acc +. d *. intToFloat(to - from) /. intToFloat(qLen))

def avg = slift(queue, time(load), Queue_integral)

out avg

option axis: true
stream load: plot
stream avg: graph
---
1: avg = 0.00
1: load = 0.30
3: avg = 0.12
3: load = 0.50
7: avg = 0.46
7: load = 0.70
10: avg = 0.62
10: load = 0.40
13: avg = 0.52
13: load = 0.30
16: avg = 0.34
16: load = 0.80
Remove Outdated Elements Automatically

Note that the results are slightly inaccurate whenever an event should have dropped out of the queue, because we only update the computed average when the load changes. We can fix that using a delay which removes outdated elements from the queue automatically. First we need a new operator on queues that computes the timeout, i.e. the timestamp at which the next element is going to drop out of the queue.

def Queue_timeout[D](q: Queue[D]) =
  if List.size(q) < 2 then 0
  else List.head(List.tail(q))._1

Improved Specification

Now we can add a stream timeout to the specification which contains the delays used to trigger an update of the queue:

in load: Events[Float]
in progress: Events[Unit]

def qLen = 5

def stripped: Events[Queue[Float]] =
  slift(time(load), merge(last(queue, load), Queue_empty[Float]),
    (t: Int, q: Queue[Float]) => remOlder(t - qLen, q))

def queue: Events[Queue[Float]] =
  slift3(time(load), load, stripped,
    (t: Int, d: Float, q: Queue[Float]) => enq(t,d,q))

def to = slift(time(updated), updated,
  (t: Int, q: Queue[Float]) => Queue_timeout(q) - t + qLen)
def timeout = filter(to, to > 0)

def merged = merge(queue, last(updated, delay(timeout, load)))

def updated: Events[Queue[Float]] = slift(time(merged), merged,
  (t: Int, q: Queue[Float]) => remOlder(t - qLen, q))

def Queue_integral(queue: Queue[Float], until: Int) =
  Queue_fold(queue, 0.0, until,
    (acc: Float, from: Int, to: Int, d: Float) => acc +. d *. intToFloat(to - from) /. intToFloat(qLen))

def avg = slift(updated, time(updated), Queue_integral)

out avg

option axis: true
stream progress: unit events
stream load: plot
stream timeout: events
stream avg: graph
---
1: avg = 0.0
1: load = 0.3
3: avg = 0.12
3: timeout = 5
3: load = 0.5
7: avg = 0.46
7: timeout = 1
7: load = 0.7
8: avg = 0.54
8: timeout = 4
10: avg = 0.62
10: timeout = 2
10: load = 0.4
12: avg = 0.58
12: timeout = 3
13: avg = 0.52
13: timeout = 2
13: load = 0.3
15: avg = 0.36
15: timeout = 3
16: avg = 0.34
16: timeout = 2
16: load = 0.8
18: avg = 0.5
18: timeout = 3
21: avg = 0.8
22: progress = ()
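
The timeout values in the trace can be checked by hand. The following sketch reproduces the computation for timestamp 7 on constant values. The queue contents are what the specification above produces at that point under the given trace, and the names q and nextDrop are made up for this illustration:

```tessla
type Queue[D] = List[(Int, D)]
def enq[D](t: Int, d: D, q: Queue[D]) = List.append(q, (t, d))
def Queue_empty[D] = List.empty[(Int, D)]
def Queue_timeout[D](q: Queue[D]) =
  if List.size(q) < 2 then 0
  else List.head(List.tail(q))._1

def qLen = 5

-- Queue contents at timestamp 7: the first entry was clipped to the
-- window start 7 - qLen = 2 by remOlder.
def q = enq(7, 0.7, enq(3, 0.5, enq(2, 0.3, Queue_empty[Float])))

-- The second element starts at 3, so the first one leaves the window
-- at 3 + qLen = 8, which is 1 time unit after timestamp 7.
def nextDrop = Queue_timeout(q) - 7 + qLen

out nextDrop
```

The result 1 matches the event timeout = 1 at timestamp 7 in the trace, and the next update of avg accordingly happens at timestamp 8.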

Module system and includes

In TeSSLa it is possible to define macros, streams and variables in dedicated modules. Outside of these modules the definitions can be accessed via Module.Submodule.identifier, as shown in the following example:

in x: Events[Int]

module Mod {
 module SMod {
   import StreamFunctions

   def myCount[A](x: Events[A]) = fold(x, 0, (curr: Int, _: A) => curr + 1)
 }
}

out Mod.SMod.myCount(x)

To access module members directly one can use the import keyword to make these names accessible in the current scope:

in x: Events[Int]

module Mod {
 module SMod {
   import StreamFunctions
   
   def myCount[A](x: Events[A]) = fold(x, 0, (curr: Int, _: A) => curr + 1)
 }
}

import Mod

out SMod.myCount(x)

Note that imported namespaces are not accessible from submodules of the importing modules. If you want to use them there, you have to import them again. This is why in the example we explicitly have to import the stdlib module StreamFunctions in Mod.SMod, which contains for example the fold macro. While StreamFunctions is imported automatically in the outermost scope by the standard library, it is not automatically available inside modules.

It is also possible to access definitions from other TeSSLa files. This is done with the include keyword followed by the relative path of the file to be included, given as a string.

include.tessla:

def myCount[A](x: Events[A]) = fold(x, 0, (curr: Int, _: A) => curr + 1)

spec.tessla:

include "include.tessla"

in x: Events[Int]

out myCount(x)

Since version 2.1.0 the TeSSLa compiler also includes a package manager. With the keywords include lib, a whole library (which may consist of several TeSSLa files) can be retrieved automatically from the central TeSSLa library repository and included in the current specification. To do so, provide the name and version of the library in the format "name:version" after include lib, as in the following example:

include lib "tddl:1.0.1"
import TDDL

in x: Events[String]

def p1 = O("a", (Some(2), Some(4)))

def monitor = Monitor(p1)
def res = monitor(x)

out res

When executing this code, the compiler first checks whether the desired version (1.0.1) of the TeSSLa TDDL library is already available in a folder called dependencies at the same location as the compiled TeSSLa specification. If this is not the case, the package manager tries to download the library from the central TeSSLa library repository. It also checks whether the library is compatible with the current TeSSLa compiler version and then includes the TeSSLa sources of the library as specified in the library.json of the corresponding library.

A list of available TeSSLa libraries can be found here.

Conclusion

You have now learned how to write complex TeSSLa specifications. You can find more practical information on how to use TeSSLa in the documentation of TeSSLa's standard library. More information on the language TeSSLa can be found in the TeSSLa language specification. The theoretical background and expressiveness of TeSSLa are discussed in several publications on TeSSLa.

You can now continue with the tutorial on doing runtime verification of C code with TeSSLa.