I’m a big fan of Rust, although I usually have little free time to fiddle with it. But recently, when I had a task at my job to create a small, greenfield networking application, I decided to take the chance and experiment with asynchronous programming and the Tokio framework in Rust. I didn’t just succeed but actually had so much fun that I even managed to push it to the other extreme: the whole console application essentially became a single async event stream.
ORIGINALLY PUBLISHED ON MEDIUM ON JULY 8, 2019
The application’s task is to discover devices on the local network that broadcast over UDP: it listens on a given port for a minute and collects packets that adhere to a specific format, along with their originator IP addresses. In this article, I’ll only cover the interesting parts, but you can find the complete source on Tresorit’s GitHub.
Asynchronous programming enables you to continue working on tasks on a single thread while your application waits for some long-running operation to complete, resulting in a more efficient use of CPU time. When you’re doing async programming, you have an event loop at the core of your application, which dispatches execution to the processing functions once an external event arises. In Rust terms, you define a long-running, asynchronous task, called a Future, that will be repeatedly polled by the Executor until the external condition it’s waiting for is fulfilled and it finally returns its actual result. Rust’s nice functional-style operators also appear here, allowing you to chain and combine your futures to implement more complex, yet fully asynchronous tasks without sacrificing efficiency.
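To make the polling idea concrete, here is a minimal sketch built only on the standard library's Future trait (today's std API, not the futures crate the application was written against). CountdownFuture, NoopWaker and block_on are hypothetical names invented for this illustration, and the busy-polling loop is a toy stand-in for a real executor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that reports Pending a fixed number of times
/// before it resolves, mimicking a task waiting for an external event.
struct CountdownFuture {
    polls_left: u32,
}

impl Future for CountdownFuture {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polls_left == 0 {
            Poll::Ready("done")
        } else {
            self.polls_left -= 1;
            // Ask to be polled again; a real future would hand the waker
            // to whatever produces the event it is waiting for.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that does nothing, because the toy executor below polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls the future until it is ready and also returns
/// how many polls that took.
fn block_on<F: Future>(future: F) -> (F::Output, u32) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return (output, polls);
        }
    }
}
```

A real executor would put the thread to sleep until a waker fires instead of spinning, which is where the efficient use of CPU time comes from.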
From datagram bytes to a packet stream
The UDP packets we’re looking for have a simple JSON payload, so we start by declaring it as a regular Rust struct, along with annotations for the powerful Serde library that will do the heavy lifting of JSON deserialization for us.
The Tokio framework gives you all the building blocks to start doing async networking, starting with UdpFramed. The latter converts incoming UDP datagrams into an async Stream: a long-running async operation which, unlike a simple Future, can yield several results of the same type before being terminated. This makes it an ideal async model for a network connection.
A UdpFramed instance can even assemble higher level protocol data structures for us out of the network byte stream, regardless of how they are aligned within the stream; they can even span over multiple datagrams. All we need to do is to implement the decoding trait it expects for our packet type.
Here we’re doing two things:
- Declaring the return type of the async stream by specifying the associated type Item. There’s a small trick here. We’re more interested in collecting as many valid packets from multiple sources as possible than in strictly adhering to the protocol format, so we even accept ones that cannot be decoded and simply drop them. For this we specify the return type to be an Option of the packet type, where None will signal a malformed packet.
- Implementing the actual decode routine in the decode function. Note that the return type of the function is actually Result<Option<Option<...>>>. The outermost Result is for signaling network or protocol errors, cases we would like to avoid if possible to keep the packets flowing, so we always return Ok. The middle Option is for signaling UdpFramed to give more bytes for decoding; we use that when an empty buffer is encountered. The innermost Option is the actual return type carrying a valid or invalid protocol data structure: the result of a successful or unsuccessful JSON deserialization of the datagram payload bytes.
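The three layers of the return type are easier to see in isolation. The following is only a sketch of the idea using the standard library: Beacon and parse_beacon are hypothetical stand-ins, the toy "id=<name>" format replaces the real JSON payload, and a plain Vec<u8> replaces the byte buffer that the framework hands to the real decode function.

```rust
use std::io;

/// Hypothetical stand-in for the decoded beacon packet.
#[derive(Debug, PartialEq)]
struct Beacon {
    host_id: String,
}

/// Toy parser standing in for JSON deserialization: accepts "id=<name>".
fn parse_beacon(bytes: &[u8]) -> Option<Beacon> {
    let text = std::str::from_utf8(bytes).ok()?;
    let id = text.strip_prefix("id=")?;
    if id.is_empty() {
        return None;
    }
    Some(Beacon { host_id: id.to_string() })
}

/// Sketch of the decode logic.
/// - Outer Result: always Ok, so a bad packet never aborts the stream.
/// - Middle Option: None asks for more bytes (empty buffer).
/// - Inner Option: None marks a malformed packet that is dropped later.
fn decode(buf: &mut Vec<u8>) -> Result<Option<Option<Beacon>>, io::Error> {
    if buf.is_empty() {
        return Ok(None);
    }
    let packet = parse_beacon(buf);
    // The whole payload is consumed whether or not it parsed.
    buf.clear();
    Ok(Some(packet))
}
```

With this shape, a malformed payload comes back as Ok(Some(None)) rather than as an error, which is what keeps the packets flowing.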
Now we have everything at hand to build a nice asynchronous Stream of incoming beacon packets out of a plain UDP socket.
(host_id, ip_address) pairs for the received UDP packets
We open the UDP socket, wrap it in a UdpFramed instance and also set up some transformations on the stream before returning it. These transformations include dropping the invalid packets (note the inner map that will effectively translate inner Nones to outer Nones); extracting the source IP addresses and the host_int field from the decoded packets, because these are the only two pieces of information we actually need to identify the originator devices; and finally converting the result type to Event (more on this last step later).
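The trick of translating inner Nones to outer Nones works the same way on ordinary iterators, so here is a synchronous sketch of it. It assumes a hypothetical Beacon struct with a host_id field and uses a plain Iterator in place of the async stream; the conversion to Event is left out.

```rust
use std::net::{IpAddr, SocketAddr};

/// Hypothetical stand-in for the decoded beacon packet.
struct Beacon {
    host_id: String,
}

/// Drops undecodable packets and keeps only the host ID and source IP.
/// The inner `map` turns a None packet into a None for `filter_map`,
/// which then removes that item from the sequence.
fn valid_hosts<I>(decoded: I) -> impl Iterator<Item = (String, IpAddr)>
where
    I: Iterator<Item = (Option<Beacon>, SocketAddr)>,
{
    decoded.filter_map(|(packet, source)| packet.map(|p| (p.host_id, source.ip())))
}
```

Only the IP part of the source address is kept, since the sender's port says nothing about the device's identity.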
Adding a countdown timer
The devices on the local network emit UDP broadcast packets at regular intervals of 30 seconds. Therefore the discovery needs to run for at least this long, but I doubled it to account for lost packets: our application waits and actively listens for 60 seconds and then prints the results.
There is a Timeout future in the Tokio library that will fire and terminate another future or stream after a given period; however, we cannot use it here for two reasons:
- If a valid item is returned from the stream before the timeout period elapses, it will effectively cancel the timeout: it is designed for cancelling unsuccessful futures, not for general termination use-cases.
- Having a console application doing seemingly nothing for a minute is not quite user friendly. It would be much nicer to have visual feedback of the remaining time.
So instead of Timeout, we’re going to use Interval, a built-in stream that yields a result periodically, in this case once every second.
This is quite a straightforward implementation, but note two peculiarities:
- We include the remaining seconds in the result. However, the stream is not terminated automatically when reaching zero, so we must take care that the number can never go below zero.
- The Event type appears again as the result of this stream as well. I’ll cover the reasons in the next section.
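The "never below zero" concern from the first point can be sketched with plain integers. This is a synchronous illustration only: the tick counter stands in for the interval firing once per second, and remaining_seconds and countdown are hypothetical helper names.

```rust
/// Remaining time after a number of one-second ticks, clamped at zero.
/// `saturating_sub` avoids the underflow that a plain `-` on unsigned
/// integers would cause once the ticks exceed the total.
fn remaining_seconds(total: u64, elapsed_ticks: u64) -> u64 {
    total.saturating_sub(elapsed_ticks)
}

/// Endless sequence of remaining seconds, one value per tick.
/// Like the interval, it never ends on its own; it just stays at zero.
fn countdown(total: u64) -> impl Iterator<Item = u64> {
    (1u64..).map(move |tick| remaining_seconds(total, tick))
}
```

Because the sequence keeps yielding zeros, something downstream has to decide when to stop listening.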
Putting it all together
Let’s start with the Event type right away. It’s a great example of using Rust’s feature-rich enum type to create type-safe unions: it can be either a pair of host ID and IP address or a number of remaining seconds.
As you saw earlier, we use this enum as the result type of both the UDP and the timer streams. This enables us to do the most interesting part: combining async streams together to implement application logic.
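A minimal sketch of such an enum could look like the following. The variant names HostFound and Countdown are the ones used in this article, while the payload types (String, IpAddr, u64) and the describe helper are assumptions made for the illustration.

```rust
use std::net::IpAddr;

/// One item of the combined event stream.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    /// A valid beacon arrived: host ID plus the sender's IP address.
    HostFound(String, IpAddr),
    /// The timer ticked: number of seconds remaining.
    Countdown(u64),
}

/// Hypothetical helper showing that `match` must handle every variant.
fn describe(event: &Event) -> String {
    match event {
        Event::HostFound(host_id, ip) => format!("host {} at {}", host_id, ip),
        Event::Countdown(secs) => format!("{} s remaining", secs),
    }
}
```

Since both sources produce the same type, the compiler treats them as interchangeable inputs for any combinator that merges two streams.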
Supporting optional streams
Before we move on to that, there’s one last thing that needs to be done. As you remember, our UDP stream can handle both IP protocol versions, but only one at a time. We would like to benefit from this and build IPv4/6 support into our application: it can listen for incoming packets on either an IPv4 or IPv6 interface, or even on both of them simultaneously.
From the source code’s perspective it means that we will have one or two active UDP streams concurrently. The latter could easily be done by applying the select combinator on the streams: it will yield the results from both streams, in the order they arrive. There’s a caveat, however. The underlying type of a selected stream is different from a single stream: the first one is Select<impl Stream<Event>, …> while the second one is some opaque impl Stream<Event>, more precisely FilterMap<UdpFramed<…>, …> in our case.
In the Tokio library, the Either type serves for combining two futures of separate types where only one of them can be “active” at a given time, unlike Select where both child streams are active. Unfortunately, Either is not implemented for streams in the framework, only for futures. Thus, we ended up rolling our own implementation, which is actually quite straightforward.
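The same pattern can be shown with ordinary iterators, which have the identical "two branches, two different concrete types" problem. This is a synchronous analogue rather than the stream implementation itself: EitherIter and sources are hypothetical names, and chain (which runs the inputs one after the other) merely stands in for select, which interleaves them.

```rust
/// Wraps one of two different iterator types behind a single type.
enum EitherIter<A, B> {
    Left(A),
    Right(B),
}

impl<A, B> Iterator for EitherIter<A, B>
where
    A: Iterator,
    B: Iterator<Item = A::Item>,
{
    type Item = A::Item;

    /// Delegates to whichever variant is actually present.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            EitherIter::Left(a) => a.next(),
            EitherIter::Right(b) => b.next(),
        }
    }
}

/// Returns a single source or a combination of two. Without the wrapper
/// the two branches would have different types and would not compile.
fn sources(both: bool) -> impl Iterator<Item = &'static str> {
    let v4 = vec!["v4-a", "v4-b"].into_iter();
    if both {
        let v6 = vec!["v6-a"].into_iter();
        EitherIter::Right(v4.chain(v6))
    } else {
        EitherIter::Left(v4)
    }
}
```

The async version follows the same delegation idea, only with the stream's polling method in place of next.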
The main function
Now that we have all the building blocks available, we can finally put it all together in the main function of our application. Let’s go through the steps we’d like to implement before we jump to the code.
- First, we create the UDP streams that will emit HostFound events for valid incoming packets on either the IPv4 or IPv6 interfaces, or both. We’re using our EitherStream implementation to allow any combination of the two protocol versions.
- We create the other source stream that will emit Countdown events once every second and combine it with the UDP streams, resulting in a stream of any Events. Thanks to the common result type, this takes no more than a single combinator call.
- The next transformation will print out the remaining time to the console each time a Countdown event is encountered. More importantly, the take_while operator will stop at the first event whose remaining time counter is zero, effectively terminating the whole stream, along with the UDP sockets. A pretty elegant way to do so.
- If we got this far in the stream, the countdown events are no longer used for anything; they are just noise in the code, so we simply filter them out, keeping only (host_id, ip_address) pairs in the stream.
- These pairs will then be fed to a HashMap to filter out multiple packets from the same host device. Remember, our discovery process runs longer than the beacon packet interval, and a host may also broadcast over both IP protocol versions.
- When the stream is eventually terminated, the above map will contain the actual result of our application, the list of IP addresses we were looking for, so the only thing left is to display it to the user.
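The steps above can be sketched synchronously with iterator combinators that mirror the stream ones. This is not the application's main function: it assumes the events are already merged into one sequence, skips the console output, and guesses at the payload types and at the shape of the map (host ID mapped to the list of distinct addresses seen).

```rust
use std::collections::HashMap;
use std::net::IpAddr;

/// Assumed shape of the shared event type.
#[derive(Debug)]
enum Event {
    HostFound(String, IpAddr),
    Countdown(u64),
}

/// Consumes events until the countdown hits zero and collects the
/// distinct addresses seen for each host.
fn discover<I: Iterator<Item = Event>>(events: I) -> HashMap<String, Vec<IpAddr>> {
    let pairs = events
        // Stop the whole pipeline at the first Countdown(0).
        .take_while(|event| !matches!(event, Event::Countdown(0)))
        // Countdown events have done their job; keep only the host pairs.
        .filter_map(|event| match event {
            Event::HostFound(host_id, ip) => Some((host_id, ip)),
            Event::Countdown(_) => None,
        });

    let mut hosts: HashMap<String, Vec<IpAddr>> = HashMap::new();
    for (host_id, ip) in pairs {
        let ips = hosts.entry(host_id).or_default();
        // A host beacons repeatedly, so ignore addresses already recorded.
        if !ips.contains(&ip) {
            ips.push(ip);
        }
    }
    hosts
}
```

Anything that arrives after the zero tick is never looked at, which corresponds to the stream and its sockets being dropped in the async version.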
And basically that’s it. I’ve omitted some utility functions for clarity, but otherwise our application is now complete. 🎉
I learned a lot while creating this application. Asynchronous programming in Rust is still quite new (so is Rust itself), and requires a somewhat different mindset than you are used to, but creating production-ready applications with it is definitely possible.
The example application here is a bit extreme in the way async streams and functional-style combinators are used exclusively throughout the source code, making it harder to follow. Async style is not suitable for every use-case; I think that in a professional environment you should end up with a healthier combination of async and imperative code. On the other hand, I wanted to show that once you get accustomed to this style, it can easily drag you in, and you’ll still be having fun in the meantime. 😉