Collecting broadcast UDP packets using async networking in Rust

I’m a big fan of Rust, although I usually have little free time to fiddle with it. Recently, when I was tasked at work with creating a small, greenfield networking application, I decided to take the chance and experiment with asynchronous programming and the Tokio framework in Rust. I didn’t just succeed: I had so much fun that I pushed it to the extreme, and the whole console application essentially became a single async event stream.

ORIGINALLY PUBLISHED ON MEDIUM ON JULY 8, 2019

The application’s task is to discover devices on the local network that broadcast over UDP: it listens on a given port for a minute and collects packets that adhere to a specific format, along with their originator IP addresses. In this article, I’ll only cover the interesting parts, but you can find the complete source on Tresorit’s GitHub.

Asynchronous programming enables you to continue working on other tasks on a single thread while your application waits for some long-running operation to complete, resulting in more efficient use of CPU time. When you’re doing async programming, you have an event loop at the core of your application, which dispatches execution to the processing functions once an external event occurs. In Rust terms, you define a long-running, asynchronous task, called a Future, that is repeatedly polled by the Executor until the external condition it’s waiting for is fulfilled and it finally returns its actual result. Rust’s nice functional-style operators also appear here: they allow you to chain and combine your futures to implement more complex, yet fully asynchronous, tasks without sacrificing efficiency.
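To make the polling model concrete, here is a minimal sketch that uses only the standard library. It is not the application’s code: the standard Future trait may differ from the futures API the original code was written against, and ReadyAfter, NoopWaker and run_to_completion are hypothetical names invented for this illustration. A real executor would sleep until the waker is triggered instead of polling in a tight loop.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that needs `remaining` extra polls before it is ready.
struct ReadyAfter {
    remaining: u32,
}

impl Future for ReadyAfter {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            // Ask to be polled again. A real future would hand the waker to
            // the I/O source it is waiting on instead of waking immediately.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that does nothing, because the toy executor below polls unconditionally.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls until the future completes and returns the result
/// together with the number of polls it took.
fn run_to_completion<F: Future + Unpin>(mut future: F) -> (F::Output, u32) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(value) = Pin::new(&mut future).poll(&mut cx) {
            return (value, polls);
        }
    }
}
```

Calling run_to_completion(ReadyAfter { remaining: 3 }) polls the future four times: three polls return Pending and the fourth returns the result.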

From datagram bytes to a packet stream

The UDP packets we’re looking for have a simple JSON payload, so we start by declaring it as a regular Rust struct, along with annotations for the powerful Serde library, which does the heavy lifting of JSON deserialization for us.

JSON payload object as a Rust structure, to be deserialized by Serde JSON

The Tokio framework gives you all the building blocks to start doing async networking, starting with UdpSocket and UdpFramed. The latter converts incoming UDP datagrams into an async Stream: a long-running async operation which, unlike a simple Future, can yield several results of the same type before terminating. This makes it an ideal async model for a network connection.

A UdpFramed instance can even assemble higher-level protocol data structures for us out of the network byte stream, regardless of how they are aligned within the stream; they can even span multiple datagrams. All we need to do is implement the Decoder trait.

Continuously decoding JSON payload from UDP datagram byte stream

Here we’re doing two things:

  1. Declaring the return type of the async stream by specifying the associated type Item. There’s a small trick here. We’re more interested in collecting as many valid packets from multiple sources as possible than in strictly adhering to the protocol format, so we even accept packets that cannot be decoded and simply drop them later. For this, we specify the return type to be Option<BeaconPacket>, where None signals a malformed packet.
  2. Implementing the actual decode routine in the decode function. Note that the return type of the function is actually Result<Option<Option<...>>>. The outermost Result signals network or protocol errors, cases we would like to avoid if possible to keep the packets flowing, so we always return Ok. The middle Option tells UdpFramed whether to supply more bytes for decoding; we return None there when an empty buffer is encountered. The innermost Option is the actual item type: Some for a successfully deserialized datagram payload, None for one that failed JSON deserialization.
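The three nested layers are easier to see in code. The following is a standard-library-only sketch of the return shape, not the application’s Decoder implementation: the buffer is a plain Vec<u8> instead of Tokio’s buffer type, parse_payload is a hypothetical stand-in for the Serde JSON deserialization, and the single host_id field is an assumption about the payload.

```rust
use std::io;

/// Stand-in for the deserialized payload; the field is an assumption.
#[derive(Debug, PartialEq)]
struct BeaconPacket {
    host_id: String,
}

/// Hypothetical stand-in for JSON deserialization: accepts any non-blank
/// UTF-8 text as the host ID and rejects everything else.
fn parse_payload(bytes: &[u8]) -> Option<BeaconPacket> {
    let text = std::str::from_utf8(bytes).ok()?.trim();
    if text.is_empty() {
        None
    } else {
        Some(BeaconPacket { host_id: text.to_string() })
    }
}

/// Mirrors the nested return type described above.
fn decode(buf: &mut Vec<u8>) -> io::Result<Option<Option<BeaconPacket>>> {
    if buf.is_empty() {
        // Middle layer: nothing to decode yet, ask for more bytes.
        return Ok(None);
    }
    // Consume the whole datagram so the next call starts with an empty buffer.
    let payload = std::mem::take(buf);
    // Innermost layer: Some(packet) if valid, None if malformed.
    // Outermost layer: always Ok, so a bad packet never ends the stream.
    Ok(Some(parse_payload(&payload)))
}
```

With this shape, an empty buffer produces Ok(None), a malformed datagram produces Ok(Some(None)), and a valid one produces Ok(Some(Some(packet))).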

Now we have everything at hand to build a nice asynchronous Stream of incoming beacon packets out of a plain UDP socket.

Building an async stream that returns (host_id, ip_address) pairs for the received UDP packets

We open the UDP socket, wrap it in a UdpFramed instance, and set up some transformations on the stream before returning it. These transformations include dropping the invalid packets (note the inner map that effectively translates inner Nones to outer Nones); extracting the source IP address and the host_id field from the decoded packets, because these are the only two pieces of information we need to identify the originator devices; and finally converting the result type to Event (more on this last step later).
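The filtering step can be illustrated without any networking. In this sketch a synchronous Iterator stands in for the async Stream, since both offer a filter_map combinator with the same idea behind it. The to_events function, the String host ID, and the one-variant Event enum are assumptions made for the example; the real enum has a second variant, covered later.

```rust
use std::net::{IpAddr, SocketAddr};

/// Stand-in for the decoded payload; the field type is an assumption.
struct BeaconPacket {
    host_id: String,
}

/// Reduced version of the event type, with only the variant needed here.
#[derive(Debug, PartialEq)]
enum Event {
    HostFound(String, IpAddr),
}

/// Turns decoded (packet, sender address) items into events and drops the
/// malformed packets along the way.
fn to_events(
    decoded: impl Iterator<Item = (Option<BeaconPacket>, SocketAddr)>,
) -> impl Iterator<Item = Event> {
    decoded.filter_map(|(packet, addr)| {
        // The inner `map` leaves a malformed packet as None, and
        // `filter_map` then removes that item from the output.
        packet.map(|p| Event::HostFound(p.host_id, addr.ip()))
    })
}
```

Feeding it one valid and one malformed item yields a single HostFound event carrying the host ID and the sender’s IP address without the port.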

Adding a countdown timer

The devices on the local network emit UDP broadcast packets at regular intervals of 30 seconds. Therefore the discovery needs to run for at least this long, but I doubled it to account for lost packets: our application waits and actively listens for 60 seconds and then prints the results.

There is a Timeout future in the Tokio library that fires and terminates another future or stream after a given period. However, we cannot use it here for two reasons:

  1. If a valid item is returned from the stream before the timeout period elapses, it effectively cancels the timeout: it is designed for cancelling unsuccessful futures, not for general termination use cases.
  2. Having a console application seemingly do nothing for a minute is not very user-friendly. It would be much nicer to have visual feedback of the remaining time.

So instead of Timeout, we’re going to use Interval, a built-in stream that yields a result periodically, in this case once every second.

Building an async stream producing countdown timer events

This is a fairly straightforward implementation, but note two peculiarities:

  1. We include the remaining seconds in the result. However, the stream is not terminated automatically when reaching zero, so we must take care that the number can never go below zero.
  2. The Event type appears again as the result of this stream as well. I’ll cover the reasons in the next section.
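The first point, keeping the counter from going below zero, can be sketched with saturating subtraction. This is an illustration under assumptions rather than the original code: a plain counter stands in for the ticks of the Interval stream, the countdown function is a hypothetical name, the first tick is assumed to arrive after one second, and the wrapping into the Event type is left out.

```rust
/// Yields the remaining seconds for each timer tick. The sequence never ends
/// on its own, so the value has to stay at zero once the time is up.
fn countdown(total_secs: u64) -> impl Iterator<Item = u64> {
    // `saturating_sub` clamps at zero instead of underflowing.
    (1u64..).map(move |elapsed| total_secs.saturating_sub(elapsed))
}
```

For example, countdown(3) produces 2, 1, 0 and then keeps producing 0 for as long as it is consumed.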

Putting it all together

Let’s start with the Event type right away. It’s a great example of using Rust’s feature-rich enum type to create type-safe unions: it can be either a (host ID, IP address) pair or a number of remaining seconds.

A simple enum to carry possible stream events

As you saw earlier, we use this enum as the result type of both the UDP and the timer streams. This enables us to do the most interesting part: combining async streams together to implement application logic.

Supporting optional streams

Before we move on to that, there’s one last thing that needs to be done. As you remember, our UDP stream can handle both IP protocol versions, but only one at a time. We would like to benefit from this and build IPv4/6 support into our application: it can listen for incoming packets on either an IPv4 or IPv6 interface, or even on both of them simultaneously.

From the source code’s perspective, this means that we will have one or two active UDP streams concurrently. The latter could easily be done by applying the select combinator to the streams: it yields the results from both streams, in the order they arrive. There’s a caveat, however. The underlying type of a selected stream is different from that of a single stream: the former is Select<impl Stream<Event>, …>, while the latter is some opaque impl Stream<Event>, more precisely FilterMap<UdpFramed<…>, …> in our case.

In the Tokio library, the Either type serves for combining two futures of separate types where only one of them can be “active” at a given time, unlike Select, where both child streams are active. Unfortunately, Either is not implemented for streams in the framework, only for futures. Thus, we ended up rolling our own implementation, which is actually quite straightforward.

Combinator type for selecting one of two separate underlying streams
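The same idea can be shown with synchronous iterators, where the problem of two different concrete types appears in exactly the same way. The sketch below is an analog, not the application’s EitherStream: EitherIter and sources are hypothetical names, and chain (which is sequential) stands in for select (which interleaves items as they arrive) purely to produce a second, different type.

```rust
/// Holds one of two different iterator types behind a single type.
/// A stream version would implement Stream the same way, by delegating
/// to whichever side is present.
enum EitherIter<A, B> {
    Left(A),
    Right(B),
}

impl<A, B> Iterator for EitherIter<A, B>
where
    A: Iterator,
    B: Iterator<Item = A::Item>,
{
    type Item = A::Item;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            EitherIter::Left(a) => a.next(),
            EitherIter::Right(b) => b.next(),
        }
    }
}

/// Returns either a single source or two combined sources. The two branches
/// have different concrete types, which EitherIter unifies.
fn sources(both: bool) -> impl Iterator<Item = u32> {
    let v4 = vec![4u32].into_iter();
    if both {
        let v6 = vec![6u32].into_iter();
        EitherIter::Right(v4.chain(v6))
    } else {
        EitherIter::Left(v4)
    }
}
```

Without the wrapper, the if and else branches would not type-check, because a function returning impl Iterator must return one concrete type.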

The main function

Now that we have all the building blocks available, we can finally put them together in the main function of our application. Let’s go through the steps we’d like to implement before we jump to the code.

Visualization of the event pipeline by Ábel Siptár

  1. First, we create the UDP streams that will emit HostFound events for valid incoming packets on either the IPv4 or IPv6 interfaces, or both. We’re using our EitherStream implementation to allow any combination of the two protocol versions.
  2. We create the other source stream that will emit Countdown events once every second and combine it with the UDP streams, resulting in a stream of arbitrary Events. Thanks to the common result type, this is as easy as a simple select call.
  3. The next transformation will print out the remaining time to the console each time a Countdown event is encountered. More importantly, the take_while operator stops at the first event whose remaining time counter is zero, effectively terminating the whole stream, along with the UDP sockets. A pretty elegant way to do so.
  4. If we got this far in the stream, the countdown events are no longer used for anything; they are just noise in the code, so we simply filter them out, keeping only (host_id, ip_address) pairs in the stream.
  5. These pairs will then be fed to a HashMap to filter out duplicate packets from the same host device. Remember, our discovery process runs longer than the beacon packet interval, and a host may also broadcast over both IP protocol versions.
  6. When the stream is eventually terminated, the above map will contain the actual result of our application — the list of IP addresses we were looking for — so the only thing left is to display it to the user.

The complete event processor pipeline in the main function of the application
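Steps 3 to 5 can be approximated with a synchronous iterator chain, which uses combinators with the same names as their stream counterparts. This is a sketch under assumptions, not the application’s main function: collect_hosts is a hypothetical name, the payload types of the Event variants are guesses, and keying the map by host ID (so that a later packet from the same host overwrites the earlier address) is one possible choice that the text does not confirm.

```rust
use std::collections::HashMap;
use std::net::IpAddr;

/// The two kinds of events flowing through the pipeline; payload types are assumptions.
#[derive(Debug, PartialEq)]
enum Event {
    HostFound(String, IpAddr),
    Countdown(u64),
}

/// Consumes events until the countdown reaches zero and returns the hosts seen so far.
fn collect_hosts(events: impl Iterator<Item = Event>) -> HashMap<String, IpAddr> {
    events
        // Step 3: report the remaining time and stop the whole pipeline at zero.
        .take_while(|event| match event {
            Event::Countdown(0) => false,
            Event::Countdown(secs) => {
                println!("{} seconds remaining", secs);
                true
            }
            Event::HostFound(..) => true,
        })
        // Step 4: countdown events have served their purpose, keep only the hosts.
        .filter_map(|event| match event {
            Event::HostFound(host_id, ip) => Some((host_id, ip)),
            Event::Countdown(_) => None,
        })
        // Step 5: the map removes duplicates from repeated beacons.
        .fold(HashMap::new(), |mut hosts, (host_id, ip)| {
            hosts.insert(host_id, ip);
            hosts
        })
}
```

Any HostFound event that arrives after Countdown(0) is never consumed, which corresponds to the stream, and the sockets behind it, being dropped once the time is up.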

And basically that’s it. I’ve omitted some utility functions for clarity, but otherwise our application is now complete. 🎉

Conclusion

I learned a lot while creating this application. Asynchronous programming in Rust is still quite new (so is Rust itself), and requires a somewhat different mindset than you are used to, but creating production-ready applications with it is definitely possible.

The example application here is a bit extreme in how async streams and functional-style combinators are used exclusively throughout the source code, which makes it harder to follow. Async style is not suitable for every use case; I think that in a professional environment you should end up with a healthier combination of async and imperative code. On the other hand, I wanted to show that once you get accustomed to this style, it can easily draw you in, and you’ll still be having fun in the meantime. 😉
