Apr 19th, 2019 - written by Kimserey.
In our previous post on mutability, we discussed the fact that time has to be taken into consideration when mutability is involved. We observed an object's lifetime instant by instant, computing and assigning the object's state at each instant. Another way to look at time is to consider the state as a discrete set of values. From this perspective, we can see time as an infinite sequence, a stream, and the state can also be seen as a stream computed from time.
Today we will look at how to represent a stream, and in particular how to model an infinite stream together with operations working on it. We will also make a connection with stream implementations in common languages.
A stream is an abstract concept representing a sequence of elements over time. If we think of the delta between each time sample as instant, in other words zero, we can implement a stream as a list, where all events in time happen instantly.
But as we know, this isn't the case. Most events don't occur instantly at the scale where we normally work. Therefore, to represent the delta, a stream introduces the technique of delayed evaluation. Delayed evaluation, also called lazy evaluation, delays the execution of an expression until its value is needed.
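Before applying it to lists, delayed evaluation can be sketched on its own. The block below is a minimal illustration, assuming a hypothetical expensive-sum procedure and an evaluations counter that exist only to show when the expression actually runs; neither is part of the original post.

```racket
#lang racket

;; Counter used only to observe when the expression is evaluated.
(define evaluations 0)

;; Hypothetical stand-in for an expression we want to postpone.
(define (expensive-sum)
  (set! evaluations (+ evaluations 1))
  (+ 1 2))

;; Wrapping the call in a lambda delays it: nothing runs here.
(define delayed (λ () (expensive-sum)))

;; Still 0 at this point, because the lambda has not been called yet.
(define evaluations-before-force evaluations)

;; Calling the lambda forces the evaluation and yields 3.
(define result (delayed))
```

The expression is evaluated only at the point where the lambda is called, which is the property the rest of the post relies on.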
For example, if we want to build a range of integers, we can create an interval procedure.
(define (interval low high)
  (if (> low high)
      '()
      (cons low (interval (+ low 1) high))))
Executing interval yields a complete list.
> (interval 0 10)
'(0 1 2 3 4 5 6 7 8 9 10)
At each recursion, the cons step needs to evaluate its arguments before cons can be applied. Therefore the result of interval is the fully computed list of integers from 0 to 10.
One problem with having the list fully computed is that even if we only need a subset of it, we still have to compute the whole list.
For example, if we need to take three elements from the list, we define take, a procedure accepting an integer n and a list.
(define (take n xs)
  (cond [(= n 0) '()]
        [else (cons (car xs) (take (- n 1) (cdr xs)))]))
Using interval, we are able to get the first three values of the interval.
> (take 3 (interval 0 20))
'(0 1 2)
As we saw earlier, (interval 0 20) must be fully evaluated before take runs. We can witness the performance problem by creating an interval for an extremely large range.
> (take 3 (interval 0 10000000000000000000000))
. Interactions disabled; out of memory
Even though the resulting subset is composed of only the first three values of the list, the program is unable to complete because executing interval runs out of memory.
A stream solves this problem by introducing delay.
If we think of a list as a pair composed of an element plus a rest, where the rest points to another pair composed of a value plus another rest, and so on until the last pair, composed of its value plus the empty list, we can represent a list as follows.
> (cons 0 (cons 1 (cons 2 (cons 3 '()))))
'(0 1 2 3)
Here car provides the first value and cdr the rest.
> (car (cons 0 (cons 1 (cons 2 (cons 3 '())))))
0
> (cdr (cons 0 (cons 1 (cons 2 (cons 3 '())))))
'(1 2 3)
A stream is constructed by introducing a delay in the evaluation of the rest. The delay can be implemented using a lambda: evaluating the lambda only creates a procedure, and the rest is computed when that procedure is called. The implementation would look as follows:
> (cons 0 (λ () (cons 1 (λ () (cons 2 (λ () (cons 3 'empty-stream)))))))
'(0 . #<procedure>)
Each cdr is represented by a lambda delaying the execution. Therefore each element of the list is provided on demand, and at each iteration the next value is a delayed cdr which needs to be forced.
If we want the first value, we use car.
> (car (cons 0 (λ () (cons 1 (λ () (cons 2 (λ () (cons 3 'empty-stream))))))))
0
And if we want the next value, we use cdr, which returns a procedure that, once executed, yields a pair containing the next value in car position.
> (cdr (cons 0 (λ () (cons 1 (λ () (cons 2 (λ () (cons 3 'empty-stream))))))))
#<procedure>
To execute the procedure, we create force, a procedure used to force the execution, together with stream-car and stream-cdr, the counterparts of car and cdr for streams.
(define (stream-null? xs)
  (eq? xs 'empty-stream))

(define (force expr) (expr))

(define (stream-car xs) (car xs))

(define (stream-cdr xs)
  (if (stream-null? xs)
      'empty-stream
      (force (cdr xs))))
The stream-cdr procedure forces the execution at the moment when the cdr is requested. The 'empty-stream symbol denotes an empty stream, and is therefore used as the last value to mark the end of a stream.
> (stream-cdr (cons 0 (λ () (cons 1 (λ () (cons 2 (λ () (cons 3 'empty-stream))))))))
'(1 . #<procedure>)
> (stream-cdr (stream-cdr (cons 0 (λ () (cons 1 (λ () (cons 2 (λ () (cons 3 'empty-stream)))))))))
'(2 . #<procedure>)
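One property of the lambda-based delay is worth noting: the body runs again every time the same cdr is forced. Racket also provides promises through its built-in delay and force, which cache the value after the first evaluation. The sketch below compares the two; the counters and the value 42 are illustrative assumptions, and the block deliberately uses Racket's own force rather than the one defined in this post.

```racket
#lang racket

;; Counters used only to observe how many times each body runs.
(define thunk-runs 0)
(define promise-runs 0)

;; Lambda-based delay, as used in this post: the body runs on every call.
(define thunk
  (λ ()
    (set! thunk-runs (+ thunk-runs 1))
    42))

;; Built-in promise: the body runs at most once, then the value is cached.
(define promise
  (delay (begin
           (set! promise-runs (+ promise-runs 1))
           42)))

;; Force each one twice.
(define thunk-results (list (thunk) (thunk)))
(define promise-results (list (force promise) (force promise)))
```

After both are forced twice, thunk-runs is 2 while promise-runs is 1. For the pure computations in this post the difference is only about repeated work, but it matters as soon as the delayed expression has side effects.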
Going back to our implementation of interval, we can introduce delay by wrapping the rest of the list in a lambda.
(define (stream-interval low high)
  (if (> low high)
      'empty-stream
      (cons low (λ () (stream-interval (+ low 1) high)))))
When we execute stream-interval, we now get a pair composed of a value and the delayed rest of the sequence.
> (stream-interval 0 1000000000)
'(0 . #<procedure:...osts/streams.rkt:11:16>)
And using stream-cdr, we can access the next value.
> (stream-cdr (stream-interval 0 1000000000))
'(1 . #<procedure:...osts/streams.rkt:11:16>)
We can then define take-stream, which wraps the cdr of the initial cons in a lambda and uses stream-cdr.
(define (take-stream n xs)
  (cond [(= n 0) 'empty-stream]
        [else (cons (stream-car xs)
                    (λ () (take-stream (- n 1) (stream-cdr xs))))]))
Using take-stream, we solve the issue by delaying the execution of the next elements. At each execution of stream-cdr, we resolve a new element on demand until the nth element, after which 'empty-stream is returned.
> (take-stream 3 (stream-interval 0 10000))
'(0 . #<procedure:...osts/streams.rkt:29:20>)
> (stream-cdr (take-stream 3 (stream-interval 0 10000)))
'(1 . #<procedure:...osts/streams.rkt:29:20>)
> (stream-cdr (stream-cdr (take-stream 3 (stream-interval 0 10000))))
'(2 . #<procedure:...osts/streams.rkt:29:20>)
> (stream-cdr (stream-cdr (stream-cdr (take-stream 3 (stream-interval 0 10000)))))
'empty-stream
> (stream-cdr (stream-cdr (stream-cdr (stream-cdr (take-stream 3 (stream-interval 0 10000))))))
'empty-stream
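Chaining stream-cdr calls by hand quickly becomes verbose. As a sketch, a finite stream can be collected into a regular list with a small helper. The name stream-to-list is hypothetical and not part of the post; the other definitions are repeated from above so that the block runs on its own, with the delayed cdr called directly instead of going through force.

```racket
#lang racket

;; Definitions repeated from the post so the sketch is self-contained.
(define (stream-null? xs) (eq? xs 'empty-stream))
(define (stream-car xs) (car xs))
(define (stream-cdr xs)
  (if (stream-null? xs)
      'empty-stream
      ((cdr xs))))                 ; call the delayed cdr

(define (stream-interval low high)
  (if (> low high)
      'empty-stream
      (cons low (λ () (stream-interval (+ low 1) high)))))

(define (take-stream n xs)
  (cond [(= n 0) 'empty-stream]
        [else (cons (stream-car xs)
                    (λ () (take-stream (- n 1) (stream-cdr xs))))]))

;; Hypothetical helper: force every element of a finite stream
;; and collect the values into a regular list.
(define (stream-to-list xs)
  (if (stream-null? xs)
      '()
      (cons (stream-car xs) (stream-to-list (stream-cdr xs)))))

;; Only the first few elements of the large interval are ever computed.
(define first-three
  (stream-to-list (take-stream 3 (stream-interval 0 100000000))))
```

Here first-three evaluates to '(0 1 2). The helper must only be used on finite streams, since it forces every element.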
Essentially, the whole interval no longer needs to be completely evaluated. If we define a for-each-stream procedure:
(define (for-each-stream proc xs)
  (if (stream-null? xs)
      'done
      (begin
        (proc (stream-car xs))
        (for-each-stream proc (stream-cdr xs)))))
we can display the stream without incurring any performance issue.
> (for-each-stream displayln (take-stream 3 (stream-interval 0 100000000)))
0
1
2
'done
The stream is iterated only for as many elements as take-stream requests.
Implementing streams with delay allows us to represent infinite sequences. For example, instead of having an interval, we can implement a procedure constructing a sequence starting from a particular integer.
(define (integers-starting-from n)
  (cons n (λ () (integers-starting-from (+ n 1)))))
If the cdr were not delayed, this procedure would recurse infinitely. But because it is delayed, we can use stream-cdr to query the next element as necessary.
> (integers-starting-from 1)
'(1 . #<procedure:...osts/streams.rkt:40:10>)
> (stream-cdr (integers-starting-from 1))
'(2 . #<procedure:...osts/streams.rkt:40:10>)
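Operations can also be defined over an infinite stream, as long as they stay lazy themselves. The following is a minimal sketch, assuming two hypothetical helpers that are not part of the post: map-stream, which applies a procedure to each element on demand, and first-n, which collects a prefix into a list. Since the stream is infinite, stream-cdr is simplified here and does not check for the end marker.

```racket
#lang racket

(define (stream-car xs) (car xs))

;; Simplified for infinite streams: there is no end marker to check.
(define (stream-cdr xs) ((cdr xs)))

(define (integers-starting-from n)
  (cons n (λ () (integers-starting-from (+ n 1)))))

;; Hypothetical helper: apply proc to each element, one element at a time.
;; The rest of the mapped stream stays delayed behind a lambda.
(define (map-stream proc xs)
  (cons (proc (stream-car xs))
        (λ () (map-stream proc (stream-cdr xs)))))

;; Hypothetical helper: collect the first n elements into a list.
(define (first-n n xs)
  (if (= n 0)
      '()
      (cons (stream-car xs) (first-n (- n 1) (stream-cdr xs)))))

;; An infinite stream of squares, built without computing any square
;; beyond the first one.
(define squares
  (map-stream (λ (x) (* x x)) (integers-starting-from 1)))

(define first-five-squares (first-n 5 squares))
```

Here first-five-squares evaluates to '(1 4 9 16 25). Because map-stream wraps its recursive call in a lambda, mapping over an infinite stream returns immediately.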
In our previous post about Fibonacci, we saw that Fibonacci was defined as $f_{n}=f_{n-1} + f_{n-2}$.
We can write Fibonacci in terms of a stream:
(define (fib-stream a b) (cons a (λ () (fib-stream b (+ a b)))))
(define fibonacci (fib-stream 0 1))
Similarly to integers-starting-from, fib-stream recurses indefinitely, providing the next Fibonacci number at each iteration. In order to appreciate this implementation, we define stream-ref, a procedure returning the element of the stream at index n.
(define (stream-ref n xs)
  (cond
    [(stream-null? xs) 'empty-stream]
    [(= n 0) (car xs)]
    [else (stream-ref (- n 1) (stream-cdr xs))]))
We can then use stream-ref to retrieve a specific Fibonacci number from the infinite sequence of Fibonacci numbers.
> (stream-ref 10 fibonacci)
55
> (stream-ref 300 fibonacci)
222232244629420445529739893461909967206666939096499764990979600
The concept of a stream extends to data streams. In programming languages, when talking about streams, we usually refer to data being transferred from one system to another.
For example, reading a file in a program would be done via a stream; in C# on .NET, that would be via a FileStream or StreamReader.
using (StreamReader stream = File.OpenText("./file.txt"))
{
    while (!stream.EndOfStream)
    {
        Console.WriteLine(stream.ReadLine());
    }
}
The StreamReader implementation provides a way to represent the sequence of data composing a file, which can be consumed in multiple steps. With ./file.txt, we used line breaks as separators and iterated over the whole content line by line. We could instead have iterated block of bytes by block of bytes, which would be appropriate if there were no clear way of splitting the data.
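To connect this with the stream representation built earlier, the same line-by-line reading can be sketched in Racket as a pair whose cdr is delayed. The helper port->line-stream is a hypothetical name introduced for illustration, and an in-memory string port stands in for ./file.txt so that the block runs on its own.

```racket
#lang racket

(define (stream-null? xs) (eq? xs 'empty-stream))
(define (stream-car xs) (car xs))
(define (stream-cdr xs)
  (if (stream-null? xs)
      'empty-stream
      ((cdr xs))))

;; Hypothetical helper: expose an input port as a stream of lines.
;; Each further line is read only when the corresponding cdr is forced.
;; Caveat: the port is stateful and the lambda is not memoized, so each
;; delayed cdr should be forced at most once.
(define (port->line-stream in)
  (define line (read-line in))
  (if (eof-object? line)
      'empty-stream
      (cons line (λ () (port->line-stream in)))))

;; An in-memory port standing in for a file.
(define in (open-input-string "first\nsecond\nthird"))

;; Only the first line has been read at this point.
(define lines (port->line-stream in))

;; Forcing the cdr reads the next line from the port.
(define second-line (stream-car (stream-cdr lines)))
```

Here second-line evaluates to "second". Unlike the integer streams above, this stream wraps a stateful source, which is why forcing the same cdr twice would read two different lines.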
The same type of data stream is used for communication over transport layer protocols. In applications, we handle all sorts of transmissions, with data transmitted over time, as streams. And we represent the packets of data composing the overall message as elements of the stream.
The delay method gives the receiver a way to read packets of data on demand rather than forcing the whole transmission at once.
Today we looked into what streams are. We started by looking at how delay can be introduced and saw how time can be modeled with streams. We then looked into creating infinite streams and at one example where they can be used. Lastly, we made the link between the concept and its implementation in common programming languages. I hope you liked this post and I'll see you in the next one!